diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 1aab053138..000e7cd29e 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -4710,6 +4710,16 @@ func TestServer_Query_Subqueries(t *testing.T) { command: `SELECT min(value) FROM (SELECT top(usage_user, 2), usage_user - usage_system AS value FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' GROUP BY host`, exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-10]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-44]]}]}]}`, }, + &Query{ + params: url.Values{"db": []string{"db0"}}, + command: `SELECT min(value) FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' AND host = 'server01'`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min"],"values":[["2000-01-01T00:00:00Z",40]]}]}]}`, + }, + &Query{ + params: url.Values{"db": []string{"db0"}}, + command: `SELECT value FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' AND value > 0`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:00Z",40]]}]}]}`, + }, }...) for i, query := range test.queries { diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 00be1c116a..62cbe905ca 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -2403,6 +2403,65 @@ type floatDedupeIterator struct { m map[string]struct{} // lookup of points already sent } +type floatFilterIterator struct { + input FloatIterator + cond Expr + opt IteratorOptions + m map[string]interface{} +} + +func newFloatFilterIterator(input FloatIterator, cond Expr, opt IteratorOptions) FloatIterator { + // Strip out time conditions from the WHERE clause. + // TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct. + n := RewriteFunc(CloneExpr(cond), func(n Node) Node { + switch n := n.(type) { + case *BinaryExpr: + if n.LHS.String() == "time" { + return &BooleanLiteral{Val: true} + } + } + return n + }) + + cond, _ = n.(Expr) + if cond == nil { + return input + } else if n, ok := cond.(*BooleanLiteral); ok && n.Val { + return input + } + + return &floatFilterIterator{ + input: input, + cond: cond, + opt: opt, + m: make(map[string]interface{}), + } +} + +func (itr *floatFilterIterator) Stats() IteratorStats { return itr.input.Stats() } +func (itr *floatFilterIterator) Close() error { return itr.input.Close() } + +func (itr *floatFilterIterator) Next() (*FloatPoint, error) { + for { + p, err := itr.input.Next() + if err != nil || p == nil { + return nil, err + } + + for i, ref := range itr.opt.Aux { + itr.m[ref.Val] = p.Aux[i] + } + for k, v := range p.Tags.KeyValues() { + itr.m[k] = v + } + + if !EvalBool(itr.cond, itr.m) { + continue + } + return p, nil + } +} + // newFloatDedupeIterator returns a new instance of floatDedupeIterator. func newFloatDedupeIterator(input FloatIterator) *floatDedupeIterator { return &floatDedupeIterator{ @@ -4860,6 +4919,65 @@ type integerDedupeIterator struct { m map[string]struct{} // lookup of points already sent } +type integerFilterIterator struct { + input IntegerIterator + cond Expr + opt IteratorOptions + m map[string]interface{} +} + +func newIntegerFilterIterator(input IntegerIterator, cond Expr, opt IteratorOptions) IntegerIterator { + // Strip out time conditions from the WHERE clause. + // TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct. + n := RewriteFunc(CloneExpr(cond), func(n Node) Node { + switch n := n.(type) { + case *BinaryExpr: + if n.LHS.String() == "time" { + return &BooleanLiteral{Val: true} + } + } + return n + }) + + cond, _ = n.(Expr) + if cond == nil { + return input + } else if n, ok := cond.(*BooleanLiteral); ok && n.Val { + return input + } + + return &integerFilterIterator{ + input: input, + cond: cond, + opt: opt, + m: make(map[string]interface{}), + } +} + +func (itr *integerFilterIterator) Stats() IteratorStats { return itr.input.Stats() } +func (itr *integerFilterIterator) Close() error { return itr.input.Close() } + +func (itr *integerFilterIterator) Next() (*IntegerPoint, error) { + for { + p, err := itr.input.Next() + if err != nil || p == nil { + return nil, err + } + + for i, ref := range itr.opt.Aux { + itr.m[ref.Val] = p.Aux[i] + } + for k, v := range p.Tags.KeyValues() { + itr.m[k] = v + } + + if !EvalBool(itr.cond, itr.m) { + continue + } + return p, nil + } +} + // newIntegerDedupeIterator returns a new instance of integerDedupeIterator. func newIntegerDedupeIterator(input IntegerIterator) *integerDedupeIterator { return &integerDedupeIterator{ @@ -7302,6 +7420,65 @@ type stringDedupeIterator struct { m map[string]struct{} // lookup of points already sent } +type stringFilterIterator struct { + input StringIterator + cond Expr + opt IteratorOptions + m map[string]interface{} +} + +func newStringFilterIterator(input StringIterator, cond Expr, opt IteratorOptions) StringIterator { + // Strip out time conditions from the WHERE clause. + // TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct. + n := RewriteFunc(CloneExpr(cond), func(n Node) Node { + switch n := n.(type) { + case *BinaryExpr: + if n.LHS.String() == "time" { + return &BooleanLiteral{Val: true} + } + } + return n + }) + + cond, _ = n.(Expr) + if cond == nil { + return input + } else if n, ok := cond.(*BooleanLiteral); ok && n.Val { + return input + } + + return &stringFilterIterator{ + input: input, + cond: cond, + opt: opt, + m: make(map[string]interface{}), + } +} + +func (itr *stringFilterIterator) Stats() IteratorStats { return itr.input.Stats() } +func (itr *stringFilterIterator) Close() error { return itr.input.Close() } + +func (itr *stringFilterIterator) Next() (*StringPoint, error) { + for { + p, err := itr.input.Next() + if err != nil || p == nil { + return nil, err + } + + for i, ref := range itr.opt.Aux { + itr.m[ref.Val] = p.Aux[i] + } + for k, v := range p.Tags.KeyValues() { + itr.m[k] = v + } + + if !EvalBool(itr.cond, itr.m) { + continue + } + return p, nil + } +} + // newStringDedupeIterator returns a new instance of stringDedupeIterator. func newStringDedupeIterator(input StringIterator) *stringDedupeIterator { return &stringDedupeIterator{ @@ -9744,6 +9921,65 @@ type booleanDedupeIterator struct { m map[string]struct{} // lookup of points already sent } +type booleanFilterIterator struct { + input BooleanIterator + cond Expr + opt IteratorOptions + m map[string]interface{} +} + +func newBooleanFilterIterator(input BooleanIterator, cond Expr, opt IteratorOptions) BooleanIterator { + // Strip out time conditions from the WHERE clause. + // TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct. + n := RewriteFunc(CloneExpr(cond), func(n Node) Node { + switch n := n.(type) { + case *BinaryExpr: + if n.LHS.String() == "time" { + return &BooleanLiteral{Val: true} + } + } + return n + }) + + cond, _ = n.(Expr) + if cond == nil { + return input + } else if n, ok := cond.(*BooleanLiteral); ok && n.Val { + return input + } + + return &booleanFilterIterator{ + input: input, + cond: cond, + opt: opt, + m: make(map[string]interface{}), + } +} + +func (itr *booleanFilterIterator) Stats() IteratorStats { return itr.input.Stats() } +func (itr *booleanFilterIterator) Close() error { return itr.input.Close() } + +func (itr *booleanFilterIterator) Next() (*BooleanPoint, error) { + for { + p, err := itr.input.Next() + if err != nil || p == nil { + return nil, err + } + + for i, ref := range itr.opt.Aux { + itr.m[ref.Val] = p.Aux[i] + } + for k, v := range p.Tags.KeyValues() { + itr.m[k] = v + } + + if !EvalBool(itr.cond, itr.m) { + continue + } + return p, nil + } +} + // newBooleanDedupeIterator returns a new instance of booleanDedupeIterator. func newBooleanDedupeIterator(input BooleanIterator) *booleanDedupeIterator { return &booleanDedupeIterator{ diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index ec25c688a8..764e8546e5 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -1404,6 +1404,65 @@ type {{$k.name}}DedupeIterator struct { m map[string]struct{} // lookup of points already sent } +type {{$k.name}}FilterIterator struct { + input {{$k.Name}}Iterator + cond Expr + opt IteratorOptions + m map[string]interface{} +} + +func new{{$k.Name}}FilterIterator(input {{$k.Name}}Iterator, cond Expr, opt IteratorOptions) {{$k.Name}}Iterator { + // Strip out time conditions from the WHERE clause. + // TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct. + n := RewriteFunc(CloneExpr(cond), func(n Node) Node { + switch n := n.(type) { + case *BinaryExpr: + if n.LHS.String() == "time" { + return &BooleanLiteral{Val: true} + } + } + return n + }) + + cond, _ = n.(Expr) + if cond == nil { + return input + } else if n, ok := cond.(*BooleanLiteral); ok && n.Val { + return input + } + + return &{{$k.name}}FilterIterator{ + input: input, + cond: cond, + opt: opt, + m: make(map[string]interface{}), + } +} + +func (itr *{{$k.name}}FilterIterator) Stats() IteratorStats { return itr.input.Stats() } +func (itr *{{$k.name}}FilterIterator) Close() error { return itr.input.Close() } + +func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) { + for { + p, err := itr.input.Next() + if err != nil || p == nil { + return nil, err + } + + for i, ref := range itr.opt.Aux { + itr.m[ref.Val] = p.Aux[i] + } + for k, v := range p.Tags.KeyValues() { + itr.m[k] = v + } + + if !EvalBool(itr.cond, itr.m) { + continue + } + return p, nil + } +} + // new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator. func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator { return &{{$k.name}}DedupeIterator{ diff --git a/influxql/iterator.go b/influxql/iterator.go index acb45fac36..ee530c669a 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -281,6 +281,28 @@ func NewLimitIterator(input Iterator, opt IteratorOptions) Iterator { } } +// NewFilterIterator returns an iterator that filters the points based on the +// condition. This iterator is not nearly as efficient as filtering points +// within the query engine and is only used when filtering subqueries. +func NewFilterIterator(input Iterator, cond Expr, opt IteratorOptions) Iterator { + if input == nil { + return nil + } + + switch input := input.(type) { + case FloatIterator: + return newFloatFilterIterator(input, cond, opt) + case IntegerIterator: + return newIntegerFilterIterator(input, cond, opt) + case StringIterator: + return newStringFilterIterator(input, cond, opt) + case BooleanIterator: + return newBooleanFilterIterator(input, cond, opt) + default: + panic(fmt.Sprintf("unsupported filter iterator type: %T", input)) + } +} + // NewDedupeIterator returns an iterator that only outputs unique points. // This iterator maintains a serialized copy of each row so it is inefficient // to use on large datasets. It is intended for small datasets such as meta queries. diff --git a/influxql/select.go b/influxql/select.go index a0a0c9be8a..2dc81641d9 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -208,6 +208,10 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, sources Sources, opt I // Construct the iterators for the subquery. input := NewIteratorMapper(itrs, indexes, opt) + // If there is a condition, filter it now. + if opt.Condition != nil { + input = NewFilterIterator(input, opt.Condition, opt) + } inputs = append(inputs, input) } } @@ -480,7 +484,15 @@ func (b *exprIteratorBuilder) buildVarRefIterator(expr *VarRef) (Iterator, error } } } - return buildExprIterator(e, b.ic, source.Statement.Sources, subOpt, false) + itr, err := buildExprIterator(e, b.ic, source.Statement.Sources, subOpt, false) + if err != nil { + return nil, err + } + + if b.opt.Condition != nil { + itr = NewFilterIterator(itr, b.opt.Condition, subOpt) + } + return itr, nil } switch e := e.(type) { @@ -534,6 +546,11 @@ func (b *exprIteratorBuilder) buildVarRefIterator(expr *VarRef) (Iterator, error return nil, err } + // Filter the iterator. + if b.opt.Condition != nil { + input = NewFilterIterator(input, b.opt.Condition, subOpt) + } + // Create an auxiliary iterator. aitr := NewAuxIterator(input, subOpt) itr := aitr.Iterator(e.Val, e.Type) @@ -574,7 +591,15 @@ func (b *exprIteratorBuilder) buildVarRefIterator(expr *VarRef) (Iterator, error // Check if this is a selector or not and // create the iterator directly. selector := len(info.calls) == 1 && IsSelector(e) - return buildExprIterator(e, b.ic, source.Statement.Sources, subOpt, selector) + itr, err := buildExprIterator(e, b.ic, source.Statement.Sources, subOpt, selector) + if err != nil { + return nil, err + } + + if b.opt.Condition != nil { + itr = NewFilterIterator(itr, b.opt.Condition, subOpt) + } + return itr, nil case *BinaryExpr: // Retrieve the calls and references for this binary expression. // There should be no mixing of calls and refs. @@ -785,6 +810,10 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { return err } + if b.opt.Condition != nil { + input = NewFilterIterator(input, b.opt.Condition, b.opt) + } + // Wrap the result in a call iterator. i, err := NewCallIterator(input, b.opt) if err != nil {