Subquery ordering with aggregates in descending mode was wrong

The sort order of points when performing aggregates never took into
account whether the query was ascending or descending. When multiple
series were aggregated, the points were sorted into the correct order
for an ascending query, but that order was never reversed when
descending was used.

Additionally, the iterator template and the generated iterator file had
fallen out of sync: the template was not reverted correctly after a
previously incorrect change, so only the float type carried the correct
version and the tests only exercised that float version.
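
For illustration only (not part of this commit), the sketch below shows the
general idea of the fix: the comparison used to sort points by time is wrapped
in sort.Reverse depending on the query direction. The Point type and the
sortPoints helper are hypothetical stand-ins, not the engine's code; note that
in the reduce() template further down the condition is inverted (reverse when
ascending) because the engine consumes the resulting slice from the end.

package main

import (
	"fmt"
	"sort"
)

// Point is a hypothetical stand-in for the engine's point type.
type Point struct {
	Time  int64
	Value float64
}

// pointsByTime orders points by ascending time.
type pointsByTime []Point

func (a pointsByTime) Len() int           { return len(a) }
func (a pointsByTime) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a pointsByTime) Less(i, j int) bool { return a[i].Time < a[j].Time }

// sortPoints sorts by time and honors the requested direction; before the
// fix, the equivalent sort always produced ascending order.
func sortPoints(points []Point, ascending bool) {
	var sorted sort.Interface = pointsByTime(points)
	if !ascending {
		sorted = sort.Reverse(sorted)
	}
	sort.Stable(sorted)
}

func main() {
	pts := []Point{{Time: 20, Value: 3}, {Time: 0, Value: 2}, {Time: 10, Value: 7}}
	sortPoints(pts, false) // as in ORDER BY time DESC
	fmt.Println(pts)       // [{20 3} {10 7} {0 2}]
}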
pull/14357/head
Jonathan A. Sternberg 2019-07-17 09:55:38 -05:00
parent be719e49a2
commit c381389f35
4 changed files with 624 additions and 288 deletions

File diff suppressed because it is too large

View File

@@ -450,7 +450,7 @@ func (h *{{$k.name}}SortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() > yTags.ID()
}
if x.Time != y.Time {
return x.Time > y.Time
}
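
As a side note (not part of the commit), the ">" comparisons in Less above are
what make the merge pop later timestamps first. Assuming the merge heap is
driven by container/heap, as the Less/Len/Swap signature suggests, a minimal
hypothetical example of that behavior:

package main

import (
	"container/heap"
	"fmt"
)

// timeHeap is a simplified, hypothetical stand-in for the sorted-merge heap:
// Less compares with ">", so heap.Pop returns the largest (latest) timestamp
// first, i.e. descending order.
type timeHeap []int64

func (h timeHeap) Len() int            { return len(h) }
func (h timeHeap) Less(i, j int) bool  { return h[i] > h[j] }
func (h timeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *timeHeap) Push(x interface{}) { *h = append(*h, x.(int64)) }
func (h *timeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	h := &timeHeap{10, 30, 20}
	heap.Init(h)
	for h.Len() > 0 {
		fmt.Println(heap.Pop(h)) // 30, then 20, then 10
	}
}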
@@ -1118,28 +1118,28 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, e
rp.Aggregator.Aggregate{{$k.Name}}(curr)
}
// Reverse sort points by name & tag if our output is supposed to be ordered.
// This is because the final array needs to have the first
// point at the end since they are popped off of the end.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
if len(keys) > 1 && itr.opt.Ordered {
sort.Sort(reverseStringSlice(keys))
// Reverse sort points by name & tag.
// This ensures a consistent order of output.
if len(keys) > 0 {
var sorted sort.Interface = sort.StringSlice(keys)
if itr.opt.Ascending {
sorted = sort.Reverse(sorted)
}
sort.Sort(sorted)
}
// Assume the points are already sorted until proven otherwise.
sortedByTime := true
// Emit the points for each name & tag combination.
a := make([]{{$v.Name}}Point, 0, len(m))
for _, k := range keys {
rp := m[k]
points := rp.Emitter.Emit()
// If the points are meant to be ordered,
// ensure that is done here. The tags are already
// sorted and should remain sorted.
if len(points) > 1 && itr.opt.Ordered {
sort.Stable({{$v.name}}PointsByTime(points))
}
for i := len(points) - 1; i >= 0; i-- {
points[i].Name = rp.Name
if !itr.keepTags {
@@ -1148,10 +1148,20 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, e
// Set the points time to the interval time if the reducer didn't provide one.
if points[i].Time == ZeroTime {
points[i].Time = startTime
} else {
sortedByTime = false
}
a = append(a, points[i])
}
}
// Points may be out of order. Perform a stable sort by time if requested.
if !sortedByTime && itr.opt.Ordered {
var sorted sort.Interface = {{$v.name}}PointsByTime(a)
if itr.opt.Ascending {
sorted = sort.Reverse(sorted)
}
sort.Stable(sorted)
}
return a, nil
}
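
To make the "popped off of the end" comment in this hunk concrete: reversing
the key sort for an ascending query and then emitting from the end of the
slice yields ascending output. A small standalone sketch of that pattern (the
orderKeys helper is assumed for illustration, not the engine's code):

package main

import (
	"fmt"
	"sort"
)

// orderKeys sorts keys so that consuming them from the end of the slice
// produces the requested order: for an ascending query the sort itself is
// reversed, because the first key must sit at the end.
func orderKeys(keys []string, ascending bool) []string {
	var sorted sort.Interface = sort.StringSlice(keys)
	if ascending {
		sorted = sort.Reverse(sorted)
	}
	sort.Sort(sorted)
	return keys
}

func main() {
	keys := orderKeys([]string{"cpu,host=B", "cpu,host=A"}, true)
	fmt.Println(keys) // [cpu,host=B cpu,host=A]

	// Emitting by popping from the end yields ascending key order.
	for i := len(keys) - 1; i >= 0; i-- {
		fmt.Println(keys[i]) // cpu,host=A then cpu,host=B
	}
}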

View File

@@ -587,6 +587,7 @@ func (b *exprIteratorBuilder) callIterator(ctx context.Context, expr *influxql.C
// Identify the name of the field we are using.
arg0 := expr.Args[0].(*influxql.VarRef)
opt.Ordered = false
input, err := buildExprIterator(ctx, arg0, b.ic, []influxql.Source{source}, opt, b.selector, false)
if err != nil {
return err
@@ -801,7 +802,7 @@ func buildAuxIterator(ctx context.Context, ic IteratorCreator, sources influxql.
return nil, err
}
// Merge iterators to read auxiliary fields.
// Merge iterators to read auxilary fields.
input, err := Iterators(inputs).Merge(opt)
if err != nil {
Iterators(inputs).Close()

View File

@@ -273,6 +273,40 @@ func TestSubquery(t *testing.T) {
{Time: 30 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{float64(8)}},
},
},
{
Name: "TimeOrderingInTheOuterQuery",
Statement: `select * from (select last(value) from cpu group by host) order by time asc`,
Fields: map[string]influxql.DataType{"value": influxql.Float},
MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn {
return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator {
if got, want := m.Name, "cpu"; got != want {
t.Errorf("unexpected source: got=%s want=%s", got, want)
}
var itr query.Iterator = &FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 2},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 7},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Value: 3},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 8},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: 3},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 19 * Second, Value: 7},
}}
if _, ok := opt.Expr.(*influxql.Call); ok {
i, err := query.NewCallIterator(itr, opt)
if err != nil {
panic(err)
}
itr = i
}
return itr
}
},
Rows: []query.Row{
{Time: 19 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{"B", float64(7)}},
{Time: 20 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{"A", float64(3)}},
},
},
{
Name: "TimeZone",
Statement: `SELECT * FROM (SELECT * FROM cpu WHERE time >= '2019-04-17 09:00:00' and time < '2019-04-17 10:00:00' TZ('America/Chicago'))`,
@@ -289,6 +323,47 @@ func TestSubquery(t *testing.T) {
}
},
},
{
Name: "DifferentDimensionsOrderByDesc",
Statement: `SELECT value, mytag FROM (SELECT last(value) AS value FROM testing GROUP BY mytag) ORDER BY desc`,
Fields: map[string]influxql.DataType{"value": influxql.Float},
MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn {
return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator {
if got, want := m.Name, "testing"; got != want {
t.Errorf("unexpected source: got=%s want=%s", got, want)
}
if opt.Ascending {
t.Error("expected iterator to be descending, not ascending")
}
var itr query.Iterator = &FloatIterator{Points: []query.FloatPoint{
{Name: "testing", Tags: ParseTags("mytag=c"), Time: mustParseTime("2019-06-25T22:36:20.93605779Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=c"), Time: mustParseTime("2019-06-25T22:36:20.671604877Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=c"), Time: mustParseTime("2019-06-25T22:36:20.255794481Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=b"), Time: mustParseTime("2019-06-25T22:36:18.176662543Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=b"), Time: mustParseTime("2019-06-25T22:36:17.815979113Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=b"), Time: mustParseTime("2019-06-25T22:36:17.265031598Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=a"), Time: mustParseTime("2019-06-25T22:36:15.144253616Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=a"), Time: mustParseTime("2019-06-25T22:36:14.719167205Z").UnixNano(), Value: 2},
{Name: "testing", Tags: ParseTags("mytag=a"), Time: mustParseTime("2019-06-25T22:36:13.711721316Z").UnixNano(), Value: 2},
}}
if _, ok := opt.Expr.(*influxql.Call); ok {
i, err := query.NewCallIterator(itr, opt)
if err != nil {
panic(err)
}
itr = i
}
return itr
}
},
Rows: []query.Row{
{Time: mustParseTime("2019-06-25T22:36:20.93605779Z").UnixNano(), Series: query.Series{Name: "testing"}, Values: []interface{}{float64(2), "c"}},
{Time: mustParseTime("2019-06-25T22:36:18.176662543Z").UnixNano(), Series: query.Series{Name: "testing"}, Values: []interface{}{float64(2), "b"}},
{Time: mustParseTime("2019-06-25T22:36:15.144253616Z").UnixNano(), Series: query.Series{Name: "testing"}, Values: []interface{}{float64(2), "a"}},
},
},
} {
t.Run(test.Name, func(t *testing.T) {
shardMapper := ShardMapper{