Support the WHERE clause in outer queries with subqueries
parent
f199c50d25
commit
6cd5b690d1
|
@ -4710,6 +4710,16 @@ func TestServer_Query_Subqueries(t *testing.T) {
|
|||
command: `SELECT min(value) FROM (SELECT top(usage_user, 2), usage_user - usage_system AS value FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' GROUP BY host`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-10]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-44]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT min(value) FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' AND host = 'server01'`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min"],"values":[["2000-01-01T00:00:00Z",40]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT value FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' AND value > 0`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:00Z",40]]}]}]}`,
|
||||
},
|
||||
}...)
|
||||
|
||||
for i, query := range test.queries {
|
||||
|
|
|
@ -2403,6 +2403,65 @@ type floatDedupeIterator struct {
|
|||
m map[string]struct{} // lookup of points already sent
|
||||
}
|
||||
|
||||
type floatFilterIterator struct {
|
||||
input FloatIterator
|
||||
cond Expr
|
||||
opt IteratorOptions
|
||||
m map[string]interface{}
|
||||
}
|
||||
|
||||
func newFloatFilterIterator(input FloatIterator, cond Expr, opt IteratorOptions) FloatIterator {
|
||||
// Strip out time conditions from the WHERE clause.
|
||||
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
|
||||
n := RewriteFunc(CloneExpr(cond), func(n Node) Node {
|
||||
switch n := n.(type) {
|
||||
case *BinaryExpr:
|
||||
if n.LHS.String() == "time" {
|
||||
return &BooleanLiteral{Val: true}
|
||||
}
|
||||
}
|
||||
return n
|
||||
})
|
||||
|
||||
cond, _ = n.(Expr)
|
||||
if cond == nil {
|
||||
return input
|
||||
} else if n, ok := cond.(*BooleanLiteral); ok && n.Val {
|
||||
return input
|
||||
}
|
||||
|
||||
return &floatFilterIterator{
|
||||
input: input,
|
||||
cond: cond,
|
||||
opt: opt,
|
||||
m: make(map[string]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *floatFilterIterator) Stats() IteratorStats { return itr.input.Stats() }
|
||||
func (itr *floatFilterIterator) Close() error { return itr.input.Close() }
|
||||
|
||||
func (itr *floatFilterIterator) Next() (*FloatPoint, error) {
|
||||
for {
|
||||
p, err := itr.input.Next()
|
||||
if err != nil || p == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, ref := range itr.opt.Aux {
|
||||
itr.m[ref.Val] = p.Aux[i]
|
||||
}
|
||||
for k, v := range p.Tags.KeyValues() {
|
||||
itr.m[k] = v
|
||||
}
|
||||
|
||||
if !EvalBool(itr.cond, itr.m) {
|
||||
continue
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
|
||||
// newFloatDedupeIterator returns a new instance of floatDedupeIterator.
|
||||
func newFloatDedupeIterator(input FloatIterator) *floatDedupeIterator {
|
||||
return &floatDedupeIterator{
|
||||
|
@ -4860,6 +4919,65 @@ type integerDedupeIterator struct {
|
|||
m map[string]struct{} // lookup of points already sent
|
||||
}
|
||||
|
||||
type integerFilterIterator struct {
|
||||
input IntegerIterator
|
||||
cond Expr
|
||||
opt IteratorOptions
|
||||
m map[string]interface{}
|
||||
}
|
||||
|
||||
func newIntegerFilterIterator(input IntegerIterator, cond Expr, opt IteratorOptions) IntegerIterator {
|
||||
// Strip out time conditions from the WHERE clause.
|
||||
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
|
||||
n := RewriteFunc(CloneExpr(cond), func(n Node) Node {
|
||||
switch n := n.(type) {
|
||||
case *BinaryExpr:
|
||||
if n.LHS.String() == "time" {
|
||||
return &BooleanLiteral{Val: true}
|
||||
}
|
||||
}
|
||||
return n
|
||||
})
|
||||
|
||||
cond, _ = n.(Expr)
|
||||
if cond == nil {
|
||||
return input
|
||||
} else if n, ok := cond.(*BooleanLiteral); ok && n.Val {
|
||||
return input
|
||||
}
|
||||
|
||||
return &integerFilterIterator{
|
||||
input: input,
|
||||
cond: cond,
|
||||
opt: opt,
|
||||
m: make(map[string]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *integerFilterIterator) Stats() IteratorStats { return itr.input.Stats() }
|
||||
func (itr *integerFilterIterator) Close() error { return itr.input.Close() }
|
||||
|
||||
func (itr *integerFilterIterator) Next() (*IntegerPoint, error) {
|
||||
for {
|
||||
p, err := itr.input.Next()
|
||||
if err != nil || p == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, ref := range itr.opt.Aux {
|
||||
itr.m[ref.Val] = p.Aux[i]
|
||||
}
|
||||
for k, v := range p.Tags.KeyValues() {
|
||||
itr.m[k] = v
|
||||
}
|
||||
|
||||
if !EvalBool(itr.cond, itr.m) {
|
||||
continue
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
|
||||
// newIntegerDedupeIterator returns a new instance of integerDedupeIterator.
|
||||
func newIntegerDedupeIterator(input IntegerIterator) *integerDedupeIterator {
|
||||
return &integerDedupeIterator{
|
||||
|
@ -7302,6 +7420,65 @@ type stringDedupeIterator struct {
|
|||
m map[string]struct{} // lookup of points already sent
|
||||
}
|
||||
|
||||
type stringFilterIterator struct {
|
||||
input StringIterator
|
||||
cond Expr
|
||||
opt IteratorOptions
|
||||
m map[string]interface{}
|
||||
}
|
||||
|
||||
func newStringFilterIterator(input StringIterator, cond Expr, opt IteratorOptions) StringIterator {
|
||||
// Strip out time conditions from the WHERE clause.
|
||||
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
|
||||
n := RewriteFunc(CloneExpr(cond), func(n Node) Node {
|
||||
switch n := n.(type) {
|
||||
case *BinaryExpr:
|
||||
if n.LHS.String() == "time" {
|
||||
return &BooleanLiteral{Val: true}
|
||||
}
|
||||
}
|
||||
return n
|
||||
})
|
||||
|
||||
cond, _ = n.(Expr)
|
||||
if cond == nil {
|
||||
return input
|
||||
} else if n, ok := cond.(*BooleanLiteral); ok && n.Val {
|
||||
return input
|
||||
}
|
||||
|
||||
return &stringFilterIterator{
|
||||
input: input,
|
||||
cond: cond,
|
||||
opt: opt,
|
||||
m: make(map[string]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *stringFilterIterator) Stats() IteratorStats { return itr.input.Stats() }
|
||||
func (itr *stringFilterIterator) Close() error { return itr.input.Close() }
|
||||
|
||||
func (itr *stringFilterIterator) Next() (*StringPoint, error) {
|
||||
for {
|
||||
p, err := itr.input.Next()
|
||||
if err != nil || p == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, ref := range itr.opt.Aux {
|
||||
itr.m[ref.Val] = p.Aux[i]
|
||||
}
|
||||
for k, v := range p.Tags.KeyValues() {
|
||||
itr.m[k] = v
|
||||
}
|
||||
|
||||
if !EvalBool(itr.cond, itr.m) {
|
||||
continue
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
|
||||
// newStringDedupeIterator returns a new instance of stringDedupeIterator.
|
||||
func newStringDedupeIterator(input StringIterator) *stringDedupeIterator {
|
||||
return &stringDedupeIterator{
|
||||
|
@ -9744,6 +9921,65 @@ type booleanDedupeIterator struct {
|
|||
m map[string]struct{} // lookup of points already sent
|
||||
}
|
||||
|
||||
type booleanFilterIterator struct {
|
||||
input BooleanIterator
|
||||
cond Expr
|
||||
opt IteratorOptions
|
||||
m map[string]interface{}
|
||||
}
|
||||
|
||||
func newBooleanFilterIterator(input BooleanIterator, cond Expr, opt IteratorOptions) BooleanIterator {
|
||||
// Strip out time conditions from the WHERE clause.
|
||||
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
|
||||
n := RewriteFunc(CloneExpr(cond), func(n Node) Node {
|
||||
switch n := n.(type) {
|
||||
case *BinaryExpr:
|
||||
if n.LHS.String() == "time" {
|
||||
return &BooleanLiteral{Val: true}
|
||||
}
|
||||
}
|
||||
return n
|
||||
})
|
||||
|
||||
cond, _ = n.(Expr)
|
||||
if cond == nil {
|
||||
return input
|
||||
} else if n, ok := cond.(*BooleanLiteral); ok && n.Val {
|
||||
return input
|
||||
}
|
||||
|
||||
return &booleanFilterIterator{
|
||||
input: input,
|
||||
cond: cond,
|
||||
opt: opt,
|
||||
m: make(map[string]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *booleanFilterIterator) Stats() IteratorStats { return itr.input.Stats() }
|
||||
func (itr *booleanFilterIterator) Close() error { return itr.input.Close() }
|
||||
|
||||
func (itr *booleanFilterIterator) Next() (*BooleanPoint, error) {
|
||||
for {
|
||||
p, err := itr.input.Next()
|
||||
if err != nil || p == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, ref := range itr.opt.Aux {
|
||||
itr.m[ref.Val] = p.Aux[i]
|
||||
}
|
||||
for k, v := range p.Tags.KeyValues() {
|
||||
itr.m[k] = v
|
||||
}
|
||||
|
||||
if !EvalBool(itr.cond, itr.m) {
|
||||
continue
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
|
||||
// newBooleanDedupeIterator returns a new instance of booleanDedupeIterator.
|
||||
func newBooleanDedupeIterator(input BooleanIterator) *booleanDedupeIterator {
|
||||
return &booleanDedupeIterator{
|
||||
|
|
|
@ -1404,6 +1404,65 @@ type {{$k.name}}DedupeIterator struct {
|
|||
m map[string]struct{} // lookup of points already sent
|
||||
}
|
||||
|
||||
type {{$k.name}}FilterIterator struct {
|
||||
input {{$k.Name}}Iterator
|
||||
cond Expr
|
||||
opt IteratorOptions
|
||||
m map[string]interface{}
|
||||
}
|
||||
|
||||
func new{{$k.Name}}FilterIterator(input {{$k.Name}}Iterator, cond Expr, opt IteratorOptions) {{$k.Name}}Iterator {
|
||||
// Strip out time conditions from the WHERE clause.
|
||||
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
|
||||
n := RewriteFunc(CloneExpr(cond), func(n Node) Node {
|
||||
switch n := n.(type) {
|
||||
case *BinaryExpr:
|
||||
if n.LHS.String() == "time" {
|
||||
return &BooleanLiteral{Val: true}
|
||||
}
|
||||
}
|
||||
return n
|
||||
})
|
||||
|
||||
cond, _ = n.(Expr)
|
||||
if cond == nil {
|
||||
return input
|
||||
} else if n, ok := cond.(*BooleanLiteral); ok && n.Val {
|
||||
return input
|
||||
}
|
||||
|
||||
return &{{$k.name}}FilterIterator{
|
||||
input: input,
|
||||
cond: cond,
|
||||
opt: opt,
|
||||
m: make(map[string]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *{{$k.name}}FilterIterator) Stats() IteratorStats { return itr.input.Stats() }
|
||||
func (itr *{{$k.name}}FilterIterator) Close() error { return itr.input.Close() }
|
||||
|
||||
func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) {
|
||||
for {
|
||||
p, err := itr.input.Next()
|
||||
if err != nil || p == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, ref := range itr.opt.Aux {
|
||||
itr.m[ref.Val] = p.Aux[i]
|
||||
}
|
||||
for k, v := range p.Tags.KeyValues() {
|
||||
itr.m[k] = v
|
||||
}
|
||||
|
||||
if !EvalBool(itr.cond, itr.m) {
|
||||
continue
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
|
||||
// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator.
|
||||
func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
|
||||
return &{{$k.name}}DedupeIterator{
|
||||
|
|
|
@ -281,6 +281,28 @@ func NewLimitIterator(input Iterator, opt IteratorOptions) Iterator {
|
|||
}
|
||||
}
|
||||
|
||||
// NewFilterIterator returns an iterator that filters the points based on the
|
||||
// condition. This iterator is not nearly as efficient as filtering points
|
||||
// within the query engine and is only used when filtering subqueries.
|
||||
func NewFilterIterator(input Iterator, cond Expr, opt IteratorOptions) Iterator {
|
||||
if input == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch input := input.(type) {
|
||||
case FloatIterator:
|
||||
return newFloatFilterIterator(input, cond, opt)
|
||||
case IntegerIterator:
|
||||
return newIntegerFilterIterator(input, cond, opt)
|
||||
case StringIterator:
|
||||
return newStringFilterIterator(input, cond, opt)
|
||||
case BooleanIterator:
|
||||
return newBooleanFilterIterator(input, cond, opt)
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported filter iterator type: %T", input))
|
||||
}
|
||||
}
|
||||
|
||||
// NewDedupeIterator returns an iterator that only outputs unique points.
|
||||
// This iterator maintains a serialized copy of each row so it is inefficient
|
||||
// to use on large datasets. It is intended for small datasets such as meta queries.
|
||||
|
|
|
@ -208,6 +208,10 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, sources Sources, opt I
|
|||
|
||||
// Construct the iterators for the subquery.
|
||||
input := NewIteratorMapper(itrs, indexes, opt)
|
||||
// If there is a condition, filter it now.
|
||||
if opt.Condition != nil {
|
||||
input = NewFilterIterator(input, opt.Condition, opt)
|
||||
}
|
||||
inputs = append(inputs, input)
|
||||
}
|
||||
}
|
||||
|
@ -480,7 +484,15 @@ func (b *exprIteratorBuilder) buildVarRefIterator(expr *VarRef) (Iterator, error
|
|||
}
|
||||
}
|
||||
}
|
||||
return buildExprIterator(e, b.ic, source.Statement.Sources, subOpt, false)
|
||||
itr, err := buildExprIterator(e, b.ic, source.Statement.Sources, subOpt, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if b.opt.Condition != nil {
|
||||
itr = NewFilterIterator(itr, b.opt.Condition, subOpt)
|
||||
}
|
||||
return itr, nil
|
||||
}
|
||||
|
||||
switch e := e.(type) {
|
||||
|
@ -534,6 +546,11 @@ func (b *exprIteratorBuilder) buildVarRefIterator(expr *VarRef) (Iterator, error
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Filter the iterator.
|
||||
if b.opt.Condition != nil {
|
||||
input = NewFilterIterator(input, b.opt.Condition, subOpt)
|
||||
}
|
||||
|
||||
// Create an auxiliary iterator.
|
||||
aitr := NewAuxIterator(input, subOpt)
|
||||
itr := aitr.Iterator(e.Val, e.Type)
|
||||
|
@ -574,7 +591,15 @@ func (b *exprIteratorBuilder) buildVarRefIterator(expr *VarRef) (Iterator, error
|
|||
// Check if this is a selector or not and
|
||||
// create the iterator directly.
|
||||
selector := len(info.calls) == 1 && IsSelector(e)
|
||||
return buildExprIterator(e, b.ic, source.Statement.Sources, subOpt, selector)
|
||||
itr, err := buildExprIterator(e, b.ic, source.Statement.Sources, subOpt, selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if b.opt.Condition != nil {
|
||||
itr = NewFilterIterator(itr, b.opt.Condition, subOpt)
|
||||
}
|
||||
return itr, nil
|
||||
case *BinaryExpr:
|
||||
// Retrieve the calls and references for this binary expression.
|
||||
// There should be no mixing of calls and refs.
|
||||
|
@ -785,6 +810,10 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
|
|||
return err
|
||||
}
|
||||
|
||||
if b.opt.Condition != nil {
|
||||
input = NewFilterIterator(input, b.opt.Condition, b.opt)
|
||||
}
|
||||
|
||||
// Wrap the result in a call iterator.
|
||||
i, err := NewCallIterator(input, b.opt)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue