From 22fc9f6a19da96806a20d019ff7dc3ca12eecd5e Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 4 Oct 2018 09:59:41 -0500 Subject: [PATCH] Strip tags from a subquery when the outer query does not group by that tag The following would, erroneously, not strip the tag from the inner query: SELECT value FROM (SELECT value FROM cpu GROUP BY host) The inner query is expected to group by the host tag, but the outer query should strip that tag away because the outer query does not group by it. This fixes things so that the result will have the tags stripped away when they are not requested in the grouping. --- CHANGELOG.md | 1 + query/iterator.gen.go | 215 ++++++++++++++++++++++++++++++++++ query/iterator.gen.go.tmpl | 43 +++++++ query/iterator.go | 23 ++++ query/iterator_mapper_test.go | 1 + query/subquery.go | 12 +- query/subquery_test.go | 31 +++++ 7 files changed, 324 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7194dbd997..32d6f9adb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ v1.7.0 [unreleased] - [#10005](https://github.com/influxdata/influxdb/pull/10005): Missing hardwareAddr in uuid v1 generation. - [#10246](https://github.com/influxdata/influxdb/pull/10246): Fix the inherited interval for derivative and others. - [#10333](https://github.com/influxdata/influxdb/pull/10333): Fix subquery functionality when a function references a tag from the subquery. +- [#10333](https://github.com/influxdata/influxdb/pull/10333): Strip tags from a subquery when the outer query does not group by that tag. 
v1.6.0 [2018-07-05] ------------------- diff --git a/query/iterator.gen.go b/query/iterator.gen.go index 2936b67efb..61dcabd481 100644 --- a/query/iterator.gen.go +++ b/query/iterator.gen.go @@ -2495,6 +2495,49 @@ func (itr *floatFilterIterator) Next() (*FloatPoint, error) { } } +type floatTagSubsetIterator struct { + input FloatIterator + point FloatPoint + lastTags Tags + dimensions []string +} + +func newFloatTagSubsetIterator(input FloatIterator, opt IteratorOptions) *floatTagSubsetIterator { + return &floatTagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *floatTagSubsetIterator) Next() (*FloatPoint, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *floatTagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *floatTagSubsetIterator) Close() error { + return itr.input.Close() +} + // newFloatDedupeIterator returns a new instance of floatDedupeIterator. 
func newFloatDedupeIterator(input FloatIterator) *floatDedupeIterator { return &floatDedupeIterator{ @@ -5051,6 +5094,49 @@ func (itr *integerFilterIterator) Next() (*IntegerPoint, error) { } } +type integerTagSubsetIterator struct { + input IntegerIterator + point IntegerPoint + lastTags Tags + dimensions []string +} + +func newIntegerTagSubsetIterator(input IntegerIterator, opt IteratorOptions) *integerTagSubsetIterator { + return &integerTagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *integerTagSubsetIterator) Next() (*IntegerPoint, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *integerTagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *integerTagSubsetIterator) Close() error { + return itr.input.Close() +} + // newIntegerDedupeIterator returns a new instance of integerDedupeIterator. 
func newIntegerDedupeIterator(input IntegerIterator) *integerDedupeIterator { return &integerDedupeIterator{ @@ -7607,6 +7693,49 @@ func (itr *unsignedFilterIterator) Next() (*UnsignedPoint, error) { } } +type unsignedTagSubsetIterator struct { + input UnsignedIterator + point UnsignedPoint + lastTags Tags + dimensions []string +} + +func newUnsignedTagSubsetIterator(input UnsignedIterator, opt IteratorOptions) *unsignedTagSubsetIterator { + return &unsignedTagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *unsignedTagSubsetIterator) Next() (*UnsignedPoint, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *unsignedTagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *unsignedTagSubsetIterator) Close() error { + return itr.input.Close() +} + // newUnsignedDedupeIterator returns a new instance of unsignedDedupeIterator. 
func newUnsignedDedupeIterator(input UnsignedIterator) *unsignedDedupeIterator { return &unsignedDedupeIterator{ @@ -10149,6 +10278,49 @@ func (itr *stringFilterIterator) Next() (*StringPoint, error) { } } +type stringTagSubsetIterator struct { + input StringIterator + point StringPoint + lastTags Tags + dimensions []string +} + +func newStringTagSubsetIterator(input StringIterator, opt IteratorOptions) *stringTagSubsetIterator { + return &stringTagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *stringTagSubsetIterator) Next() (*StringPoint, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *stringTagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *stringTagSubsetIterator) Close() error { + return itr.input.Close() +} + // newStringDedupeIterator returns a new instance of stringDedupeIterator. 
func newStringDedupeIterator(input StringIterator) *stringDedupeIterator { return &stringDedupeIterator{ @@ -12691,6 +12863,49 @@ func (itr *booleanFilterIterator) Next() (*BooleanPoint, error) { } } +type booleanTagSubsetIterator struct { + input BooleanIterator + point BooleanPoint + lastTags Tags + dimensions []string +} + +func newBooleanTagSubsetIterator(input BooleanIterator, opt IteratorOptions) *booleanTagSubsetIterator { + return &booleanTagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *booleanTagSubsetIterator) Next() (*BooleanPoint, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *booleanTagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *booleanTagSubsetIterator) Close() error { + return itr.input.Close() +} + // newBooleanDedupeIterator returns a new instance of booleanDedupeIterator. 
func newBooleanDedupeIterator(input BooleanIterator) *booleanDedupeIterator { return &booleanDedupeIterator{ diff --git a/query/iterator.gen.go.tmpl b/query/iterator.gen.go.tmpl index 8f6577667b..36ffedebdc 100644 --- a/query/iterator.gen.go.tmpl +++ b/query/iterator.gen.go.tmpl @@ -1400,6 +1400,49 @@ func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) { } } +type {{$k.name}}TagSubsetIterator struct { + input {{$k.Name}}Iterator + point {{$k.Name}}Point + lastTags Tags + dimensions []string +} + +func new{{$k.Name}}TagSubsetIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}TagSubsetIterator { + return &{{$k.name}}TagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *{{$k.name}}TagSubsetIterator) Next() (*{{$k.Name}}Point, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *{{$k.name}}TagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *{{$k.name}}TagSubsetIterator) Close() error { + return itr.input.Close() +} + // new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator. func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator { return &{{$k.name}}DedupeIterator{ diff --git a/query/iterator.go b/query/iterator.go index 708352c7ae..f45df32b01 100644 --- a/query/iterator.go +++ b/query/iterator.go @@ -305,6 +305,29 @@ func NewFilterIterator(input Iterator, cond influxql.Expr, opt IteratorOptions) } } +// NewTagSubsetIterator will strip each of the points to a subset of the tag key values +// for each point it processes. 
+func NewTagSubsetIterator(input Iterator, opt IteratorOptions) Iterator { + if input == nil { + return nil + } + + switch input := input.(type) { + case FloatIterator: + return newFloatTagSubsetIterator(input, opt) + case IntegerIterator: + return newIntegerTagSubsetIterator(input, opt) + case UnsignedIterator: + return newUnsignedTagSubsetIterator(input, opt) + case StringIterator: + return newStringTagSubsetIterator(input, opt) + case BooleanIterator: + return newBooleanTagSubsetIterator(input, opt) + default: + panic(fmt.Sprintf("unsupported tag subset iterator type: %T", input)) + } +} + // NewDedupeIterator returns an iterator that only outputs unique points. // This iterator maintains a serialized copy of each row so it is inefficient // to use on large datasets. It is intended for small datasets such as meta queries. diff --git a/query/iterator_mapper_test.go b/query/iterator_mapper_test.go index 79ad4fe2fe..3a06a83d23 100644 --- a/query/iterator_mapper_test.go +++ b/query/iterator_mapper_test.go @@ -54,6 +54,7 @@ func TestIteratorMapper(t *testing.T) { {Val: "val1", Type: influxql.Float}, {Val: "val2", Type: influxql.String}, }, + Dimensions: []string{"host"}, } itr := query.NewIteratorMapper(cur, nil, []query.IteratorMap{ query.FieldMap{Index: 0}, diff --git a/query/subquery.go b/query/subquery.go index d5d854e114..2bb0b25a70 100644 --- a/query/subquery.go +++ b/query/subquery.go @@ -32,7 +32,11 @@ func (b *subqueryBuilder) buildAuxIterator(ctx context.Context, opt IteratorOpti } // Construct the iterators for the subquery. 
- return NewIteratorMapper(cur, nil, indexes, subOpt), nil + itr := NewIteratorMapper(cur, nil, indexes, subOpt) + if len(opt.GetDimensions()) != len(subOpt.GetDimensions()) { + itr = NewTagSubsetIterator(itr, opt) + } + return itr, nil } func (b *subqueryBuilder) mapAuxFields(auxFields []influxql.VarRef) []IteratorMap { @@ -114,5 +118,9 @@ func (b *subqueryBuilder) buildVarRefIterator(ctx context.Context, expr *influxq } // Construct the iterators for the subquery. - return NewIteratorMapper(cur, driver, indexes, subOpt), nil + itr := NewIteratorMapper(cur, driver, indexes, subOpt) + if len(opt.GetDimensions()) != len(subOpt.GetDimensions()) { + itr = NewTagSubsetIterator(itr, opt) + } + return itr, nil } diff --git a/query/subquery_test.go b/query/subquery_test.go index 6e3f0ec6a5..0283abed95 100644 --- a/query/subquery_test.go +++ b/query/subquery_test.go @@ -193,6 +193,37 @@ func TestSubquery(t *testing.T) { {Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{int64(3)}}, }, }, + { + Name: "StripTags", + Statement: `SELECT max FROM (SELECT max(value) FROM cpu GROUP BY host) WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:15Z'`, + Fields: map[string]influxql.DataType{"value": influxql.Float}, + MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn { + if got, want := tr.MinTimeNano(), 0*Second; got != want { + t.Errorf("unexpected min time: got=%d want=%d", got, want) + } + if got, want := tr.MaxTimeNano(), 15*Second-1; got != want { + t.Errorf("unexpected max time: got=%d want=%d", got, want) + } + return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator { + if got, want := m.Name, "cpu"; got != want { + t.Errorf("unexpected source: got=%s want=%s", got, want) + } + if got, want := opt.Expr.String(), "max(value::float)"; got != want { + t.Errorf("unexpected expression: got=%s want=%s", got, want) + } + return &FloatIterator{Points: []query.FloatPoint{ + {Name: 
"cpu", Tags: ParseTags("host=server01"), Value: 5}, + {Name: "cpu", Tags: ParseTags("host=server02"), Value: 3}, + {Name: "cpu", Tags: ParseTags("host=server03"), Value: 8}, + }} + } + }, + Rows: []query.Row{ + {Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{5.0}}, + {Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{3.0}}, + {Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{8.0}}, + }, + }, } { t.Run(test.Name, func(t *testing.T) { shardMapper := ShardMapper{