Merge pull request #14357 from influxdata/fix/reduce-sort-order

Subquery ordering with aggregates in descending mode was wrong
pull/14421/head
Jonathan A. Sternberg 2019-07-19 16:24:28 -05:00 committed by GitHub
commit 171ea40c08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 624 additions and 288 deletions

File diff suppressed because it is too large Load Diff

View File

@ -450,7 +450,7 @@ func (h *{{$k.name}}SortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() > yTags.ID()
}
if x.Time != y.Time{
return x.Time > y.Time
}
@ -1118,28 +1118,28 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, e
rp.Aggregator.Aggregate{{$k.Name}}(curr)
}
// Reverse sort points by name & tag if our output is supposed to be ordered.
// This is because the final array needs to have the first
// point at the end since they are popped off of the end.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
if len(keys) > 1 && itr.opt.Ordered {
sort.Sort(reverseStringSlice(keys))
// Reverse sort points by name & tag.
// This ensures a consistent order of output.
if len(keys) > 0 {
var sorted sort.Interface = sort.StringSlice(keys)
if itr.opt.Ascending {
sorted = sort.Reverse(sorted)
}
sort.Sort(sorted)
}
// Assume the points are already sorted until proven otherwise.
sortedByTime := true
// Emit the points for each name & tag combination.
a := make([]{{$v.Name}}Point, 0, len(m))
for _, k := range keys {
rp := m[k]
points := rp.Emitter.Emit()
// If the points are meant to be ordered,
// ensure that is done here. The tags are already
// sorted and should remain sorted.
if len(points) > 1 && itr.opt.Ordered {
sort.Stable({{$v.name}}PointsByTime(points))
}
for i := len(points)-1; i >= 0; i-- {
points[i].Name = rp.Name
if !itr.keepTags {
@ -1148,10 +1148,20 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, e
// Set the points time to the interval time if the reducer didn't provide one.
if points[i].Time == ZeroTime {
points[i].Time = startTime
} else {
sortedByTime = false
}
a = append(a, points[i])
}
}
// Points may be out of order. Perform a stable sort by time if requested.
if !sortedByTime && itr.opt.Ordered {
var sorted sort.Interface = {{$v.name}}PointsByTime(a)
if itr.opt.Ascending {
sorted = sort.Reverse(sorted)
}
sort.Stable(sorted)
}
return a, nil
}

View File

@ -587,6 +587,7 @@ func (b *exprIteratorBuilder) callIterator(ctx context.Context, expr *influxql.C
// Identify the name of the field we are using.
arg0 := expr.Args[0].(*influxql.VarRef)
opt.Ordered = false
input, err := buildExprIterator(ctx, arg0, b.ic, []influxql.Source{source}, opt, b.selector, false)
if err != nil {
return err
@ -801,7 +802,7 @@ func buildAuxIterator(ctx context.Context, ic IteratorCreator, sources influxql.
return nil, err
}
// Merge iterators to read auxiliary fields.
// Merge iterators to read auxiliary fields.
input, err := Iterators(inputs).Merge(opt)
if err != nil {
Iterators(inputs).Close()

View File

@ -273,6 +273,40 @@ func TestSubquery(t *testing.T) {
{Time: 30 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{float64(8)}},
},
},
{
Name: "TimeOrderingInTheOuterQuery",
Statement: `select * from (select last(value) from cpu group by host) order by time asc`,
Fields: map[string]influxql.DataType{"value": influxql.Float},
MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn {
return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator {
if got, want := m.Name, "cpu"; got != want {
t.Errorf("unexpected source: got=%s want=%s", got, want)
}
var itr query.Iterator = &FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 2},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 7},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Value: 3},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 8},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: 3},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 19 * Second, Value: 7},
}}
if _, ok := opt.Expr.(*influxql.Call); ok {
i, err := query.NewCallIterator(itr, opt)
if err != nil {
panic(err)
}
itr = i
}
return itr
}
},
Rows: []query.Row{
{Time: 19 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{"B", float64(7)}},
{Time: 20 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{"A", float64(3)}},
},
},
{
Name: "TimeZone",
Statement: `SELECT * FROM (SELECT * FROM cpu WHERE time >= '2019-04-17 09:00:00' and time < '2019-04-17 10:00:00' TZ('America/Chicago'))`,
@ -289,6 +323,47 @@ func TestSubquery(t *testing.T) {
}
},
},
{
Name: "DifferentDimensionsOrderByDesc",
Statement: `SELECT value, mytag FROM (SELECT last(value) AS value FROM testing GROUP BY mytag) ORDER BY desc`,
Fields: map[string]influxql.DataType{"value": influxql.Float},
MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn {
return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator {
if got, want := m.Name, "testing"; got != want {
t.Errorf("unexpected source: got=%s want=%s", got, want)
}
if opt.Ascending {
t.Error("expected iterator to be descending, not ascending")
}
var itr query.Iterator = &FloatIterator{Points: []query.FloatPoint{
{Name: "testing", Tags: ParseTags("mytag=c"), Time: mustParseTime("2019-06-25T22:36:20.93605779Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=c"), Time: mustParseTime("2019-06-25T22:36:20.671604877Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=c"), Time: mustParseTime("2019-06-25T22:36:20.255794481Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=b"), Time: mustParseTime("2019-06-25T22:36:18.176662543Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=b"), Time: mustParseTime("2019-06-25T22:36:17.815979113Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=b"), Time: mustParseTime("2019-06-25T22:36:17.265031598Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=a"), Time: mustParseTime("2019-06-25T22:36:15.144253616Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=a"), Time: mustParseTime("2019-06-25T22:36:14.719167205Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=a"), Time: mustParseTime("2019-06-25T22:36:13.711721316Z").UnixNano(), Value: 2},
}}
if _, ok := opt.Expr.(*influxql.Call); ok {
i, err := query.NewCallIterator(itr, opt)
if err != nil {
panic(err)
}
itr = i
}
return itr
}
},
Rows: []query.Row{
{Time: mustParseTime("2019-06-25T22:36:20.93605779Z").UnixNano(), Series: query.Series{Name: "testing"}, Values: []interface{}{float64(2), "c"}},
{Time: mustParseTime("2019-06-25T22:36:18.176662543Z").UnixNano(), Series: query.Series{Name: "testing"}, Values: []interface{}{float64(2), "b"}},
{Time: mustParseTime("2019-06-25T22:36:15.144253616Z").UnixNano(), Series: query.Series{Name: "testing"}, Values: []interface{}{float64(2), "a"}},
},
},
} {
t.Run(test.Name, func(t *testing.T) {
shardMapper := ShardMapper{