Merge pull request #5990 from influxdata/js-5973-match-iterators-for-math-expressions
Various fixes for binary expression math between iteratorspull/6003/head
commit
16f4c4fd70
|
@ -1280,10 +1280,8 @@ func (s *SelectStatement) validateFields() error {
|
|||
for _, f := range s.Fields {
|
||||
switch expr := f.Expr.(type) {
|
||||
case *BinaryExpr:
|
||||
for _, call := range walkFunctionCalls(expr) {
|
||||
if call.Name == "top" || call.Name == "bottom" {
|
||||
return fmt.Errorf("cannot use %s() inside of a binary expression", call.Name)
|
||||
}
|
||||
if err := expr.validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3060,6 +3058,52 @@ func (e *BinaryExpr) String() string {
|
|||
return fmt.Sprintf("%s %s %s", e.LHS.String(), e.Op.String(), e.RHS.String())
|
||||
}
|
||||
|
||||
func (e *BinaryExpr) validate() error {
|
||||
v := binaryExprValidator{}
|
||||
Walk(&v, e)
|
||||
if v.err != nil {
|
||||
return v.err
|
||||
} else if v.calls && v.refs {
|
||||
return errors.New("binary expressions cannot mix aggregates and raw fields")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type binaryExprValidator struct {
|
||||
calls bool
|
||||
refs bool
|
||||
err error
|
||||
}
|
||||
|
||||
func (v *binaryExprValidator) Visit(n Node) Visitor {
|
||||
if v.err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch n := n.(type) {
|
||||
case *Call:
|
||||
v.calls = true
|
||||
|
||||
if n.Name == "top" || n.Name == "bottom" {
|
||||
v.err = fmt.Errorf("cannot use %s() inside of a binary expression", n.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, expr := range n.Args {
|
||||
switch e := expr.(type) {
|
||||
case *BinaryExpr:
|
||||
v.err = e.validate()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
case *VarRef:
|
||||
v.refs = true
|
||||
return nil
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func BinaryExprName(expr *BinaryExpr) string {
|
||||
v := binaryExprNameVisitor{}
|
||||
Walk(&v, expr)
|
||||
|
|
|
@ -1066,6 +1066,35 @@ func (itr *floatBoolTransformIterator) Next() *BooleanPoint {
|
|||
// new point if possible.
|
||||
type floatBoolTransformFunc func(p *FloatPoint) *BooleanPoint
|
||||
|
||||
// floatCombineTransformIterator executes a function to modify an existing point for every
|
||||
// output of the input iterator.
|
||||
type floatCombineTransformIterator struct {
|
||||
left *bufFloatIterator
|
||||
right *bufFloatIterator
|
||||
fn floatCombineTransformFunc
|
||||
}
|
||||
|
||||
func (itr *floatCombineTransformIterator) Close() error {
|
||||
itr.left.Close()
|
||||
itr.right.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (itr *floatCombineTransformIterator) Next() *FloatPoint {
|
||||
a := itr.left.Next()
|
||||
b := itr.right.Next()
|
||||
if a == nil && b == nil {
|
||||
return nil
|
||||
}
|
||||
return itr.fn(a, b)
|
||||
}
|
||||
|
||||
// floatCombineTransformFunc creates or modifies a point by combining two
|
||||
// points. The point passed in may be modified and returned rather than
|
||||
// allocating a new point if possible.
|
||||
// One of the points may be nil, but at least one of the points will be non-nil.
|
||||
type floatCombineTransformFunc func(a *FloatPoint, b *FloatPoint) *FloatPoint
|
||||
|
||||
// floatDedupeIterator only outputs unique points.
|
||||
// This differs from the DistinctIterator in that it compares all aux fields too.
|
||||
// This iterator is relatively inefficient and should only be used on small
|
||||
|
@ -2204,6 +2233,35 @@ func (itr *integerBoolTransformIterator) Next() *BooleanPoint {
|
|||
// new point if possible.
|
||||
type integerBoolTransformFunc func(p *IntegerPoint) *BooleanPoint
|
||||
|
||||
// integerCombineTransformIterator executes a function to modify an existing point for every
|
||||
// output of the input iterator.
|
||||
type integerCombineTransformIterator struct {
|
||||
left *bufIntegerIterator
|
||||
right *bufIntegerIterator
|
||||
fn integerCombineTransformFunc
|
||||
}
|
||||
|
||||
func (itr *integerCombineTransformIterator) Close() error {
|
||||
itr.left.Close()
|
||||
itr.right.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (itr *integerCombineTransformIterator) Next() *IntegerPoint {
|
||||
a := itr.left.Next()
|
||||
b := itr.right.Next()
|
||||
if a == nil && b == nil {
|
||||
return nil
|
||||
}
|
||||
return itr.fn(a, b)
|
||||
}
|
||||
|
||||
// integerCombineTransformFunc creates or modifies a point by combining two
|
||||
// points. The point passed in may be modified and returned rather than
|
||||
// allocating a new point if possible.
|
||||
// One of the points may be nil, but at least one of the points will be non-nil.
|
||||
type integerCombineTransformFunc func(a *IntegerPoint, b *IntegerPoint) *IntegerPoint
|
||||
|
||||
// integerDedupeIterator only outputs unique points.
|
||||
// This differs from the DistinctIterator in that it compares all aux fields too.
|
||||
// This iterator is relatively inefficient and should only be used on small
|
||||
|
@ -3342,6 +3400,35 @@ func (itr *stringBoolTransformIterator) Next() *BooleanPoint {
|
|||
// new point if possible.
|
||||
type stringBoolTransformFunc func(p *StringPoint) *BooleanPoint
|
||||
|
||||
// stringCombineTransformIterator executes a function to modify an existing point for every
|
||||
// output of the input iterator.
|
||||
type stringCombineTransformIterator struct {
|
||||
left *bufStringIterator
|
||||
right *bufStringIterator
|
||||
fn stringCombineTransformFunc
|
||||
}
|
||||
|
||||
func (itr *stringCombineTransformIterator) Close() error {
|
||||
itr.left.Close()
|
||||
itr.right.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (itr *stringCombineTransformIterator) Next() *StringPoint {
|
||||
a := itr.left.Next()
|
||||
b := itr.right.Next()
|
||||
if a == nil && b == nil {
|
||||
return nil
|
||||
}
|
||||
return itr.fn(a, b)
|
||||
}
|
||||
|
||||
// stringCombineTransformFunc creates or modifies a point by combining two
|
||||
// points. The point passed in may be modified and returned rather than
|
||||
// allocating a new point if possible.
|
||||
// One of the points may be nil, but at least one of the points will be non-nil.
|
||||
type stringCombineTransformFunc func(a *StringPoint, b *StringPoint) *StringPoint
|
||||
|
||||
// stringDedupeIterator only outputs unique points.
|
||||
// This differs from the DistinctIterator in that it compares all aux fields too.
|
||||
// This iterator is relatively inefficient and should only be used on small
|
||||
|
@ -4480,6 +4567,35 @@ func (itr *booleanBoolTransformIterator) Next() *BooleanPoint {
|
|||
// new point if possible.
|
||||
type booleanBoolTransformFunc func(p *BooleanPoint) *BooleanPoint
|
||||
|
||||
// booleanCombineTransformIterator executes a function to modify an existing point for every
|
||||
// output of the input iterator.
|
||||
type booleanCombineTransformIterator struct {
|
||||
left *bufBooleanIterator
|
||||
right *bufBooleanIterator
|
||||
fn booleanCombineTransformFunc
|
||||
}
|
||||
|
||||
func (itr *booleanCombineTransformIterator) Close() error {
|
||||
itr.left.Close()
|
||||
itr.right.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (itr *booleanCombineTransformIterator) Next() *BooleanPoint {
|
||||
a := itr.left.Next()
|
||||
b := itr.right.Next()
|
||||
if a == nil && b == nil {
|
||||
return nil
|
||||
}
|
||||
return itr.fn(a, b)
|
||||
}
|
||||
|
||||
// booleanCombineTransformFunc creates or modifies a point by combining two
|
||||
// points. The point passed in may be modified and returned rather than
|
||||
// allocating a new point if possible.
|
||||
// One of the points may be nil, but at least one of the points will be non-nil.
|
||||
type booleanCombineTransformFunc func(a *BooleanPoint, b *BooleanPoint) *BooleanPoint
|
||||
|
||||
// booleanDedupeIterator only outputs unique points.
|
||||
// This differs from the DistinctIterator in that it compares all aux fields too.
|
||||
// This iterator is relatively inefficient and should only be used on small
|
||||
|
|
|
@ -786,6 +786,35 @@ func (itr *{{$k.name}}BoolTransformIterator) Next() *BooleanPoint {
|
|||
// new point if possible.
|
||||
type {{$k.name}}BoolTransformFunc func(p *{{$k.Name}}Point) *BooleanPoint
|
||||
|
||||
// {{$k.name}}CombineTransformIterator executes a function to modify an existing point for every
|
||||
// output of the input iterator.
|
||||
type {{$k.name}}CombineTransformIterator struct {
|
||||
left *buf{{$k.Name}}Iterator
|
||||
right *buf{{$k.Name}}Iterator
|
||||
fn {{$k.name}}CombineTransformFunc
|
||||
}
|
||||
|
||||
func (itr *{{$k.name}}CombineTransformIterator) Close() error {
|
||||
itr.left.Close()
|
||||
itr.right.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (itr *{{$k.name}}CombineTransformIterator) Next() *{{$k.Name}}Point {
|
||||
a := itr.left.Next()
|
||||
b := itr.right.Next()
|
||||
if a == nil && b == nil {
|
||||
return nil
|
||||
}
|
||||
return itr.fn(a, b)
|
||||
}
|
||||
|
||||
// {{$k.name}}CombineTransformFunc creates or modifies a point by combining two
|
||||
// points. The point passed in may be modified and returned rather than
|
||||
// allocating a new point if possible.
|
||||
// One of the points may be nil, but at least one of the points will be non-nil.
|
||||
type {{$k.name}}CombineTransformFunc func(a *{{$k.Name}}Point, b *{{$k.Name}}Point) *{{$k.Name}}Point
|
||||
|
||||
// {{$k.name}}DedupeIterator only outputs unique points.
|
||||
// This differs from the DistinctIterator in that it compares all aux fields too.
|
||||
// This iterator is relatively inefficient and should only be used on small
|
||||
|
|
|
@ -984,21 +984,33 @@ func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.Se
|
|||
case influxql.FloatIterator:
|
||||
for p := itr.Next(); p != nil; p = itr.Next() {
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
if series, ok := seriesMap[s.ID()]; ok {
|
||||
s.Combine(&series)
|
||||
}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
case influxql.IntegerIterator:
|
||||
for p := itr.Next(); p != nil; p = itr.Next() {
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
if series, ok := seriesMap[s.ID()]; ok {
|
||||
s.Combine(&series)
|
||||
}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
case influxql.StringIterator:
|
||||
for p := itr.Next(); p != nil; p = itr.Next() {
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
if series, ok := seriesMap[s.ID()]; ok {
|
||||
s.Combine(&series)
|
||||
}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
case influxql.BooleanIterator:
|
||||
for p := itr.Next(); p != nil; p = itr.Next() {
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
if series, ok := seriesMap[s.ID()]; ok {
|
||||
s.Combine(&series)
|
||||
}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1713,9 +1713,14 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
{s: `SELECT value > 2 FROM cpu`, err: `invalid operator > in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},
|
||||
{s: `SELECT value = 2 FROM cpu`, err: `invalid operator = in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},
|
||||
{s: `SELECT s =~ /foo/ FROM cpu`, err: `invalid operator =~ in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},
|
||||
{s: `SELECT mean(value) + value FROM cpu WHERE time < now() and time > now() - 1h GROUP BY time(10m)`, err: `binary expressions cannot mix aggregates and raw fields`},
|
||||
// TODO: Remove this restriction in the future: https://github.com/influxdata/influxdb/issues/5968
|
||||
{s: `SELECT mean(cpu_total - cpu_idle) FROM cpu`, err: `expected field argument in mean()`},
|
||||
{s: `SELECT derivative(mean(cpu_total - cpu_idle), 1s) FROM cpu WHERE time < now() AND time > now() - 1d GROUP BY time(1h)`, err: `expected field argument in mean()`},
|
||||
// TODO: The error message will change when math is allowed inside an aggregate: https://github.com/influxdata/influxdb/pull/5990#issuecomment-195565870
|
||||
{s: `SELECT count(foo + sum(bar)) FROM cpu`, err: `expected field argument in count()`},
|
||||
{s: `SELECT (count(foo + sum(bar))) FROM cpu`, err: `expected field argument in count()`},
|
||||
{s: `SELECT sum(value) + count(foo + sum(bar)) FROM cpu`, err: `binary expressions cannot mix aggregates and raw fields`},
|
||||
// See issues https://github.com/influxdata/influxdb/issues/1647
|
||||
// and https://github.com/influxdata/influxdb/issues/4404
|
||||
//{s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`},
|
||||
|
|
|
@ -399,6 +399,8 @@ func buildRHSTransformIterator(lhs Iterator, rhs Literal, op Token, ic IteratorC
|
|||
fn: func(p *FloatPoint) *FloatPoint {
|
||||
if p == nil {
|
||||
return nil
|
||||
} else if p.Nil {
|
||||
return p
|
||||
}
|
||||
p.Value = fn(p.Value, lit.Val)
|
||||
return p
|
||||
|
@ -425,13 +427,19 @@ func buildRHSTransformIterator(lhs Iterator, rhs Literal, op Token, ic IteratorC
|
|||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
return &BooleanPoint{
|
||||
Name: p.Name,
|
||||
Tags: p.Tags,
|
||||
Time: p.Time,
|
||||
Value: fn(p.Value, lit.Val),
|
||||
Aux: p.Aux,
|
||||
|
||||
bp := &BooleanPoint{
|
||||
Name: p.Name,
|
||||
Tags: p.Tags,
|
||||
Time: p.Time,
|
||||
Aux: p.Aux,
|
||||
}
|
||||
if p.Nil {
|
||||
bp.Nil = true
|
||||
} else {
|
||||
bp.Value = fn(p.Value, lit.Val)
|
||||
}
|
||||
return bp
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
@ -461,6 +469,8 @@ func buildLHSTransformIterator(lhs Literal, rhs Iterator, op Token, ic IteratorC
|
|||
fn: func(p *FloatPoint) *FloatPoint {
|
||||
if p == nil {
|
||||
return nil
|
||||
} else if p.Nil {
|
||||
return p
|
||||
}
|
||||
p.Value = fn(lit.Val, p.Value)
|
||||
return p
|
||||
|
@ -487,13 +497,19 @@ func buildLHSTransformIterator(lhs Literal, rhs Iterator, op Token, ic IteratorC
|
|||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
return &BooleanPoint{
|
||||
Name: p.Name,
|
||||
Tags: p.Tags,
|
||||
Time: p.Time,
|
||||
Value: fn(lit.Val, p.Value),
|
||||
Aux: p.Aux,
|
||||
|
||||
bp := &BooleanPoint{
|
||||
Name: p.Name,
|
||||
Tags: p.Tags,
|
||||
Time: p.Time,
|
||||
Aux: p.Aux,
|
||||
}
|
||||
if p.Nil {
|
||||
bp.Nil = true
|
||||
} else {
|
||||
bp.Value = fn(lit.Val, p.Value)
|
||||
}
|
||||
return bp
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
@ -523,18 +539,27 @@ func buildTransformIterator(lhs Iterator, rhs Iterator, op Token, ic IteratorCre
|
|||
default:
|
||||
return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs)
|
||||
}
|
||||
return &floatTransformIterator{
|
||||
input: left,
|
||||
fn: func(p *FloatPoint) *FloatPoint {
|
||||
if p == nil {
|
||||
return nil
|
||||
return &floatCombineTransformIterator{
|
||||
left: newBufFloatIterator(left),
|
||||
right: newBufFloatIterator(right),
|
||||
fn: func(a *FloatPoint, b *FloatPoint) *FloatPoint {
|
||||
if a != nil && b != nil {
|
||||
if !a.Nil || !b.Nil {
|
||||
a.Value = fn(a.Value, b.Value)
|
||||
a.Nil = false
|
||||
}
|
||||
return a
|
||||
} else if a != nil {
|
||||
if !a.Nil {
|
||||
a.Value = fn(a.Value, 0)
|
||||
}
|
||||
return a
|
||||
} else {
|
||||
if !b.Nil {
|
||||
b.Value = fn(0, b.Value)
|
||||
}
|
||||
return b
|
||||
}
|
||||
p2 := right.Next()
|
||||
if p2 == nil {
|
||||
return nil
|
||||
}
|
||||
p.Value = fn(p.Value, p2.Value)
|
||||
return p
|
||||
},
|
||||
}, nil
|
||||
case func(int64, int64) float64:
|
||||
|
|
|
@ -1570,6 +1570,70 @@ func TestSelect_BinaryExpr_Mixed(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure a SELECT binary expr with nil values can be executed.
|
||||
// Nil values may be present when a field is missing from one iterator,
|
||||
// but not the other.
|
||||
func TestSelect_BinaryExpr_NilValues(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
return &FloatIterator{Points: []influxql.FloatPoint{
|
||||
{Name: "cpu", Time: 0 * Second, Value: 20, Aux: []interface{}{float64(20), nil}},
|
||||
{Name: "cpu", Time: 5 * Second, Value: 10, Aux: []interface{}{float64(10), float64(15)}},
|
||||
{Name: "cpu", Time: 9 * Second, Value: 19, Aux: []interface{}{nil, int64(5)}},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
for _, test := range []struct {
|
||||
Name string
|
||||
Statement string
|
||||
Points [][]influxql.Point
|
||||
}{
|
||||
{
|
||||
Name: "nil binary add",
|
||||
Statement: `SELECT total + value FROM cpu`,
|
||||
Points: [][]influxql.Point{
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 20}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 5 * Second, Value: 25}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 9 * Second, Value: 5}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "nil binary subtract",
|
||||
Statement: `SELECT total - value FROM cpu`,
|
||||
Points: [][]influxql.Point{
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 20}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 5 * Second, Value: -5}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 9 * Second, Value: -5}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "nil binary multiply",
|
||||
Statement: `SELECT total * value FROM cpu`,
|
||||
Points: [][]influxql.Point{
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 0}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 5 * Second, Value: 150}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 9 * Second, Value: 0}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "nil binary division",
|
||||
Statement: `SELECT total / value FROM cpu`,
|
||||
Points: [][]influxql.Point{
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 0}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 5 * Second, Value: float64(10) / float64(15)}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Time: 9 * Second, Value: 0}},
|
||||
},
|
||||
},
|
||||
} {
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(test.Statement), &ic, nil)
|
||||
if err != nil {
|
||||
t.Errorf("%s: parse error: %s", test.Name, err)
|
||||
} else if a := Iterators(itrs).ReadAll(); !deep.Equal(a, test.Points) {
|
||||
t.Errorf("%s: unexpected points: %s", test.Name, spew.Sdump(a))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a SELECT (...) query can be executed.
|
||||
func TestSelect_ParenExpr(t *testing.T) {
|
||||
var ic IteratorCreator
|
||||
|
|
Loading…
Reference in New Issue