diff --git a/CHANGELOG.md b/CHANGELOG.md index bcb4ffa6da..c08842baf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ - [#6557](https://github.com/influxdata/influxdb/issues/6557): Overwriting points on large series can cause memory spikes during compactions - [#6611](https://github.com/influxdata/influxdb/issues/6611): Queries slow down hundreds times after overwriting points - [#6641](https://github.com/influxdata/influxdb/issues/6641): Fix read tombstones: EOF +- [#6661](https://github.com/influxdata/influxdb/issues/6661): Disable limit optimization when using an aggregate. ## v0.13.0 [2016-05-12] diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 3b052a3592..f80d288c14 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -846,7 +846,7 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator if call, ok := opt.Expr.(*influxql.Call); ok { refOpt := opt refOpt.Expr = call.Args[0].(*influxql.VarRef) - inputs, err := e.createVarRefIterator(refOpt) + inputs, err := e.createVarRefIterator(refOpt, true) if err != nil { return nil, err } else if len(inputs) == 0 { @@ -869,7 +869,7 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator return influxql.NewParallelMergeIterator(inputs, opt, runtime.GOMAXPROCS(0)), nil } - itrs, err := e.createVarRefIterator(opt) + itrs, err := e.createVarRefIterator(opt, false) if err != nil { return nil, err } @@ -882,7 +882,9 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator } // createVarRefIterator creates an iterator for a variable reference. -func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql.Iterator, error) { +// The aggregate argument determines this is being created for an aggregate. +// If this is an aggregate, the limit optimization is disabled temporarily. See #6661. +func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bool) ([]influxql.Iterator, error) { ref, _ := opt.Expr.(*influxql.VarRef) var itrs []influxql.Iterator @@ -905,14 +907,8 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql. return err } - if len(inputs) > 0 && (opt.Limit > 0 || opt.Offset > 0) { - var itr influxql.Iterator - if opt.MergeSorted() { - itr = influxql.NewSortedMergeIterator(inputs, opt) - } else { - itr = influxql.NewMergeIterator(inputs, opt) - } - itrs = append(itrs, newLimitIterator(itr, opt)) + if !aggregate && len(inputs) > 0 && (opt.Limit > 0 || opt.Offset > 0) { + itrs = append(itrs, newLimitIterator(influxql.NewSortedMergeIterator(inputs, opt), opt)) } else { itrs = append(itrs, inputs...) }