Fix LIMIT and OFFSET for certain aggregate queries

When LIMIT and OFFSET were used with any functions that were not handled
directly by the query engine (anything other than count, max, min, mean,
first, or last), the input to the function would be limited instead of
receiving the full stream of values it was supposed to receive.

This also fixes a bug that caused the server to hang when LIMIT and
OFFSET were used with a selector. When using a selector, the limit and
offset should be handled before the points go to the auxiliary iterator
to be split into different iterators. Limiting happened afterwards which
caused the auxiliary iterator to hang forever.
pull/8261/head
Jonathan A. Sternberg 2017-04-05 11:47:57 -05:00
parent ca85dd4cff
commit addc12561f
3 changed files with 89 additions and 37 deletions

View File

@ -52,6 +52,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
- [#8065](https://github.com/influxdata/influxdb/issues/8065): Restrict top() and bottom() selectors to be used with no other functions.
- [#8266](https://github.com/influxdata/influxdb/issues/8266): top() and bottom() now returns the time for every point.
- [#8315](https://github.com/influxdata/influxdb/issues/8315): Remove default upper time bound on DELETE queries.
- [#8066](https://github.com/influxdata/influxdb/issues/8066): Fix LIMIT and OFFSET for certain aggregate queries.
## v1.2.3 [unreleased]

View File

@ -356,6 +356,11 @@ func buildFieldIterators(fields Fields, ic IteratorCreator, sources Sources, opt
} else if itr == nil {
itr = &nilFloatIterator{}
}
// If there is a limit or offset then apply it.
if opt.Limit > 0 || opt.Offset > 0 {
itr = NewLimitIterator(itr, opt)
}
itrs[i] = itr
input = itr
}
@ -390,13 +395,6 @@ func buildFieldIterators(fields Fields, ic IteratorCreator, sources Sources, opt
return nil, err
}
// If there is a limit or offset then apply it.
if opt.Limit > 0 || opt.Offset > 0 {
for i := range itrs {
itrs[i] = NewLimitIterator(itrs[i], opt)
}
}
return itrs, nil
}
@ -706,9 +704,11 @@ func (b *exprIteratorBuilder) buildVarRefIterator(expr *VarRef) (Iterator, error
func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
// TODO(jsternberg): Refactor this. This section needs to die in a fire.
opt := b.opt
// Eliminate limits and offsets if they were previously set. These are handled by the caller.
opt.Limit, opt.Offset = 0, 0
switch expr.Name {
case "distinct":
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, b.selector)
if err != nil {
@ -720,7 +720,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
return NewIntervalIterator(input, opt), nil
case "sample":
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
@ -730,7 +729,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
return newSampleIterator(input, opt, int(size.Val))
case "holt_winters", "holt_winters_with_fit":
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
@ -741,7 +739,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
includeFitData := "holt_winters_with_fit" == expr.Name
interval := b.opt.Interval.Duration
interval := opt.Interval.Duration
// Redefine interval to be unbounded to capture all aggregate results
opt.StartTime = MinTime
opt.EndTime = MaxTime
@ -749,7 +747,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
return newHoltWintersIterator(input, opt, int(h.Val), int(m.Val), includeFitData, interval)
case "derivative", "non_negative_derivative", "difference", "non_negative_difference", "moving_average", "elapsed":
opt := b.opt
if !opt.Interval.IsZero() {
if opt.Ascending {
opt.StartTime -= int64(opt.Interval.Duration)
@ -777,7 +774,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
return newDifferenceIterator(input, opt, isNonNegative)
case "moving_average":
n := expr.Args[1].(*IntegerLiteral)
if n.Val > 1 && !b.opt.Interval.IsZero() {
if n.Val > 1 && !opt.Interval.IsZero() {
if opt.Ascending {
opt.StartTime -= int64(opt.Interval.Duration) * (n.Val - 1)
} else {
@ -788,7 +785,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
panic(fmt.Sprintf("invalid series aggregate function: %s", expr.Name))
case "cumulative_sum":
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0], b.ic, b.sources, opt, b.selector)
if err != nil {
@ -796,7 +792,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
return newCumulativeSumIterator(input, opt)
case "integral":
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
@ -864,11 +859,11 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
switch arg0 := expr.Args[0].(type) {
case *Call:
if arg0.Name == "distinct" {
input, err := buildExprIterator(arg0, b.ic, b.sources, b.opt, b.selector)
input, err := buildExprIterator(arg0, b.ic, b.sources, opt, b.selector)
if err != nil {
return nil, err
}
return newCountIterator(input, b.opt)
return newCountIterator(input, opt)
}
}
fallthrough
@ -878,7 +873,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
for _, source := range b.sources {
switch source := source.(type) {
case *Measurement:
input, err := b.ic.CreateIterator(source, b.opt)
input, err := b.ic.CreateIterator(source, opt)
if err != nil {
return err
}
@ -887,17 +882,17 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
// Identify the name of the field we are using.
arg0 := expr.Args[0].(*VarRef)
input, err := buildExprIterator(arg0, b.ic, []Source{source}, b.opt, b.selector)
input, err := buildExprIterator(arg0, b.ic, []Source{source}, opt, b.selector)
if err != nil {
return err
}
if b.opt.Condition != nil {
input = NewFilterIterator(input, b.opt.Condition, b.opt)
if opt.Condition != nil {
input = NewFilterIterator(input, opt.Condition, opt)
}
// Wrap the result in a call iterator.
i, err := NewCallIterator(input, b.opt)
i, err := NewCallIterator(input, opt)
if err != nil {
input.Close()
return err
@ -911,7 +906,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
return nil, err
}
itr, err := Iterators(inputs).Merge(b.opt)
itr, err := Iterators(inputs).Merge(opt)
if err != nil {
Iterators(inputs).Close()
return nil, err
@ -920,7 +915,6 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
return itr, nil
case "median":
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
@ -928,26 +922,25 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
}
return newMedianIterator(input, opt)
case "mode":
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false)
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
return nil, err
}
return NewModeIterator(input, b.opt)
return NewModeIterator(input, opt)
case "stddev":
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false)
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
return nil, err
}
return newStddevIterator(input, b.opt)
return newStddevIterator(input, opt)
case "spread":
// OPTIMIZE(benbjohnson): convert to map/reduce
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, b.opt, false)
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
return nil, err
}
return newSpreadIterator(input, b.opt)
return newSpreadIterator(input, opt)
case "percentile":
opt := b.opt
opt.Ordered = true
input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false)
if err != nil {
@ -970,14 +963,14 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) {
return nil, err
}
if !b.selector || !b.opt.Interval.IsZero() {
itr = NewIntervalIterator(itr, b.opt)
if !b.opt.Interval.IsZero() && b.opt.Fill != NoFill {
itr = NewFillIterator(itr, expr, b.opt)
if !b.selector || !opt.Interval.IsZero() {
itr = NewIntervalIterator(itr, opt)
if !opt.Interval.IsZero() && opt.Fill != NoFill {
itr = NewFillIterator(itr, expr, opt)
}
}
if b.opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, b.opt.InterruptCh)
if opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
return itr, nil
}

View File

@ -7934,6 +7934,64 @@ func TestServer_Query_Sample_Wildcard(t *testing.T) {
}
}
func TestServer_Query_Sample_LimitOffset(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}
writes := []string{
fmt.Sprintf(`cpu float=1,int=1i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu float=2,int=2i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:01:00Z").UnixNano()),
fmt.Sprintf(`cpu float=3,int=3i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:02:00Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}
test.addQueries([]*Query{
&Query{
name: "sample() with limit 1",
params: url.Values{"db": []string{"db0"}},
command: `SELECT sample(float, 3), int FROM cpu LIMIT 1`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","sample","int"],"values":[["2000-01-01T00:00:00Z",1,1]]}]}]}`,
},
&Query{
name: "sample() with offset 1",
params: url.Values{"db": []string{"db0"}},
command: `SELECT sample(float, 3), int FROM cpu OFFSET 1`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","sample","int"],"values":[["2000-01-01T00:01:00Z",2,2],["2000-01-01T00:02:00Z",3,3]]}]}]}`,
},
&Query{
name: "sample() with limit 1 offset 1",
params: url.Values{"db": []string{"db0"}},
command: `SELECT sample(float, 3), int FROM cpu LIMIT 1 OFFSET 1`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","sample","int"],"values":[["2000-01-01T00:01:00Z",2,2]]}]}]}`,
},
}...)
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
// Validate that nested aggregates don't panic
func TestServer_NestedAggregateWithMathPanics(t *testing.T) {
t.Parallel()