diff --git a/CHANGELOG.md b/CHANGELOG.md index 7194dbd997..32d6f9adb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ v1.7.0 [unreleased] - [#10005](https://github.com/influxdata/influxdb/pull/10005): Missing hardwareAddr in uuid v1 generation. - [#10246](https://github.com/influxdata/influxdb/pull/10246): Fix the inherited interval for derivative and others. - [#10333](https://github.com/influxdata/influxdb/pull/10333): Fix subquery functionality when a function references a tag from the subquery. +- [#10333](https://github.com/influxdata/influxdb/pull/10333): Strip tags from a subquery when the outer query does not group by that tag. v1.6.0 [2018-07-05] ------------------- diff --git a/query/iterator.gen.go b/query/iterator.gen.go index 2936b67efb..61dcabd481 100644 --- a/query/iterator.gen.go +++ b/query/iterator.gen.go @@ -2495,6 +2495,49 @@ func (itr *floatFilterIterator) Next() (*FloatPoint, error) { } } +type floatTagSubsetIterator struct { + input FloatIterator + point FloatPoint + lastTags Tags + dimensions []string +} + +func newFloatTagSubsetIterator(input FloatIterator, opt IteratorOptions) *floatTagSubsetIterator { + return &floatTagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *floatTagSubsetIterator) Next() (*FloatPoint, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *floatTagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *floatTagSubsetIterator) Close() error { + return itr.input.Close() +} + // newFloatDedupeIterator returns a new instance of floatDedupeIterator. func newFloatDedupeIterator(input FloatIterator) *floatDedupeIterator { return &floatDedupeIterator{ @@ -5051,6 +5094,49 @@ func (itr *integerFilterIterator) Next() (*IntegerPoint, error) { } } +type integerTagSubsetIterator struct { + input IntegerIterator + point IntegerPoint + lastTags Tags + dimensions []string +} + +func newIntegerTagSubsetIterator(input IntegerIterator, opt IteratorOptions) *integerTagSubsetIterator { + return &integerTagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *integerTagSubsetIterator) Next() (*IntegerPoint, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *integerTagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *integerTagSubsetIterator) Close() error { + return itr.input.Close() +} + // newIntegerDedupeIterator returns a new instance of integerDedupeIterator. func newIntegerDedupeIterator(input IntegerIterator) *integerDedupeIterator { return &integerDedupeIterator{ @@ -7607,6 +7693,49 @@ func (itr *unsignedFilterIterator) Next() (*UnsignedPoint, error) { } } +type unsignedTagSubsetIterator struct { + input UnsignedIterator + point UnsignedPoint + lastTags Tags + dimensions []string +} + +func newUnsignedTagSubsetIterator(input UnsignedIterator, opt IteratorOptions) *unsignedTagSubsetIterator { + return &unsignedTagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *unsignedTagSubsetIterator) Next() (*UnsignedPoint, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *unsignedTagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *unsignedTagSubsetIterator) Close() error { + return itr.input.Close() +} + // newUnsignedDedupeIterator returns a new instance of unsignedDedupeIterator. func newUnsignedDedupeIterator(input UnsignedIterator) *unsignedDedupeIterator { return &unsignedDedupeIterator{ @@ -10149,6 +10278,49 @@ func (itr *stringFilterIterator) Next() (*StringPoint, error) { } } +type stringTagSubsetIterator struct { + input StringIterator + point StringPoint + lastTags Tags + dimensions []string +} + +func newStringTagSubsetIterator(input StringIterator, opt IteratorOptions) *stringTagSubsetIterator { + return &stringTagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *stringTagSubsetIterator) Next() (*StringPoint, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *stringTagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *stringTagSubsetIterator) Close() error { + return itr.input.Close() +} + // newStringDedupeIterator returns a new instance of stringDedupeIterator. func newStringDedupeIterator(input StringIterator) *stringDedupeIterator { return &stringDedupeIterator{ @@ -12691,6 +12863,49 @@ func (itr *booleanFilterIterator) Next() (*BooleanPoint, error) { } } +type booleanTagSubsetIterator struct { + input BooleanIterator + point BooleanPoint + lastTags Tags + dimensions []string +} + +func newBooleanTagSubsetIterator(input BooleanIterator, opt IteratorOptions) *booleanTagSubsetIterator { + return &booleanTagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *booleanTagSubsetIterator) Next() (*BooleanPoint, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *booleanTagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *booleanTagSubsetIterator) Close() error { + return itr.input.Close() +} + // newBooleanDedupeIterator returns a new instance of booleanDedupeIterator. func newBooleanDedupeIterator(input BooleanIterator) *booleanDedupeIterator { return &booleanDedupeIterator{ diff --git a/query/iterator.gen.go.tmpl b/query/iterator.gen.go.tmpl index 8f6577667b..36ffedebdc 100644 --- a/query/iterator.gen.go.tmpl +++ b/query/iterator.gen.go.tmpl @@ -1400,6 +1400,49 @@ func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) { } } +type {{$k.name}}TagSubsetIterator struct { + input {{$k.Name}}Iterator + point {{$k.Name}}Point + lastTags Tags + dimensions []string +} + +func new{{$k.Name}}TagSubsetIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}TagSubsetIterator { + return &{{$k.name}}TagSubsetIterator{ + input: input, + dimensions: opt.GetDimensions(), + } +} + +func (itr *{{$k.name}}TagSubsetIterator) Next() (*{{$k.Name}}Point, error) { + p, err := itr.input.Next() + if err != nil { + return nil, err + } else if p == nil { + return nil, nil + } + + itr.point.Name = p.Name + if !p.Tags.Equal(itr.lastTags) { + itr.point.Tags = p.Tags.Subset(itr.dimensions) + itr.lastTags = p.Tags + } + itr.point.Time = p.Time + itr.point.Value = p.Value + itr.point.Aux = p.Aux + itr.point.Aggregated = p.Aggregated + itr.point.Nil = p.Nil + return &itr.point, nil +} + +func (itr *{{$k.name}}TagSubsetIterator) Stats() IteratorStats { + return itr.input.Stats() +} + +func (itr *{{$k.name}}TagSubsetIterator) Close() error { + return itr.input.Close() +} + // new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator. func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator { return &{{$k.name}}DedupeIterator{ diff --git a/query/iterator.go b/query/iterator.go index 708352c7ae..f45df32b01 100644 --- a/query/iterator.go +++ b/query/iterator.go @@ -305,6 +305,29 @@ func NewFilterIterator(input Iterator, cond influxql.Expr, opt IteratorOptions) } } +// NewTagSubsetIterator will strip each of the points to a subset of the tag key values +// for each point it processes. +func NewTagSubsetIterator(input Iterator, opt IteratorOptions) Iterator { + if input == nil { + return nil + } + + switch input := input.(type) { + case FloatIterator: + return newFloatTagSubsetIterator(input, opt) + case IntegerIterator: + return newIntegerTagSubsetIterator(input, opt) + case UnsignedIterator: + return newUnsignedTagSubsetIterator(input, opt) + case StringIterator: + return newStringTagSubsetIterator(input, opt) + case BooleanIterator: + return newBooleanTagSubsetIterator(input, opt) + default: + panic(fmt.Sprintf("unsupported tag subset iterator type: %T", input)) + } +} + // NewDedupeIterator returns an iterator that only outputs unique points. // This iterator maintains a serialized copy of each row so it is inefficient // to use on large datasets. It is intended for small datasets such as meta queries. diff --git a/query/iterator_mapper_test.go b/query/iterator_mapper_test.go index 79ad4fe2fe..3a06a83d23 100644 --- a/query/iterator_mapper_test.go +++ b/query/iterator_mapper_test.go @@ -54,6 +54,7 @@ func TestIteratorMapper(t *testing.T) { {Val: "val1", Type: influxql.Float}, {Val: "val2", Type: influxql.String}, }, + Dimensions: []string{"host"}, } itr := query.NewIteratorMapper(cur, nil, []query.IteratorMap{ query.FieldMap{Index: 0}, diff --git a/query/subquery.go b/query/subquery.go index d5d854e114..2bb0b25a70 100644 --- a/query/subquery.go +++ b/query/subquery.go @@ -32,7 +32,11 @@ func (b *subqueryBuilder) buildAuxIterator(ctx context.Context, opt IteratorOpti } // Construct the iterators for the subquery. - return NewIteratorMapper(cur, nil, indexes, subOpt), nil + itr := NewIteratorMapper(cur, nil, indexes, subOpt) + if len(opt.GetDimensions()) != len(subOpt.GetDimensions()) { + itr = NewTagSubsetIterator(itr, opt) + } + return itr, nil } func (b *subqueryBuilder) mapAuxFields(auxFields []influxql.VarRef) []IteratorMap { @@ -114,5 +118,9 @@ func (b *subqueryBuilder) buildVarRefIterator(ctx context.Context, expr *influxq } // Construct the iterators for the subquery. - return NewIteratorMapper(cur, driver, indexes, subOpt), nil + itr := NewIteratorMapper(cur, driver, indexes, subOpt) + if len(opt.GetDimensions()) != len(subOpt.GetDimensions()) { + itr = NewTagSubsetIterator(itr, opt) + } + return itr, nil } diff --git a/query/subquery_test.go b/query/subquery_test.go index 6e3f0ec6a5..0283abed95 100644 --- a/query/subquery_test.go +++ b/query/subquery_test.go @@ -193,6 +193,37 @@ func TestSubquery(t *testing.T) { {Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{int64(3)}}, }, }, + { + Name: "StripTags", + Statement: `SELECT max FROM (SELECT max(value) FROM cpu GROUP BY host) WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:15Z'`, + Fields: map[string]influxql.DataType{"value": influxql.Float}, + MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn { + if got, want := tr.MinTimeNano(), 0*Second; got != want { + t.Errorf("unexpected min time: got=%d want=%d", got, want) + } + if got, want := tr.MaxTimeNano(), 15*Second-1; got != want { + t.Errorf("unexpected max time: got=%d want=%d", got, want) + } + return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator { + if got, want := m.Name, "cpu"; got != want { + t.Errorf("unexpected source: got=%s want=%s", got, want) + } + if got, want := opt.Expr.String(), "max(value::float)"; got != want { + t.Errorf("unexpected expression: got=%s want=%s", got, want) + } + return &FloatIterator{Points: []query.FloatPoint{ + {Name: "cpu", Tags: ParseTags("host=server01"), Value: 5}, + {Name: "cpu", Tags: ParseTags("host=server02"), Value: 3}, + {Name: "cpu", Tags: ParseTags("host=server03"), Value: 8}, + }} + } + }, + Rows: []query.Row{ + {Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{5.0}}, + {Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{3.0}}, + {Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{8.0}}, + }, + }, } { t.Run(test.Name, func(t *testing.T) { shardMapper := ShardMapper{