diff --git a/influxql/ast.go b/influxql/ast.go index 02b15f7e7f..2f6bb5c8e9 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1280,10 +1280,8 @@ func (s *SelectStatement) validateFields() error { for _, f := range s.Fields { switch expr := f.Expr.(type) { case *BinaryExpr: - for _, call := range walkFunctionCalls(expr) { - if call.Name == "top" || call.Name == "bottom" { - return fmt.Errorf("cannot use %s() inside of a binary expression", call.Name) - } + if err := expr.validate(); err != nil { + return err } } } @@ -3060,6 +3058,52 @@ func (e *BinaryExpr) String() string { return fmt.Sprintf("%s %s %s", e.LHS.String(), e.Op.String(), e.RHS.String()) } +func (e *BinaryExpr) validate() error { + v := binaryExprValidator{} + Walk(&v, e) + if v.err != nil { + return v.err + } else if v.calls && v.refs { + return errors.New("binary expressions cannot mix aggregates and raw fields") + } + return nil +} + +type binaryExprValidator struct { + calls bool + refs bool + err error +} + +func (v *binaryExprValidator) Visit(n Node) Visitor { + if v.err != nil { + return nil + } + + switch n := n.(type) { + case *Call: + v.calls = true + + if n.Name == "top" || n.Name == "bottom" { + v.err = fmt.Errorf("cannot use %s() inside of a binary expression", n.Name) + return nil + } + + for _, expr := range n.Args { + switch e := expr.(type) { + case *BinaryExpr: + v.err = e.validate() + return nil + } + } + return nil + case *VarRef: + v.refs = true + return nil + } + return v +} + func BinaryExprName(expr *BinaryExpr) string { v := binaryExprNameVisitor{} Walk(&v, expr) diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index a19ba34581..09095fcd5e 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -1066,6 +1066,35 @@ func (itr *floatBoolTransformIterator) Next() *BooleanPoint { // new point if possible. type floatBoolTransformFunc func(p *FloatPoint) *BooleanPoint +// floatCombineTransformIterator executes a function to modify an existing point for every +// output of the input iterator. +type floatCombineTransformIterator struct { + left *bufFloatIterator + right *bufFloatIterator + fn floatCombineTransformFunc +} + +func (itr *floatCombineTransformIterator) Close() error { + itr.left.Close() + itr.right.Close() + return nil +} + +func (itr *floatCombineTransformIterator) Next() *FloatPoint { + a := itr.left.Next() + b := itr.right.Next() + if a == nil && b == nil { + return nil + } + return itr.fn(a, b) +} + +// floatCombineTransformFunc creates or modifies a point by combining two +// points. The point passed in may be modified and returned rather than +// allocating a new point if possible. +// One of the points may be nil, but at least one of the points will be non-nil. +type floatCombineTransformFunc func(a *FloatPoint, b *FloatPoint) *FloatPoint + // floatDedupeIterator only outputs unique points. // This differs from the DistinctIterator in that it compares all aux fields too. // This iterator is relatively inefficient and should only be used on small @@ -2204,6 +2233,35 @@ func (itr *integerBoolTransformIterator) Next() *BooleanPoint { // new point if possible. type integerBoolTransformFunc func(p *IntegerPoint) *BooleanPoint +// integerCombineTransformIterator executes a function to modify an existing point for every +// output of the input iterator. +type integerCombineTransformIterator struct { + left *bufIntegerIterator + right *bufIntegerIterator + fn integerCombineTransformFunc +} + +func (itr *integerCombineTransformIterator) Close() error { + itr.left.Close() + itr.right.Close() + return nil +} + +func (itr *integerCombineTransformIterator) Next() *IntegerPoint { + a := itr.left.Next() + b := itr.right.Next() + if a == nil && b == nil { + return nil + } + return itr.fn(a, b) +} + +// integerCombineTransformFunc creates or modifies a point by combining two +// points. The point passed in may be modified and returned rather than +// allocating a new point if possible. +// One of the points may be nil, but at least one of the points will be non-nil. +type integerCombineTransformFunc func(a *IntegerPoint, b *IntegerPoint) *IntegerPoint + // integerDedupeIterator only outputs unique points. // This differs from the DistinctIterator in that it compares all aux fields too. // This iterator is relatively inefficient and should only be used on small @@ -3342,6 +3400,35 @@ func (itr *stringBoolTransformIterator) Next() *BooleanPoint { // new point if possible. type stringBoolTransformFunc func(p *StringPoint) *BooleanPoint +// stringCombineTransformIterator executes a function to modify an existing point for every +// output of the input iterator. +type stringCombineTransformIterator struct { + left *bufStringIterator + right *bufStringIterator + fn stringCombineTransformFunc +} + +func (itr *stringCombineTransformIterator) Close() error { + itr.left.Close() + itr.right.Close() + return nil +} + +func (itr *stringCombineTransformIterator) Next() *StringPoint { + a := itr.left.Next() + b := itr.right.Next() + if a == nil && b == nil { + return nil + } + return itr.fn(a, b) +} + +// stringCombineTransformFunc creates or modifies a point by combining two +// points. The point passed in may be modified and returned rather than +// allocating a new point if possible. +// One of the points may be nil, but at least one of the points will be non-nil. +type stringCombineTransformFunc func(a *StringPoint, b *StringPoint) *StringPoint + // stringDedupeIterator only outputs unique points. // This differs from the DistinctIterator in that it compares all aux fields too. // This iterator is relatively inefficient and should only be used on small @@ -4480,6 +4567,35 @@ func (itr *booleanBoolTransformIterator) Next() *BooleanPoint { // new point if possible. type booleanBoolTransformFunc func(p *BooleanPoint) *BooleanPoint +// booleanCombineTransformIterator executes a function to modify an existing point for every +// output of the input iterator. +type booleanCombineTransformIterator struct { + left *bufBooleanIterator + right *bufBooleanIterator + fn booleanCombineTransformFunc +} + +func (itr *booleanCombineTransformIterator) Close() error { + itr.left.Close() + itr.right.Close() + return nil +} + +func (itr *booleanCombineTransformIterator) Next() *BooleanPoint { + a := itr.left.Next() + b := itr.right.Next() + if a == nil && b == nil { + return nil + } + return itr.fn(a, b) +} + +// booleanCombineTransformFunc creates or modifies a point by combining two +// points. The point passed in may be modified and returned rather than +// allocating a new point if possible. +// One of the points may be nil, but at least one of the points will be non-nil. +type booleanCombineTransformFunc func(a *BooleanPoint, b *BooleanPoint) *BooleanPoint + // booleanDedupeIterator only outputs unique points. // This differs from the DistinctIterator in that it compares all aux fields too. // This iterator is relatively inefficient and should only be used on small diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 661e489300..69f255b579 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -786,6 +786,35 @@ func (itr *{{$k.name}}BoolTransformIterator) Next() *BooleanPoint { // new point if possible. type {{$k.name}}BoolTransformFunc func(p *{{$k.Name}}Point) *BooleanPoint +// {{$k.name}}CombineTransformIterator executes a function to modify an existing point for every +// output of the input iterator. +type {{$k.name}}CombineTransformIterator struct { + left *buf{{$k.Name}}Iterator + right *buf{{$k.Name}}Iterator + fn {{$k.name}}CombineTransformFunc +} + +func (itr *{{$k.name}}CombineTransformIterator) Close() error { + itr.left.Close() + itr.right.Close() + return nil +} + +func (itr *{{$k.name}}CombineTransformIterator) Next() *{{$k.Name}}Point { + a := itr.left.Next() + b := itr.right.Next() + if a == nil && b == nil { + return nil + } + return itr.fn(a, b) +} + +// {{$k.name}}CombineTransformFunc creates or modifies a point by combining two +// points. The point passed in may be modified and returned rather than +// allocating a new point if possible. +// One of the points may be nil, but at least one of the points will be non-nil. +type {{$k.name}}CombineTransformFunc func(a *{{$k.Name}}Point, b *{{$k.Name}}Point) *{{$k.Name}}Point + // {{$k.name}}DedupeIterator only outputs unique points. // This differs from the DistinctIterator in that it compares all aux fields too. // This iterator is relatively inefficient and should only be used on small diff --git a/influxql/iterator_test.go b/influxql/iterator_test.go index 668d7e47df..ddae07ad40 100644 --- a/influxql/iterator_test.go +++ b/influxql/iterator_test.go @@ -984,21 +984,33 @@ func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.Se case influxql.FloatIterator: for p := itr.Next(); p != nil; p = itr.Next() { s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)} + if series, ok := seriesMap[s.ID()]; ok { + s.Combine(&series) + } seriesMap[s.ID()] = s } case influxql.IntegerIterator: for p := itr.Next(); p != nil; p = itr.Next() { s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)} + if series, ok := seriesMap[s.ID()]; ok { + s.Combine(&series) + } seriesMap[s.ID()] = s } case influxql.StringIterator: for p := itr.Next(); p != nil; p = itr.Next() { s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)} + if series, ok := seriesMap[s.ID()]; ok { + s.Combine(&series) + } seriesMap[s.ID()] = s } case influxql.BooleanIterator: for p := itr.Next(); p != nil; p = itr.Next() { s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)} + if series, ok := seriesMap[s.ID()]; ok { + s.Combine(&series) + } seriesMap[s.ID()] = s } } diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 974fb7bb56..23458581c2 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -1713,9 +1713,14 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT value > 2 FROM cpu`, err: `invalid operator > in SELECT clause at line 1, char 8; operator is intended for WHERE clause`}, {s: `SELECT value = 2 FROM cpu`, err: `invalid operator = in SELECT clause at line 1, char 8; operator is intended for WHERE clause`}, {s: `SELECT s =~ /foo/ FROM cpu`, err: `invalid operator =~ in SELECT clause at line 1, char 8; operator is intended for WHERE clause`}, + {s: `SELECT mean(value) + value FROM cpu WHERE time < now() and time > now() - 1h GROUP BY time(10m)`, err: `binary expressions cannot mix aggregates and raw fields`}, // TODO: Remove this restriction in the future: https://github.com/influxdata/influxdb/issues/5968 {s: `SELECT mean(cpu_total - cpu_idle) FROM cpu`, err: `expected field argument in mean()`}, {s: `SELECT derivative(mean(cpu_total - cpu_idle), 1s) FROM cpu WHERE time < now() AND time > now() - 1d GROUP BY time(1h)`, err: `expected field argument in mean()`}, + // TODO: The error message will change when math is allowed inside an aggregate: https://github.com/influxdata/influxdb/pull/5990#issuecomment-195565870 + {s: `SELECT count(foo + sum(bar)) FROM cpu`, err: `expected field argument in count()`}, + {s: `SELECT (count(foo + sum(bar))) FROM cpu`, err: `expected field argument in count()`}, + {s: `SELECT sum(value) + count(foo + sum(bar)) FROM cpu`, err: `binary expressions cannot mix aggregates and raw fields`}, // See issues https://github.com/influxdata/influxdb/issues/1647 // and https://github.com/influxdata/influxdb/issues/4404 //{s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`}, diff --git a/influxql/select.go b/influxql/select.go index ef26ed98c0..226bdd3c8f 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -399,6 +399,8 @@ func buildRHSTransformIterator(lhs Iterator, rhs Literal, op Token, ic IteratorC fn: func(p *FloatPoint) *FloatPoint { if p == nil { return nil + } else if p.Nil { + return p } p.Value = fn(p.Value, lit.Val) return p @@ -425,13 +427,19 @@ func buildRHSTransformIterator(lhs Iterator, rhs Literal, op Token, ic IteratorC if p == nil { return nil } - return &BooleanPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Value: fn(p.Value, lit.Val), - Aux: p.Aux, + + bp := &BooleanPoint{ + Name: p.Name, + Tags: p.Tags, + Time: p.Time, + Aux: p.Aux, } + if p.Nil { + bp.Nil = true + } else { + bp.Value = fn(p.Value, lit.Val) + } + return bp }, }, nil } @@ -461,6 +469,8 @@ func buildLHSTransformIterator(lhs Literal, rhs Iterator, op Token, ic IteratorC fn: func(p *FloatPoint) *FloatPoint { if p == nil { return nil + } else if p.Nil { + return p } p.Value = fn(lit.Val, p.Value) return p @@ -487,13 +497,19 @@ func buildLHSTransformIterator(lhs Literal, rhs Iterator, op Token, ic IteratorC if p == nil { return nil } - return &BooleanPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Value: fn(lit.Val, p.Value), - Aux: p.Aux, + + bp := &BooleanPoint{ + Name: p.Name, + Tags: p.Tags, + Time: p.Time, + Aux: p.Aux, } + if p.Nil { + bp.Nil = true + } else { + bp.Value = fn(lit.Val, p.Value) + } + return bp }, }, nil } @@ -523,18 +539,27 @@ func buildTransformIterator(lhs Iterator, rhs Iterator, op Token, ic IteratorCre default: return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs) } - return &floatTransformIterator{ - input: left, - fn: func(p *FloatPoint) *FloatPoint { - if p == nil { - return nil + return &floatCombineTransformIterator{ + left: newBufFloatIterator(left), + right: newBufFloatIterator(right), + fn: func(a *FloatPoint, b *FloatPoint) *FloatPoint { + if a != nil && b != nil { + if !a.Nil || !b.Nil { + a.Value = fn(a.Value, b.Value) + a.Nil = false + } + return a + } else if a != nil { + if !a.Nil { + a.Value = fn(a.Value, 0) + } + return a + } else { + if !b.Nil { + b.Value = fn(0, b.Value) + } + return b } - p2 := right.Next() - if p2 == nil { - return nil - } - p.Value = fn(p.Value, p2.Value) - return p }, }, nil case func(int64, int64) float64: diff --git a/influxql/select_test.go b/influxql/select_test.go index 91fe6d3bd0..091a98b090 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -1570,6 +1570,70 @@ func TestSelect_BinaryExpr_Mixed(t *testing.T) { } } +// Ensure a SELECT binary expr with nil values can be executed. +// Nil values may be present when a field is missing from one iterator, +// but not the other. +func TestSelect_BinaryExpr_NilValues(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20, Aux: []interface{}{float64(20), nil}}, + {Name: "cpu", Time: 5 * Second, Value: 10, Aux: []interface{}{float64(10), float64(15)}}, + {Name: "cpu", Time: 9 * Second, Value: 19, Aux: []interface{}{nil, int64(5)}}, + }}, nil + } + + for _, test := range []struct { + Name string + Statement string + Points [][]influxql.Point + }{ + { + Name: "nil binary add", + Statement: `SELECT total + value FROM cpu`, + Points: [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 20}}, + {&influxql.FloatPoint{Name: "cpu", Time: 5 * Second, Value: 25}}, + {&influxql.FloatPoint{Name: "cpu", Time: 9 * Second, Value: 5}}, + }, + }, + { + Name: "nil binary subtract", + Statement: `SELECT total - value FROM cpu`, + Points: [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 20}}, + {&influxql.FloatPoint{Name: "cpu", Time: 5 * Second, Value: -5}}, + {&influxql.FloatPoint{Name: "cpu", Time: 9 * Second, Value: -5}}, + }, + }, + { + Name: "nil binary multiply", + Statement: `SELECT total * value FROM cpu`, + Points: [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 0}}, + {&influxql.FloatPoint{Name: "cpu", Time: 5 * Second, Value: 150}}, + {&influxql.FloatPoint{Name: "cpu", Time: 9 * Second, Value: 0}}, + }, + }, + { + Name: "nil binary division", + Statement: `SELECT total / value FROM cpu`, + Points: [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 0}}, + {&influxql.FloatPoint{Name: "cpu", Time: 5 * Second, Value: float64(10) / float64(15)}}, + {&influxql.FloatPoint{Name: "cpu", Time: 9 * Second, Value: 0}}, + }, + }, + } { + itrs, err := influxql.Select(MustParseSelectStatement(test.Statement), &ic, nil) + if err != nil { + t.Errorf("%s: parse error: %s", test.Name, err) + } else if a := Iterators(itrs).ReadAll(); !deep.Equal(a, test.Points) { + t.Errorf("%s: unexpected points: %s", test.Name, spew.Sdump(a)) + } + } +} + // Ensure a SELECT (...) query can be executed. func TestSelect_ParenExpr(t *testing.T) { var ic IteratorCreator