Merge pull request #6391 from influxdata/js-5553-limit-queries-slow-with-group-by

Propagate the limit option to the low-level iterators
pull/6403/head
Jonathan A. Sternberg 2016-04-16 09:39:25 -04:00
commit 93745d9693
6 changed files with 269 additions and 7 deletions

View File

@ -4774,7 +4774,7 @@ func TestServer_Query_LimitAndOffset(t *testing.T) {
params: url.Values{"db": []string{"db0"}}, params: url.Values{"db": []string{"db0"}},
}, },
&Query{ &Query{
name: "limit + offset equal to the number of points with group by time", name: "limit + offset equal to the number of points with group by time",
command: `select mean(foo) from "limited" WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY TIME(1s) LIMIT 3 OFFSET 3`, command: `select mean(foo) from "limited" WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY TIME(1s) LIMIT 3 OFFSET 3`,
exp: `{"results":[{"series":[{"name":"limited","columns":["time","mean"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`, exp: `{"results":[{"series":[{"name":"limited","columns":["time","mean"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`,
params: url.Values{"db": []string{"db0"}}, params: url.Values{"db": []string{"db0"}},
@ -4785,6 +4785,18 @@ func TestServer_Query_LimitAndOffset(t *testing.T) {
exp: `{"results":[{}]}`, exp: `{"results":[{}]}`,
params: url.Values{"db": []string{"db0"}}, params: url.Values{"db": []string{"db0"}},
}, },
&Query{
name: "limit - group by tennant",
command: `select foo from "limited" group by tennant limit 1`,
exp: `{"results":[{"series":[{"name":"limited","tags":{"tennant":"paul"},"columns":["time","foo"],"values":[["2009-11-10T23:00:02Z",2]]},{"name":"limited","tags":{"tennant":"todd"},"columns":["time","foo"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "limit and offset - group by tennant",
command: `select foo from "limited" group by tennant limit 1 offset 1`,
exp: `{"results":[{"series":[{"name":"limited","tags":{"tennant":"paul"},"columns":["time","foo"],"values":[["2009-11-10T23:00:03Z",3]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...) }...)
for i, query := range test.queries { for i, query := range test.queries {

View File

@ -792,6 +792,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql.
tagSets = influxql.LimitTagSets(tagSets, opt.SLimit, opt.SOffset) tagSets = influxql.LimitTagSets(tagSets, opt.SLimit, opt.SOffset)
for _, t := range tagSets { for _, t := range tagSets {
inputs := make([]influxql.Iterator, 0, len(t.SeriesKeys))
for i, seriesKey := range t.SeriesKeys { for i, seriesKey := range t.SeriesKeys {
fields := 0 fields := 0
if t.Filters[i] != nil { if t.Filters[i] != nil {
@ -802,13 +803,25 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql.
} }
} }
itr, err := e.createVarRefSeriesIterator(ref, mm, seriesKey, t, t.Filters[i], conditionFields[:fields], opt) input, err := e.createVarRefSeriesIterator(ref, mm, seriesKey, t, t.Filters[i], conditionFields[:fields], opt)
if err != nil { if err != nil {
return err return err
} else if itr == nil { } else if input == nil {
continue continue
} }
itrs = append(itrs, itr) inputs = append(inputs, input)
}
if len(inputs) > 0 && (opt.Limit > 0 || opt.Offset > 0) {
var itr influxql.Iterator
if opt.MergeSorted() {
itr = influxql.NewSortedMergeIterator(inputs, opt)
} else {
itr = influxql.NewMergeIterator(inputs, opt)
}
itrs = append(itrs, newLimitIterator(itr, opt))
} else {
itrs = append(itrs, inputs...)
} }
} }
} }

View File

@ -508,7 +508,7 @@ func BenchmarkEngine_CreateIterator_Count_1M(b *testing.B) {
} }
func benchmarkEngineCreateIteratorCount(b *testing.B, pointN int) { func benchmarkEngineCreateIteratorCount(b *testing.B, pointN int) {
benchmarkCallIterator(b, influxql.IteratorOptions{ benchmarkIterator(b, influxql.IteratorOptions{
Expr: influxql.MustParseExpr("count(value)"), Expr: influxql.MustParseExpr("count(value)"),
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: true, Ascending: true,
@ -517,7 +517,29 @@ func benchmarkEngineCreateIteratorCount(b *testing.B, pointN int) {
}, pointN) }, pointN)
} }
func benchmarkCallIterator(b *testing.B, opt influxql.IteratorOptions, pointN int) { func BenchmarkEngine_CreateIterator_Limit_1K(b *testing.B) {
benchmarkEngineCreateIteratorLimit(b, 1000)
}
// BenchmarkEngine_CreateIterator_Limit_100K runs the limit iterator benchmark
// with a point count of 100,000.
func BenchmarkEngine_CreateIterator_Limit_100K(b *testing.B) {
	const pointN = 100000
	benchmarkEngineCreateIteratorLimit(b, pointN)
}
// BenchmarkEngine_CreateIterator_Limit_1M runs the limit iterator benchmark
// with a point count of 1,000,000.
func BenchmarkEngine_CreateIterator_Limit_1M(b *testing.B) {
	const pointN = 1000000
	benchmarkEngineCreateIteratorLimit(b, pointN)
}
func benchmarkEngineCreateIteratorLimit(b *testing.B, pointN int) {
benchmarkIterator(b, influxql.IteratorOptions{
Expr: influxql.MustParseExpr("value"),
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Dimensions: []string{"host"},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Limit: 10,
}, pointN)
}
func benchmarkIterator(b *testing.B, opt influxql.IteratorOptions, pointN int) {
e := MustInitBenchmarkEngine(pointN) e := MustInitBenchmarkEngine(pointN)
b.ResetTimer() b.ResetTimer()
b.ReportAllocs() b.ReportAllocs()
@ -536,6 +558,8 @@ var benchmark struct {
PointN int PointN int
} }
// hostNames lists the "host" tag values that the benchmark point generator
// cycles through (indexed modulo its length) when writing "cpu" points.
var hostNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"}
// MustInitBenchmarkEngine creates a new engine and fills it with points. // MustInitBenchmarkEngine creates a new engine and fills it with points.
// Reuses previous engine if the same parameters were used. // Reuses previous engine if the same parameters were used.
func MustInitBenchmarkEngine(pointN int) *Engine { func MustInitBenchmarkEngine(pointN int) *Engine {
@ -567,7 +591,8 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
for i := 0; i < pointN; i += batchSize { for i := 0; i < pointN; i += batchSize {
var buf bytes.Buffer var buf bytes.Buffer
for j := 0; j < batchSize; j++ { for j := 0; j < batchSize; j++ {
fmt.Fprintf(&buf, "cpu,host=A value=%d %d", fmt.Fprintf(&buf, "cpu,host=%s value=%d %d",
hostNames[j%len(hostNames)],
100+rand.Intn(50)-25, 100+rand.Intn(50)-25,
(time.Duration(i+j)*time.Second)+(time.Duration(rand.Intn(500)-250)*time.Millisecond), (time.Duration(i+j)*time.Second)+(time.Duration(rand.Intn(500)-250)*time.Millisecond),
) )

View File

@ -230,6 +230,44 @@ func (itr *floatIterator) Stats() influxql.IteratorStats {
// Close closes the iterator. // Close closes the iterator.
func (itr *floatIterator) Close() error { return nil } func (itr *floatIterator) Close() error { return nil }
// floatLimitIterator wraps a FloatIterator and stops reading from it once
// enough points have been returned to satisfy the LIMIT and OFFSET options.
//
// It never skips points itself: it only caps the total number of points read
// so that the higher level iterator, which applies the offset, does not have
// to drain the whole input.
type floatLimitIterator struct {
	input influxql.FloatIterator
	opt   influxql.IteratorOptions
	n     int // number of points returned so far
}

// newFloatLimitIterator returns a floatLimitIterator that reads from input and
// is bounded by opt.Limit and opt.Offset.
func newFloatLimitIterator(input influxql.FloatIterator, opt influxql.IteratorOptions) *floatLimitIterator {
	return &floatLimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns the stats reported by the underlying input iterator.
func (itr *floatLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying input iterator.
func (itr *floatLimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. It returns nil when the input
// is exhausted or when opt.Limit+opt.Offset points have already been returned.
//
// When opt.Limit is not positive no cap is applied. This iterator is also
// installed for offset-only queries, and cutting the stream there would drop
// points that belong in the result.
func (itr *floatLimitIterator) Next() *influxql.FloatPoint {
	// Check if we have already returned Limit points beyond the offset.
	// The test runs before the read, so it must use >= to avoid pulling one
	// point more than needed.
	if itr.opt.Limit > 0 && itr.n-itr.opt.Offset >= itr.opt.Limit {
		return nil
	}

	// Read the next point.
	p := itr.input.Next()
	if p == nil {
		return nil
	}

	// Increment counter.
	itr.n++

	// Offsets are handled by a higher level iterator so return all points.
	return p
}
// floatCursor represents an object for iterating over a single float field. // floatCursor represents an object for iterating over a single float field.
type floatCursor interface { type floatCursor interface {
cursor cursor
@ -604,6 +642,44 @@ func (itr *integerIterator) Stats() influxql.IteratorStats {
// Close closes the iterator. // Close closes the iterator.
func (itr *integerIterator) Close() error { return nil } func (itr *integerIterator) Close() error { return nil }
// integerLimitIterator wraps an IntegerIterator and stops reading from it once
// enough points have been returned to satisfy the LIMIT and OFFSET options.
//
// It never skips points itself: it only caps the total number of points read
// so that the higher level iterator, which applies the offset, does not have
// to drain the whole input.
type integerLimitIterator struct {
	input influxql.IntegerIterator
	opt   influxql.IteratorOptions
	n     int // number of points returned so far
}

// newIntegerLimitIterator returns an integerLimitIterator that reads from
// input and is bounded by opt.Limit and opt.Offset.
func newIntegerLimitIterator(input influxql.IntegerIterator, opt influxql.IteratorOptions) *integerLimitIterator {
	return &integerLimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns the stats reported by the underlying input iterator.
func (itr *integerLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying input iterator.
func (itr *integerLimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. It returns nil when the input
// is exhausted or when opt.Limit+opt.Offset points have already been returned.
//
// When opt.Limit is not positive no cap is applied. This iterator is also
// installed for offset-only queries, and cutting the stream there would drop
// points that belong in the result.
func (itr *integerLimitIterator) Next() *influxql.IntegerPoint {
	// Check if we have already returned Limit points beyond the offset.
	// The test runs before the read, so it must use >= to avoid pulling one
	// point more than needed.
	if itr.opt.Limit > 0 && itr.n-itr.opt.Offset >= itr.opt.Limit {
		return nil
	}

	// Read the next point.
	p := itr.input.Next()
	if p == nil {
		return nil
	}

	// Increment counter.
	itr.n++

	// Offsets are handled by a higher level iterator so return all points.
	return p
}
// integerCursor represents an object for iterating over a single integer field. // integerCursor represents an object for iterating over a single integer field.
type integerCursor interface { type integerCursor interface {
cursor cursor
@ -978,6 +1054,44 @@ func (itr *stringIterator) Stats() influxql.IteratorStats {
// Close closes the iterator. // Close closes the iterator.
func (itr *stringIterator) Close() error { return nil } func (itr *stringIterator) Close() error { return nil }
// stringLimitIterator wraps a StringIterator and stops reading from it once
// enough points have been returned to satisfy the LIMIT and OFFSET options.
//
// It never skips points itself: it only caps the total number of points read
// so that the higher level iterator, which applies the offset, does not have
// to drain the whole input.
type stringLimitIterator struct {
	input influxql.StringIterator
	opt   influxql.IteratorOptions
	n     int // number of points returned so far
}

// newStringLimitIterator returns a stringLimitIterator that reads from input
// and is bounded by opt.Limit and opt.Offset.
func newStringLimitIterator(input influxql.StringIterator, opt influxql.IteratorOptions) *stringLimitIterator {
	return &stringLimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns the stats reported by the underlying input iterator.
func (itr *stringLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying input iterator.
func (itr *stringLimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. It returns nil when the input
// is exhausted or when opt.Limit+opt.Offset points have already been returned.
//
// When opt.Limit is not positive no cap is applied. This iterator is also
// installed for offset-only queries, and cutting the stream there would drop
// points that belong in the result.
func (itr *stringLimitIterator) Next() *influxql.StringPoint {
	// Check if we have already returned Limit points beyond the offset.
	// The test runs before the read, so it must use >= to avoid pulling one
	// point more than needed.
	if itr.opt.Limit > 0 && itr.n-itr.opt.Offset >= itr.opt.Limit {
		return nil
	}

	// Read the next point.
	p := itr.input.Next()
	if p == nil {
		return nil
	}

	// Increment counter.
	itr.n++

	// Offsets are handled by a higher level iterator so return all points.
	return p
}
// stringCursor represents an object for iterating over a single string field. // stringCursor represents an object for iterating over a single string field.
type stringCursor interface { type stringCursor interface {
cursor cursor
@ -1352,6 +1466,44 @@ func (itr *booleanIterator) Stats() influxql.IteratorStats {
// Close closes the iterator. // Close closes the iterator.
func (itr *booleanIterator) Close() error { return nil } func (itr *booleanIterator) Close() error { return nil }
// booleanLimitIterator wraps a BooleanIterator and stops reading from it once
// enough points have been returned to satisfy the LIMIT and OFFSET options.
//
// It never skips points itself: it only caps the total number of points read
// so that the higher level iterator, which applies the offset, does not have
// to drain the whole input.
type booleanLimitIterator struct {
	input influxql.BooleanIterator
	opt   influxql.IteratorOptions
	n     int // number of points returned so far
}

// newBooleanLimitIterator returns a booleanLimitIterator that reads from input
// and is bounded by opt.Limit and opt.Offset.
func newBooleanLimitIterator(input influxql.BooleanIterator, opt influxql.IteratorOptions) *booleanLimitIterator {
	return &booleanLimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns the stats reported by the underlying input iterator.
func (itr *booleanLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying input iterator.
func (itr *booleanLimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. It returns nil when the input
// is exhausted or when opt.Limit+opt.Offset points have already been returned.
//
// When opt.Limit is not positive no cap is applied. This iterator is also
// installed for offset-only queries, and cutting the stream there would drop
// points that belong in the result.
func (itr *booleanLimitIterator) Next() *influxql.BooleanPoint {
	// Check if we have already returned Limit points beyond the offset.
	// The test runs before the read, so it must use >= to avoid pulling one
	// point more than needed.
	if itr.opt.Limit > 0 && itr.n-itr.opt.Offset >= itr.opt.Limit {
		return nil
	}

	// Read the next point.
	p := itr.input.Next()
	if p == nil {
		return nil
	}

	// Increment counter.
	itr.n++

	// Offsets are handled by a higher level iterator so return all points.
	return p
}
// booleanCursor represents an object for iterating over a single boolean field. // booleanCursor represents an object for iterating over a single boolean field.
type booleanCursor interface { type booleanCursor interface {
cursor cursor

View File

@ -226,6 +226,44 @@ func (itr *{{.name}}Iterator) Stats() influxql.IteratorStats {
// Close closes the iterator. // Close closes the iterator.
func (itr *{{.name}}Iterator) Close() error { return nil } func (itr *{{.name}}Iterator) Close() error { return nil }
// {{.name}}LimitIterator wraps a {{.Name}}Iterator and stops reading from it once
// enough points have been returned to satisfy the LIMIT and OFFSET options.
//
// It never skips points itself: it only caps the total number of points read
// so that the higher level iterator, which applies the offset, does not have
// to drain the whole input.
type {{.name}}LimitIterator struct {
	input influxql.{{.Name}}Iterator
	opt   influxql.IteratorOptions
	n     int // number of points returned so far
}

// new{{.Name}}LimitIterator returns a {{.name}}LimitIterator that reads from input
// and is bounded by opt.Limit and opt.Offset.
func new{{.Name}}LimitIterator(input influxql.{{.Name}}Iterator, opt influxql.IteratorOptions) *{{.name}}LimitIterator {
	return &{{.name}}LimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns the stats reported by the underlying input iterator.
func (itr *{{.name}}LimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying input iterator.
func (itr *{{.name}}LimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. It returns nil when the input
// is exhausted or when opt.Limit+opt.Offset points have already been returned.
//
// When opt.Limit is not positive no cap is applied. This iterator is also
// installed for offset-only queries, and cutting the stream there would drop
// points that belong in the result.
func (itr *{{.name}}LimitIterator) Next() *influxql.{{.Name}}Point {
	// Check if we have already returned Limit points beyond the offset.
	// The test runs before the read, so it must use >= to avoid pulling one
	// point more than needed.
	if itr.opt.Limit > 0 && itr.n-itr.opt.Offset >= itr.opt.Limit {
		return nil
	}

	// Read the next point.
	p := itr.input.Next()
	if p == nil {
		return nil
	}

	// Increment counter.
	itr.n++

	// Offsets are handled by a higher level iterator so return all points.
	return p
}
// {{.name}}Cursor represents an object for iterating over a single {{.name}} field. // {{.name}}Cursor represents an object for iterating over a single {{.name}} field.
type {{.name}}Cursor interface { type {{.name}}Cursor interface {
cursor cursor

View File

@ -0,0 +1,22 @@
package tsm1
import (
"fmt"
"github.com/influxdata/influxdb/influxql"
)
// newLimitIterator wraps input in the limit iterator that matches its point
// type, passing opt through unchanged. The float, integer, string and boolean
// iterator interfaces are tried in that order. It panics if input satisfies
// none of them.
func newLimitIterator(input influxql.Iterator, opt influxql.IteratorOptions) influxql.Iterator {
	if floats, ok := input.(influxql.FloatIterator); ok {
		return newFloatLimitIterator(floats, opt)
	}
	if integers, ok := input.(influxql.IntegerIterator); ok {
		return newIntegerLimitIterator(integers, opt)
	}
	if strs, ok := input.(influxql.StringIterator); ok {
		return newStringLimitIterator(strs, opt)
	}
	if booleans, ok := input.(influxql.BooleanIterator); ok {
		return newBooleanLimitIterator(booleans, opt)
	}
	panic(fmt.Sprintf("unsupported limit iterator type: %T", input))
}