From addc12561fc7eb5f09dc83a8a697349e40f2b930 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Wed, 5 Apr 2017 11:47:57 -0500 Subject: [PATCH] Fix LIMIT and OFFSET for certain aggregate queries When LIMIT and OFFSET were used with any functions that were not handled directly by the query engine (anything other than count, max, min, mean, first, or last), the input to the function would be limited instead of receiving the full stream of values it was supposed to receive. This also fixes a bug that caused the server to hang when LIMIT and OFFSET were used with a selector. When using a selector, the limit and offset should be handled before the points go to the auxiliary iterator to be split into different iterators. Limiting happened afterwards which caused the auxiliary iterator to hang forever. --- CHANGELOG.md | 1 + influxql/select.go | 67 ++++++++++++++++++++------------------------ tests/server_test.go | 58 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d10cf0b3db..17e031d073 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio - [#8065](https://github.com/influxdata/influxdb/issues/8065): Restrict top() and bottom() selectors to be used with no other functions. - [#8266](https://github.com/influxdata/influxdb/issues/8266): top() and bottom() now returns the time for every point. - [#8315](https://github.com/influxdata/influxdb/issues/8315): Remove default upper time bound on DELETE queries. +- [#8066](https://github.com/influxdata/influxdb/issues/8066): Fix LIMIT and OFFSET for certain aggregate queries. ## v1.2.3 [unreleased] diff --git a/influxql/select.go b/influxql/select.go index fb892b07a2..f0a1adf658 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -356,6 +356,11 @@ func buildFieldIterators(fields Fields, ic IteratorCreator, sources Sources, opt } else if itr == nil { itr = &nilFloatIterator{} } + + // If there is a limit or offset then apply it. + if opt.Limit > 0 || opt.Offset > 0 { + itr = NewLimitIterator(itr, opt) + } itrs[i] = itr input = itr } @@ -390,13 +395,6 @@ func buildFieldIterators(fields Fields, ic IteratorCreator, sources Sources, opt return nil, err } - // If there is a limit or offset then apply it. - if opt.Limit > 0 || opt.Offset > 0 { - for i := range itrs { - itrs[i] = NewLimitIterator(itrs[i], opt) - } - } - return itrs, nil } @@ -706,9 +704,11 @@ func (b *exprIteratorBuilder) buildVarRefIterator(expr *VarRef) (Iterator, error func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { // TODO(jsternberg): Refactor this. This section needs to die in a fire. + opt := b.opt + // Eliminate limits and offsets if they were previously set. These are handled by the caller. + opt.Limit, opt.Offset = 0, 0 switch expr.Name { case "distinct": - opt := b.opt opt.Ordered = true input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, b.selector) if err != nil { @@ -720,7 +720,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { } return NewIntervalIterator(input, opt), nil case "sample": - opt := b.opt opt.Ordered = true input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector) if err != nil { @@ -730,7 +729,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { return newSampleIterator(input, opt, int(size.Val)) case "holt_winters", "holt_winters_with_fit": - opt := b.opt opt.Ordered = true input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector) if err != nil { @@ -741,7 +739,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { includeFitData := "holt_winters_with_fit" == expr.Name - interval := b.opt.Interval.Duration + interval := opt.Interval.Duration // Redefine interval to be unbounded to capture all aggregate results opt.StartTime = MinTime opt.EndTime = MaxTime @@ -749,7 +747,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { return newHoltWintersIterator(input, opt, int(h.Val), int(m.Val), includeFitData, interval) case "derivative", "non_negative_derivative", "difference", "non_negative_difference", "moving_average", "elapsed": - opt := b.opt if !opt.Interval.IsZero() { if opt.Ascending { opt.StartTime -= int64(opt.Interval.Duration) @@ -777,7 +774,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { return newDifferenceIterator(input, opt, isNonNegative) case "moving_average": n := expr.Args[1].(*IntegerLiteral) - if n.Val > 1 && !b.opt.Interval.IsZero() { + if n.Val > 1 && !opt.Interval.IsZero() { if opt.Ascending { opt.StartTime -= int64(opt.Interval.Duration) * (n.Val - 1) } else { @@ -788,7 +785,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { } panic(fmt.Sprintf("invalid series aggregate function: %s", expr.Name)) case "cumulative_sum": - opt := b.opt opt.Ordered = true input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector) if err != nil { @@ -796,7 +792,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { } return newCumulativeSumIterator(input, opt) case "integral": - opt := b.opt opt.Ordered = true input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) if err != nil { @@ -864,11 +859,11 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { switch arg0 := expr.Args[0].(type) { case *Call: if arg0.Name == "distinct" { - input, err := buildExprIterator(arg0, b.ic, b.sources, b.opt, b.selector) + input, err := buildExprIterator(arg0, b.ic, b.sources, opt, b.selector) if err != nil { return nil, err } - return newCountIterator(input, b.opt) + return newCountIterator(input, opt) } } fallthrough @@ -878,7 +873,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { for _, source := range b.sources { switch source := source.(type) { case *Measurement: - input, err := b.ic.CreateIterator(source, b.opt) + input, err := b.ic.CreateIterator(source, opt) if err != nil { return err } @@ -887,17 +882,17 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { // Identify the name of the field we are using. arg0 := expr.Args[0].(*VarRef) - input, err := buildExprIterator(arg0, b.ic, []Source{source}, b.opt, b.selector) + input, err := buildExprIterator(arg0, b.ic, []Source{source}, opt, b.selector) if err != nil { return err } - if b.opt.Condition != nil { - input = NewFilterIterator(input, b.opt.Condition, b.opt) + if opt.Condition != nil { + input = NewFilterIterator(input, opt.Condition, opt) } // Wrap the result in a call iterator. - i, err := NewCallIterator(input, b.opt) + i, err := NewCallIterator(input, opt) if err != nil { input.Close() return err @@ -911,7 +906,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { return nil, err } - itr, err := Iterators(inputs).Merge(b.opt) + itr, err := Iterators(inputs).Merge(opt) if err != nil { Iterators(inputs).Close() return nil, err @@ -920,7 +915,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { } return itr, nil case "median": - opt := b.opt opt.Ordered = true input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) if err != nil { @@ -928,26 +922,25 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { } return newMedianIterator(input, opt) case "mode": - input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false) + input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) if err != nil { return nil, err } - return NewModeIterator(input, b.opt) + return NewModeIterator(input, opt) case "stddev": - input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false) + input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) if err != nil { return nil, err } - return newStddevIterator(input, b.opt) + return newStddevIterator(input, opt) case "spread": // OPTIMIZE(benbjohnson): convert to map/reduce - input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false) + input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) if err != nil { return nil, err } - return newSpreadIterator(input, b.opt) + return newSpreadIterator(input, opt) case "percentile": - opt := b.opt opt.Ordered = true input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) if err != nil { @@ -970,14 +963,14 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { return nil, err } - if !b.selector || !b.opt.Interval.IsZero() { - itr = NewIntervalIterator(itr, b.opt) - if !b.opt.Interval.IsZero() && b.opt.Fill != NoFill { - itr = NewFillIterator(itr, expr, b.opt) + if !b.selector || !opt.Interval.IsZero() { + itr = NewIntervalIterator(itr, opt) + if !opt.Interval.IsZero() && opt.Fill != NoFill { + itr = NewFillIterator(itr, expr, opt) } } - if b.opt.InterruptCh != nil { - itr = NewInterruptIterator(itr, b.opt.InterruptCh) + if opt.InterruptCh != nil { + itr = NewInterruptIterator(itr, opt.InterruptCh) } return itr, nil } diff --git a/tests/server_test.go b/tests/server_test.go index 75db4488a4..d4cc8002ff 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -7934,6 +7934,64 @@ func TestServer_Query_Sample_Wildcard(t *testing.T) { } } +func TestServer_Query_Sample_LimitOffset(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig()) + defer s.Close() + + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil { + t.Fatal(err) + } + + writes := []string{ + fmt.Sprintf(`cpu float=1,int=1i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + fmt.Sprintf(`cpu float=2,int=2i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:01:00Z").UnixNano()), + fmt.Sprintf(`cpu float=3,int=3i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:02:00Z").UnixNano()), + } + + test := NewTest("db0", "rp0") + test.writes = Writes{ + &Write{data: strings.Join(writes, "\n")}, + } + + test.addQueries([]*Query{ + &Query{ + name: "sample() with limit 1", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT sample(float, 3), int FROM cpu LIMIT 1`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","sample","int"],"values":[["2000-01-01T00:00:00Z",1,1]]}]}]}`, + }, + &Query{ + name: "sample() with offset 1", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT sample(float, 3), int FROM cpu OFFSET 1`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","sample","int"],"values":[["2000-01-01T00:01:00Z",2,2],["2000-01-01T00:02:00Z",3,3]]}]}]}`, + }, + &Query{ + name: "sample() with limit 1 offset 1", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT sample(float, 3), int FROM cpu LIMIT 1 OFFSET 1`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","sample","int"],"values":[["2000-01-01T00:01:00Z",2,2]]}]}]}`, + }, + }...) + + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + + for _, query := range test.queries { + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} + // Validate that nested aggregates don't panic func TestServer_NestedAggregateWithMathPanics(t *testing.T) { t.Parallel()