Add SetTimeRange to SelectStatement for use by continuous queries
parent
5f14d5faf5
commit
34e037a9d8
influxql
|
@ -716,6 +716,39 @@ func (s *SelectStatement) GroupByInterval() (time.Duration, error) {
|
|||
return 0, nil
|
||||
}
|
||||
|
||||
// SetTimeRange sets the start and end time of the select statement to [start, end). i.e. start inclusive, end exclusive.
|
||||
// This is used commonly for continuous queries so the start and end are in buckets.
|
||||
func (s *SelectStatement) SetTimeRange(start, end time.Time) error {
|
||||
cond := fmt.Sprintf("time >= %ds AND time < %ds", start.Unix(), end.Unix())
|
||||
if s.Condition != nil {
|
||||
cond = fmt.Sprintf("%s AND %s", s.rewriteWithoutTimeDimensions(), cond)
|
||||
}
|
||||
|
||||
expr, err := NewParser(strings.NewReader(cond)).ParseExpr()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.Condition = expr
|
||||
return nil
|
||||
}
|
||||
|
||||
// rewriteWithoutTimeDimensions will remove any WHERE time... clauses from the select statement
|
||||
// This is necessary when setting an explicit time range to override any that previously existed.
|
||||
func (s *SelectStatement) rewriteWithoutTimeDimensions() string {
|
||||
n := RewriteFunc(s.Condition, func(n Node) Node {
|
||||
switch n := n.(type) {
|
||||
case *BinaryExpr:
|
||||
if n.LHS.String() == "time" {
|
||||
return &BooleanLiteral{Val: true}
|
||||
}
|
||||
return n
|
||||
default:
|
||||
return n
|
||||
}
|
||||
})
|
||||
return n.String()
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
BinaryExpr
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package influxql_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
@ -113,6 +114,107 @@ func TestSelectStatement_GroupByInterval(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure the SELECT statment can have its start and end time set
|
||||
func TestSelectStatement_SetTimeRange(t *testing.T) {
|
||||
q := "SELECT sum(value) from foo GROUP BY time(10m)"
|
||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("invalid statement: %q: %s", stmt, err)
|
||||
}
|
||||
|
||||
s := stmt.(*influxql.SelectStatement)
|
||||
min, max := influxql.TimeRange(s.Condition)
|
||||
start := time.Now().Add(-20 * time.Hour).Round(time.Second).UTC()
|
||||
end := time.Now().Add(10 * time.Hour).Round(time.Second).UTC()
|
||||
s.SetTimeRange(start, end)
|
||||
min, max = influxql.TimeRange(s.Condition)
|
||||
|
||||
if min != start {
|
||||
t.Fatalf("start time wasn't set properly.\n exp: %s\n got: %s", start, min)
|
||||
}
|
||||
// the end range is actually one microsecond before the given one since end is exclusive
|
||||
end = end.Add(-time.Microsecond)
|
||||
if max != end {
|
||||
t.Fatalf("end time wasn't set properly.\n exp: %s\n got: %s", end, max)
|
||||
}
|
||||
|
||||
// ensure we can set a time on a select that already has one set
|
||||
start = time.Now().Add(-20 * time.Hour).Round(time.Second).UTC()
|
||||
end = time.Now().Add(10 * time.Hour).Round(time.Second).UTC()
|
||||
q = fmt.Sprintf("SELECT sum(value) from foo WHERE time >= %ds and time <= %ds GROUP BY time(10m)", start.Unix(), end.Unix())
|
||||
stmt, err = influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("invalid statement: %q: %s", stmt, err)
|
||||
}
|
||||
|
||||
s = stmt.(*influxql.SelectStatement)
|
||||
min, max = influxql.TimeRange(s.Condition)
|
||||
if start != min || end != max {
|
||||
t.Fatalf("start and end times weren't equal:\n exp: %s\n got: %s\n exp: %s\n got:%s\n", start, min, end, max)
|
||||
}
|
||||
|
||||
// update and ensure it saves it
|
||||
start = time.Now().Add(-40 * time.Hour).Round(time.Second).UTC()
|
||||
end = time.Now().Add(20 * time.Hour).Round(time.Second).UTC()
|
||||
s.SetTimeRange(start, end)
|
||||
min, max = influxql.TimeRange(s.Condition)
|
||||
|
||||
// TODO: right now the SetTimeRange can't override the start time if it's more recent than what they're trying to set it to.
|
||||
// shouldn't matter for our purposes with continuous queries, but fix this later
|
||||
|
||||
if min != start {
|
||||
t.Fatalf("start time wasn't set properly.\n exp: %s\n got: %s", start, min)
|
||||
}
|
||||
// the end range is actually one microsecond before the given one since end is exclusive
|
||||
end = end.Add(-time.Microsecond)
|
||||
if max != end {
|
||||
t.Fatalf("end time wasn't set properly.\n exp: %s\n got: %s", end, max)
|
||||
}
|
||||
|
||||
// ensure that when we set a time range other where clause conditions are still there
|
||||
q = "SELECT sum(value) from foo WHERE foo = 'bar' GROUP BY time(10m)"
|
||||
stmt, err = influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("invalid statement: %q: %s", stmt, err)
|
||||
}
|
||||
|
||||
s = stmt.(*influxql.SelectStatement)
|
||||
|
||||
// update and ensure it saves it
|
||||
start = time.Now().Add(-40 * time.Hour).Round(time.Second).UTC()
|
||||
end = time.Now().Add(20 * time.Hour).Round(time.Second).UTC()
|
||||
s.SetTimeRange(start, end)
|
||||
min, max = influxql.TimeRange(s.Condition)
|
||||
|
||||
if min != start {
|
||||
t.Fatalf("start time wasn't set properly.\n exp: %s\n got: %s", start, min)
|
||||
}
|
||||
// the end range is actually one microsecond before the given one since end is exclusive
|
||||
end = end.Add(-time.Microsecond)
|
||||
if max != end {
|
||||
t.Fatalf("end time wasn't set properly.\n exp: %s\n got: %s", end, max)
|
||||
}
|
||||
|
||||
// ensure the where clause is there
|
||||
hasWhere := false
|
||||
influxql.WalkFunc(s.Condition, func(n influxql.Node) {
|
||||
if ex, ok := n.(*influxql.BinaryExpr); ok {
|
||||
if lhs, ok := ex.LHS.(*influxql.VarRef); ok {
|
||||
if lhs.Val == "foo" {
|
||||
if rhs, ok := ex.RHS.(*influxql.StringLiteral); ok {
|
||||
if rhs.Val == "bar" {
|
||||
hasWhere = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
if !hasWhere {
|
||||
t.Fatal("set time range cleared out the where clause")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the time range of an expression can be extracted.
|
||||
func TestTimeRange(t *testing.T) {
|
||||
for i, tt := range []struct {
|
||||
|
|
Loading…
Reference in New Issue