From b1caafe82f67fcf0d254e145b0ac21d77490f5e3 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Mon, 27 Mar 2017 16:34:53 -0500 Subject: [PATCH] Ensure the input for certain functions in the query engine are ordered The following functions require ordered input but were not guaranteed to receive ordered input: * `distinct()` * `sample()` * `holt_winters()` * `holt_winters_with_fit()` * `derivative()` * `non_negative_derivative()` * `difference()` * `moving_average()` * `elapsed()` * `cumulative_sum()` * `top()` * `bottom()` These function calls have now been modified to request that their input be ordered by the query engine. This will prevent the improper output that could have been caused by multiple series being merged together or multiple shards being merged together potentially incorrectly when no time grouping was specified. Two additional functions were already correct to begin with (so there are no bugs with these two, but I'm including their names for completeness). * `median()` * `percentile()` --- CHANGELOG.md | 1 + cmd/influxd/run/server_test.go | 67 ++++++++++++++++++++++++++++++++++ influxql/select.go | 36 ++++++++++++------ 3 files changed, 92 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d22a253b3f..3a669d0c20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - [#8022](https://github.com/influxdata/influxdb/issues/8022): Segment violation in models.Tags.Get - [#8155](https://github.com/influxdata/influxdb/pull/8155): Simplify admin user check. - [#8167](https://github.com/influxdata/influxdb/issues/8167): Fix a regression when math was used with selectors. +- [#8175](https://github.com/influxdata/influxdb/issues/8175): Ensure the input for certain functions in the query engine are ordered. 
## v1.2.2 [2017-03-14] diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 28c93466a1..7653f70290 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -5402,6 +5402,73 @@ func TestServer_Query_AcrossShardsAndFields(t *testing.T) { } } +func TestServer_Query_OrderedAcrossShards(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig()) + defer s.Close() + + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil { + t.Fatal(err) + } + + writes := []string{ + fmt.Sprintf(`cpu value=7 %d`, mustParseTime(time.RFC3339Nano, "2010-01-01T00:00:00Z").UnixNano()), + fmt.Sprintf(`cpu value=14 %d`, mustParseTime(time.RFC3339Nano, "2010-01-08T00:00:00Z").UnixNano()), + fmt.Sprintf(`cpu value=28 %d`, mustParseTime(time.RFC3339Nano, "2010-01-15T00:00:00Z").UnixNano()), + fmt.Sprintf(`cpu value=56 %d`, mustParseTime(time.RFC3339Nano, "2010-01-22T00:00:00Z").UnixNano()), + fmt.Sprintf(`cpu value=112 %d`, mustParseTime(time.RFC3339Nano, "2010-01-29T00:00:00Z").UnixNano()), + } + + test := NewTest("db0", "rp0") + test.writes = Writes{ + &Write{data: strings.Join(writes, "\n")}, + } + + test.addQueries([]*Query{ + &Query{ + name: "derivative", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT derivative(value, 24h) FROM cpu`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-01-08T00:00:00Z",1],["2010-01-15T00:00:00Z",2],["2010-01-22T00:00:00Z",4],["2010-01-29T00:00:00Z",8]]}]}]}`, + }, + &Query{ + name: "non_negative_derivative", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT non_negative_derivative(value, 24h) FROM cpu`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","non_negative_derivative"],"values":[["2010-01-08T00:00:00Z",1],["2010-01-15T00:00:00Z",2],["2010-01-22T00:00:00Z",4],["2010-01-29T00:00:00Z",8]]}]}]}`, + }, + &Query{ + name: 
"difference", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT difference(value) FROM cpu`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-01-08T00:00:00Z",7],["2010-01-15T00:00:00Z",14],["2010-01-22T00:00:00Z",28],["2010-01-29T00:00:00Z",56]]}]}]}`, + }, + &Query{ + name: "cumulative_sum", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT cumulative_sum(value) FROM cpu`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-01-01T00:00:00Z",7],["2010-01-08T00:00:00Z",21],["2010-01-15T00:00:00Z",49],["2010-01-22T00:00:00Z",105],["2010-01-29T00:00:00Z",217]]}]}]}`, + }, + }...) + + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} + func TestServer_Query_Where_Fields(t *testing.T) { t.Parallel() s := OpenServer(NewConfig()) diff --git a/influxql/select.go b/influxql/select.go index c76ca44f91..6d6ac31ec7 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -707,25 +707,31 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { // TODO(jsternberg): Refactor this. This section needs to die in a fire. 
switch expr.Name { case "distinct": - input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, b.selector) + opt := b.opt + opt.Ordered = true + input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, b.selector) if err != nil { return nil, err } - input, err = NewDistinctIterator(input, b.opt) + input, err = NewDistinctIterator(input, opt) if err != nil { return nil, err } - return NewIntervalIterator(input, b.opt), nil + return NewIntervalIterator(input, opt), nil case "sample": - input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, b.opt, b.selector) + opt := b.opt + opt.Ordered = true + input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector) if err != nil { return nil, err } size := expr.Args[1].(*IntegerLiteral) - return newSampleIterator(input, b.opt, int(size.Val)) + return newSampleIterator(input, opt, int(size.Val)) case "holt_winters", "holt_winters_with_fit": - input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, b.opt, b.selector) + opt := b.opt + opt.Ordered = true + input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector) if err != nil { return nil, err } @@ -736,7 +742,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { interval := b.opt.Interval.Duration // Redefine interval to be unbounded to capture all aggregate results - opt := b.opt opt.StartTime = MinTime opt.EndTime = MaxTime opt.Interval = Interval{} @@ -751,6 +756,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { opt.EndTime += int64(opt.Interval.Duration) } } + opt.Ordered = true input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector) if err != nil { @@ -780,11 +786,13 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { } panic(fmt.Sprintf("invalid series aggregate function: %s", expr.Name)) case "cumulative_sum": - input, err := 
buildExprIterator(expr.Args[0], b.ic, b.sources, b.opt, b.selector) + opt := b.opt + opt.Ordered = true + input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector) if err != nil { return nil, err } - return newCumulativeSumIterator(input, b.opt) + return newCumulativeSumIterator(input, opt) } itr, err := func() (Iterator, error) { @@ -893,12 +901,14 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { } } - input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false) + opt := b.opt + opt.Ordered = true + input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) if err != nil { return nil, err } n := expr.Args[len(expr.Args)-1].(*IntegerLiteral) - return newTopIterator(input, b.opt, n, tags) + return newTopIterator(input, opt, n, tags) case "bottom": var tags []int if len(expr.Args) < 2 { @@ -917,7 +927,9 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { } } - input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false) + opt := b.opt + opt.Ordered = true + input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) if err != nil { return nil, err }