Strip tags from a subquery when the outer query does not group by that tag

The following would, erroneously, not strip the tag from the inner
query:

    SELECT value FROM (SELECT value FROM cpu GROUP BY host)

The inner query groups by the host tag, but the outer query does not,
so the outer query should strip that tag from its results. This change
ensures that any tag not requested by the outer query's grouping is
stripped from the result.
pull/10333/head
Jonathan A. Sternberg 2018-10-04 09:59:41 -05:00
parent 634471f12e
commit 22fc9f6a19
No known key found for this signature in database
GPG Key ID: 4A0C1200CB8B9D2E
7 changed files with 324 additions and 2 deletions

View File

@ -15,6 +15,7 @@ v1.7.0 [unreleased]
- [#10005](https://github.com/influxdata/influxdb/pull/10005): Missing hardwareAddr in uuid v1 generation.
- [#10246](https://github.com/influxdata/influxdb/pull/10246): Fix the inherited interval for derivative and others.
- [#10333](https://github.com/influxdata/influxdb/pull/10333): Fix subquery functionality when a function references a tag from the subquery.
- [#10333](https://github.com/influxdata/influxdb/pull/10333): Strip tags from a subquery when the outer query does not group by that tag.
v1.6.0 [2018-07-05]
-------------------

View File

@ -2495,6 +2495,49 @@ func (itr *floatFilterIterator) Next() (*FloatPoint, error) {
}
}
// floatTagSubsetIterator wraps a FloatIterator and replaces the tags on each
// point it yields with the result of Tags.Subset over a fixed key list.
type floatTagSubsetIterator struct {
	input      FloatIterator
	point      FloatPoint // output buffer; overwritten and returned by every Next call
	lastTags   Tags       // unmodified tags of the last input point whose subset was computed
	dimensions []string   // tag keys handed to Tags.Subset
}

// newFloatTagSubsetIterator returns an iterator over input whose tag subset
// keys are taken from opt.GetDimensions().
func newFloatTagSubsetIterator(input FloatIterator, opt IteratorOptions) *floatTagSubsetIterator {
	itr := new(floatTagSubsetIterator)
	itr.input = input
	itr.dimensions = opt.GetDimensions()
	return itr
}

// Next reads one point from the input and returns a copy of it carrying the
// subset tags. It returns (nil, err) when the input fails and (nil, nil) when
// the input yields no point. The returned pointer refers to the iterator's
// internal buffer, so its contents change on the following call.
func (itr *floatTagSubsetIterator) Next() (*FloatPoint, error) {
	src, err := itr.input.Next()
	if err != nil || src == nil {
		// err is nil whenever only the "no point" condition holds.
		return nil, err
	}

	out := &itr.point

	// Only recompute the subset when the incoming tags differ from the ones
	// seen last; otherwise the previously computed subset is still in out.
	if tags := src.Tags; !tags.Equal(itr.lastTags) {
		itr.lastTags = tags
		out.Tags = tags.Subset(itr.dimensions)
	}

	out.Name, out.Time = src.Name, src.Time
	out.Value, out.Aux = src.Value, src.Aux
	out.Aggregated, out.Nil = src.Aggregated, src.Nil
	return out, nil
}

// Stats returns the statistics reported by the wrapped input iterator.
func (itr *floatTagSubsetIterator) Stats() IteratorStats {
	stats := itr.input.Stats()
	return stats
}

// Close closes the wrapped input iterator and returns its error.
func (itr *floatTagSubsetIterator) Close() error {
	err := itr.input.Close()
	return err
}
// newFloatDedupeIterator returns a new instance of floatDedupeIterator.
func newFloatDedupeIterator(input FloatIterator) *floatDedupeIterator {
return &floatDedupeIterator{
@ -5051,6 +5094,49 @@ func (itr *integerFilterIterator) Next() (*IntegerPoint, error) {
}
}
// integerTagSubsetIterator wraps an IntegerIterator and replaces the tags on
// each point it yields with the result of Tags.Subset over a fixed key list.
type integerTagSubsetIterator struct {
	input      IntegerIterator
	point      IntegerPoint // output buffer; overwritten and returned by every Next call
	lastTags   Tags         // unmodified tags of the last input point whose subset was computed
	dimensions []string     // tag keys handed to Tags.Subset
}

// newIntegerTagSubsetIterator returns an iterator over input whose tag subset
// keys are taken from opt.GetDimensions().
func newIntegerTagSubsetIterator(input IntegerIterator, opt IteratorOptions) *integerTagSubsetIterator {
	itr := new(integerTagSubsetIterator)
	itr.input = input
	itr.dimensions = opt.GetDimensions()
	return itr
}

// Next reads one point from the input and returns a copy of it carrying the
// subset tags. It returns (nil, err) when the input fails and (nil, nil) when
// the input yields no point. The returned pointer refers to the iterator's
// internal buffer, so its contents change on the following call.
func (itr *integerTagSubsetIterator) Next() (*IntegerPoint, error) {
	src, err := itr.input.Next()
	if err != nil || src == nil {
		// err is nil whenever only the "no point" condition holds.
		return nil, err
	}

	out := &itr.point

	// Only recompute the subset when the incoming tags differ from the ones
	// seen last; otherwise the previously computed subset is still in out.
	if tags := src.Tags; !tags.Equal(itr.lastTags) {
		itr.lastTags = tags
		out.Tags = tags.Subset(itr.dimensions)
	}

	out.Name, out.Time = src.Name, src.Time
	out.Value, out.Aux = src.Value, src.Aux
	out.Aggregated, out.Nil = src.Aggregated, src.Nil
	return out, nil
}

// Stats returns the statistics reported by the wrapped input iterator.
func (itr *integerTagSubsetIterator) Stats() IteratorStats {
	stats := itr.input.Stats()
	return stats
}

// Close closes the wrapped input iterator and returns its error.
func (itr *integerTagSubsetIterator) Close() error {
	err := itr.input.Close()
	return err
}
// newIntegerDedupeIterator returns a new instance of integerDedupeIterator.
func newIntegerDedupeIterator(input IntegerIterator) *integerDedupeIterator {
return &integerDedupeIterator{
@ -7607,6 +7693,49 @@ func (itr *unsignedFilterIterator) Next() (*UnsignedPoint, error) {
}
}
// unsignedTagSubsetIterator wraps an UnsignedIterator and replaces the tags on
// each point it yields with the result of Tags.Subset over a fixed key list.
type unsignedTagSubsetIterator struct {
	input      UnsignedIterator
	point      UnsignedPoint // output buffer; overwritten and returned by every Next call
	lastTags   Tags          // unmodified tags of the last input point whose subset was computed
	dimensions []string      // tag keys handed to Tags.Subset
}

// newUnsignedTagSubsetIterator returns an iterator over input whose tag subset
// keys are taken from opt.GetDimensions().
func newUnsignedTagSubsetIterator(input UnsignedIterator, opt IteratorOptions) *unsignedTagSubsetIterator {
	itr := new(unsignedTagSubsetIterator)
	itr.input = input
	itr.dimensions = opt.GetDimensions()
	return itr
}

// Next reads one point from the input and returns a copy of it carrying the
// subset tags. It returns (nil, err) when the input fails and (nil, nil) when
// the input yields no point. The returned pointer refers to the iterator's
// internal buffer, so its contents change on the following call.
func (itr *unsignedTagSubsetIterator) Next() (*UnsignedPoint, error) {
	src, err := itr.input.Next()
	if err != nil || src == nil {
		// err is nil whenever only the "no point" condition holds.
		return nil, err
	}

	out := &itr.point

	// Only recompute the subset when the incoming tags differ from the ones
	// seen last; otherwise the previously computed subset is still in out.
	if tags := src.Tags; !tags.Equal(itr.lastTags) {
		itr.lastTags = tags
		out.Tags = tags.Subset(itr.dimensions)
	}

	out.Name, out.Time = src.Name, src.Time
	out.Value, out.Aux = src.Value, src.Aux
	out.Aggregated, out.Nil = src.Aggregated, src.Nil
	return out, nil
}

// Stats returns the statistics reported by the wrapped input iterator.
func (itr *unsignedTagSubsetIterator) Stats() IteratorStats {
	stats := itr.input.Stats()
	return stats
}

// Close closes the wrapped input iterator and returns its error.
func (itr *unsignedTagSubsetIterator) Close() error {
	err := itr.input.Close()
	return err
}
// newUnsignedDedupeIterator returns a new instance of unsignedDedupeIterator.
func newUnsignedDedupeIterator(input UnsignedIterator) *unsignedDedupeIterator {
return &unsignedDedupeIterator{
@ -10149,6 +10278,49 @@ func (itr *stringFilterIterator) Next() (*StringPoint, error) {
}
}
// stringTagSubsetIterator wraps a StringIterator and replaces the tags on each
// point it yields with the result of Tags.Subset over a fixed key list.
type stringTagSubsetIterator struct {
	input      StringIterator
	point      StringPoint // output buffer; overwritten and returned by every Next call
	lastTags   Tags        // unmodified tags of the last input point whose subset was computed
	dimensions []string    // tag keys handed to Tags.Subset
}

// newStringTagSubsetIterator returns an iterator over input whose tag subset
// keys are taken from opt.GetDimensions().
func newStringTagSubsetIterator(input StringIterator, opt IteratorOptions) *stringTagSubsetIterator {
	itr := new(stringTagSubsetIterator)
	itr.input = input
	itr.dimensions = opt.GetDimensions()
	return itr
}

// Next reads one point from the input and returns a copy of it carrying the
// subset tags. It returns (nil, err) when the input fails and (nil, nil) when
// the input yields no point. The returned pointer refers to the iterator's
// internal buffer, so its contents change on the following call.
func (itr *stringTagSubsetIterator) Next() (*StringPoint, error) {
	src, err := itr.input.Next()
	if err != nil || src == nil {
		// err is nil whenever only the "no point" condition holds.
		return nil, err
	}

	out := &itr.point

	// Only recompute the subset when the incoming tags differ from the ones
	// seen last; otherwise the previously computed subset is still in out.
	if tags := src.Tags; !tags.Equal(itr.lastTags) {
		itr.lastTags = tags
		out.Tags = tags.Subset(itr.dimensions)
	}

	out.Name, out.Time = src.Name, src.Time
	out.Value, out.Aux = src.Value, src.Aux
	out.Aggregated, out.Nil = src.Aggregated, src.Nil
	return out, nil
}

// Stats returns the statistics reported by the wrapped input iterator.
func (itr *stringTagSubsetIterator) Stats() IteratorStats {
	stats := itr.input.Stats()
	return stats
}

// Close closes the wrapped input iterator and returns its error.
func (itr *stringTagSubsetIterator) Close() error {
	err := itr.input.Close()
	return err
}
// newStringDedupeIterator returns a new instance of stringDedupeIterator.
func newStringDedupeIterator(input StringIterator) *stringDedupeIterator {
return &stringDedupeIterator{
@ -12691,6 +12863,49 @@ func (itr *booleanFilterIterator) Next() (*BooleanPoint, error) {
}
}
// booleanTagSubsetIterator wraps a BooleanIterator and replaces the tags on
// each point it yields with the result of Tags.Subset over a fixed key list.
type booleanTagSubsetIterator struct {
	input      BooleanIterator
	point      BooleanPoint // output buffer; overwritten and returned by every Next call
	lastTags   Tags         // unmodified tags of the last input point whose subset was computed
	dimensions []string     // tag keys handed to Tags.Subset
}

// newBooleanTagSubsetIterator returns an iterator over input whose tag subset
// keys are taken from opt.GetDimensions().
func newBooleanTagSubsetIterator(input BooleanIterator, opt IteratorOptions) *booleanTagSubsetIterator {
	itr := new(booleanTagSubsetIterator)
	itr.input = input
	itr.dimensions = opt.GetDimensions()
	return itr
}

// Next reads one point from the input and returns a copy of it carrying the
// subset tags. It returns (nil, err) when the input fails and (nil, nil) when
// the input yields no point. The returned pointer refers to the iterator's
// internal buffer, so its contents change on the following call.
func (itr *booleanTagSubsetIterator) Next() (*BooleanPoint, error) {
	src, err := itr.input.Next()
	if err != nil || src == nil {
		// err is nil whenever only the "no point" condition holds.
		return nil, err
	}

	out := &itr.point

	// Only recompute the subset when the incoming tags differ from the ones
	// seen last; otherwise the previously computed subset is still in out.
	if tags := src.Tags; !tags.Equal(itr.lastTags) {
		itr.lastTags = tags
		out.Tags = tags.Subset(itr.dimensions)
	}

	out.Name, out.Time = src.Name, src.Time
	out.Value, out.Aux = src.Value, src.Aux
	out.Aggregated, out.Nil = src.Aggregated, src.Nil
	return out, nil
}

// Stats returns the statistics reported by the wrapped input iterator.
func (itr *booleanTagSubsetIterator) Stats() IteratorStats {
	stats := itr.input.Stats()
	return stats
}

// Close closes the wrapped input iterator and returns its error.
func (itr *booleanTagSubsetIterator) Close() error {
	err := itr.input.Close()
	return err
}
// newBooleanDedupeIterator returns a new instance of booleanDedupeIterator.
func newBooleanDedupeIterator(input BooleanIterator) *booleanDedupeIterator {
return &booleanDedupeIterator{

View File

@ -1400,6 +1400,49 @@ func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) {
}
}
// {{$k.name}}TagSubsetIterator wraps a {{$k.Name}}Iterator and replaces the tags on
// each point it yields with the result of Tags.Subset over a fixed key list.
type {{$k.name}}TagSubsetIterator struct {
input {{$k.Name}}Iterator
// point is the output buffer; Next overwrites it and returns its address.
point {{$k.Name}}Point
// lastTags holds the unmodified tags of the last input point whose subset was computed.
lastTags Tags
// dimensions are the tag keys handed to Tags.Subset.
dimensions []string
}
// new{{$k.Name}}TagSubsetIterator returns an iterator over input whose tag subset
// keys are taken from opt.GetDimensions().
func new{{$k.Name}}TagSubsetIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}TagSubsetIterator {
return &{{$k.name}}TagSubsetIterator{
input: input,
dimensions: opt.GetDimensions(),
}
}
// Next reads one point from the input and returns a copy of it carrying the
// subset tags. It returns (nil, err) when the input fails and (nil, nil) when
// the input yields no point. The returned pointer refers to the iterator's
// internal buffer, so its contents change on the following call.
func (itr *{{$k.name}}TagSubsetIterator) Next() (*{{$k.Name}}Point, error) {
p, err := itr.input.Next()
if err != nil {
return nil, err
} else if p == nil {
return nil, nil
}
itr.point.Name = p.Name
// Only recompute the subset when the incoming tags differ from the ones
// seen last; otherwise itr.point.Tags still holds the previous subset.
if !p.Tags.Equal(itr.lastTags) {
itr.point.Tags = p.Tags.Subset(itr.dimensions)
itr.lastTags = p.Tags
}
itr.point.Time = p.Time
itr.point.Value = p.Value
itr.point.Aux = p.Aux
itr.point.Aggregated = p.Aggregated
itr.point.Nil = p.Nil
return &itr.point, nil
}
// Stats returns the statistics reported by the wrapped input iterator.
func (itr *{{$k.name}}TagSubsetIterator) Stats() IteratorStats {
return itr.input.Stats()
}
// Close closes the wrapped input iterator and returns its error.
func (itr *{{$k.name}}TagSubsetIterator) Close() error {
return itr.input.Close()
}
// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator.
func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
return &{{$k.name}}DedupeIterator{

View File

@ -305,6 +305,29 @@ func NewFilterIterator(input Iterator, cond influxql.Expr, opt IteratorOptions)
}
}
// NewTagSubsetIterator wraps input in the tag subset iterator that matches its
// concrete point type (float, integer, unsigned, string or boolean), passing
// opt through to the typed constructor. A nil input yields a nil Iterator.
// It panics if input implements none of the supported typed interfaces.
func NewTagSubsetIterator(input Iterator, opt IteratorOptions) Iterator {
	switch typed := input.(type) {
	case nil:
		// Nothing to wrap.
		return nil
	case FloatIterator:
		return newFloatTagSubsetIterator(typed, opt)
	case IntegerIterator:
		return newIntegerTagSubsetIterator(typed, opt)
	case UnsignedIterator:
		return newUnsignedTagSubsetIterator(typed, opt)
	case StringIterator:
		return newStringTagSubsetIterator(typed, opt)
	case BooleanIterator:
		return newBooleanTagSubsetIterator(typed, opt)
	default:
		panic(fmt.Sprintf("unsupported tag subset iterator type: %T", typed))
	}
}
// NewDedupeIterator returns an iterator that only outputs unique points.
// This iterator maintains a serialized copy of each row so it is inefficient
// to use on large datasets. It is intended for small datasets such as meta queries.

View File

@ -54,6 +54,7 @@ func TestIteratorMapper(t *testing.T) {
{Val: "val1", Type: influxql.Float},
{Val: "val2", Type: influxql.String},
},
Dimensions: []string{"host"},
}
itr := query.NewIteratorMapper(cur, nil, []query.IteratorMap{
query.FieldMap{Index: 0},

View File

@ -32,7 +32,11 @@ func (b *subqueryBuilder) buildAuxIterator(ctx context.Context, opt IteratorOpti
}
// Construct the iterators for the subquery.
return NewIteratorMapper(cur, nil, indexes, subOpt), nil
itr := NewIteratorMapper(cur, nil, indexes, subOpt)
if len(opt.GetDimensions()) != len(subOpt.GetDimensions()) {
itr = NewTagSubsetIterator(itr, opt)
}
return itr, nil
}
func (b *subqueryBuilder) mapAuxFields(auxFields []influxql.VarRef) []IteratorMap {
@ -114,5 +118,9 @@ func (b *subqueryBuilder) buildVarRefIterator(ctx context.Context, expr *influxq
}
// Construct the iterators for the subquery.
return NewIteratorMapper(cur, driver, indexes, subOpt), nil
itr := NewIteratorMapper(cur, driver, indexes, subOpt)
if len(opt.GetDimensions()) != len(subOpt.GetDimensions()) {
itr = NewTagSubsetIterator(itr, opt)
}
return itr, nil
}

View File

@ -193,6 +193,37 @@ func TestSubquery(t *testing.T) {
{Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{int64(3)}},
},
},
{
Name: "StripTags",
Statement: `SELECT max FROM (SELECT max(value) FROM cpu GROUP BY host) WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:15Z'`,
Fields: map[string]influxql.DataType{"value": influxql.Float},
MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn {
if got, want := tr.MinTimeNano(), 0*Second; got != want {
t.Errorf("unexpected min time: got=%d want=%d", got, want)
}
if got, want := tr.MaxTimeNano(), 15*Second-1; got != want {
t.Errorf("unexpected max time: got=%d want=%d", got, want)
}
return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator {
if got, want := m.Name, "cpu"; got != want {
t.Errorf("unexpected source: got=%s want=%s", got, want)
}
if got, want := opt.Expr.String(), "max(value::float)"; got != want {
t.Errorf("unexpected expression: got=%s want=%s", got, want)
}
return &FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=server01"), Value: 5},
{Name: "cpu", Tags: ParseTags("host=server02"), Value: 3},
{Name: "cpu", Tags: ParseTags("host=server03"), Value: 8},
}}
}
},
Rows: []query.Row{
{Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{5.0}},
{Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{3.0}},
{Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{8.0}},
},
},
} {
t.Run(test.Name, func(t *testing.T) {
shardMapper := ShardMapper{