Merge pull request #6533 from benbjohnson/optimize-show-series

Optimize SHOW SERIES
pull/6544/head
Ben Johnson 2016-05-03 09:15:21 -06:00
commit 417df18396
8 changed files with 187 additions and 39 deletions

View File

@ -23,6 +23,7 @@
- [#6290](https://github.com/influxdata/influxdb/issues/6290): Add POST /query endpoint and warning messages for using GET with write operations.
- [#6494](https://github.com/influxdata/influxdb/issues/6494): Support booleans for min() and max().
- [#2074](https://github.com/influxdata/influxdb/issues/2074): Support offset argument in the GROUP BY time(...) call.
- [#6533](https://github.com/influxdata/influxdb/issues/6533): Optimize SHOW SERIES.
### Bugfixes

View File

@ -3,6 +3,7 @@ package run_test
import (
"bytes"
"fmt"
"net/url"
"testing"
)
@ -47,3 +48,43 @@ func benchmarkServerQueryCount(b *testing.B, pointN int) {
}
}
}
// BenchmarkServer_ShowSeries_1 benchmarks SHOW SERIES against a single series.
func BenchmarkServer_ShowSeries_1(b *testing.B) {
benchmarkServerShowSeries(b, 1)
}
// BenchmarkServer_ShowSeries_1K benchmarks SHOW SERIES against 1,000 series.
func BenchmarkServer_ShowSeries_1K(b *testing.B) {
benchmarkServerShowSeries(b, 1000)
}
// BenchmarkServer_ShowSeries_100K benchmarks SHOW SERIES against 100,000 series.
func BenchmarkServer_ShowSeries_100K(b *testing.B) {
benchmarkServerShowSeries(b, 100000)
}
// BenchmarkServer_ShowSeries_1M benchmarks SHOW SERIES against 1,000,000 series.
func BenchmarkServer_ShowSeries_1M(b *testing.B) {
benchmarkServerShowSeries(b, 1000000)
}
// benchmarkServerShowSeries seeds a server with pointN points — each carrying a
// unique host tag, so pointN distinct series exist in measurement "cpu" — and
// then measures repeated SHOW SERIES queries against database db0.
func benchmarkServerShowSeries(b *testing.B, pointN int) {
s := OpenDefaultServer(NewConfig())
defer s.Close()
// Write data into server: one line-protocol point per series, newline
// separated, with no trailing newline after the final point.
var buf bytes.Buffer
for i := 0; i < pointN; i++ {
fmt.Fprintf(&buf, `cpu,host=server%d value=100 %d`, i, i+1)
if i != pointN-1 {
fmt.Fprint(&buf, "\n")
}
}
s.MustWrite("db0", "rp0", buf.String(), nil)
// Run SHOW SERIES repeatedly against the populated database.
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
if _, err := s.QueryWithParams(`SHOW SERIES`, url.Values{"db": {"db0"}}); err != nil {
b.Fatal(err)
}
}
}

View File

@ -308,6 +308,7 @@ type floatSortedMergeIterator struct {
opt IteratorOptions
heap floatSortedMergeHeap
init bool
point FloatPoint
}
// newFloatSortedMergeIterator returns an instance of floatSortedMergeIterator.
@ -375,6 +376,8 @@ func (itr *floatSortedMergeIterator) pop() (*FloatPoint, error) {
item := heap.Pop(&itr.heap).(*floatSortedMergeHeapItem)
if item.err != nil {
return nil, item.err
} else if item.point == nil {
return nil, nil
}
// Copy the point for return.
@ -2254,6 +2257,7 @@ type integerSortedMergeIterator struct {
opt IteratorOptions
heap integerSortedMergeHeap
init bool
point IntegerPoint
}
// newIntegerSortedMergeIterator returns an instance of integerSortedMergeIterator.
@ -2321,6 +2325,8 @@ func (itr *integerSortedMergeIterator) pop() (*IntegerPoint, error) {
item := heap.Pop(&itr.heap).(*integerSortedMergeHeapItem)
if item.err != nil {
return nil, item.err
} else if item.point == nil {
return nil, nil
}
// Copy the point for return.
@ -4197,6 +4203,7 @@ type stringSortedMergeIterator struct {
opt IteratorOptions
heap stringSortedMergeHeap
init bool
point StringPoint
}
// newStringSortedMergeIterator returns an instance of stringSortedMergeIterator.
@ -4264,6 +4271,8 @@ func (itr *stringSortedMergeIterator) pop() (*StringPoint, error) {
item := heap.Pop(&itr.heap).(*stringSortedMergeHeapItem)
if item.err != nil {
return nil, item.err
} else if item.point == nil {
return nil, nil
}
// Copy the point for return.
@ -6140,6 +6149,7 @@ type booleanSortedMergeIterator struct {
opt IteratorOptions
heap booleanSortedMergeHeap
init bool
point BooleanPoint
}
// newBooleanSortedMergeIterator returns an instance of booleanSortedMergeIterator.
@ -6207,6 +6217,8 @@ func (itr *booleanSortedMergeIterator) pop() (*BooleanPoint, error) {
item := heap.Pop(&itr.heap).(*booleanSortedMergeHeapItem)
if item.err != nil {
return nil, item.err
} else if item.point == nil {
return nil, nil
}
// Copy the point for return.

View File

@ -307,6 +307,7 @@ type {{$k.name}}SortedMergeIterator struct {
opt IteratorOptions
heap {{$k.name}}SortedMergeHeap
init bool
point {{$k.Name}}Point
}
// new{{$k.Name}}SortedMergeIterator returns an instance of {{$k.name}}SortedMergeIterator.
@ -374,6 +375,8 @@ func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) {
item := heap.Pop(&itr.heap).(*{{$k.name}}SortedMergeHeapItem)
if item.err != nil {
return nil, item.err
} else if item.point == nil {
return nil, nil
}
// Copy the point for return.

View File

@ -1174,6 +1174,55 @@ func decodeIteratorStats(pb *internal.IteratorStats) IteratorStats {
}
}
// floatFastDedupeIterator outputs unique points where the point has a single aux field.
type floatFastDedupeIterator struct {
input FloatIterator // wrapped source of points
m map[fastDedupeKey]struct{} // lookup of points already sent
}
// newFloatFastDedupeIterator returns a new instance of floatFastDedupeIterator
// wrapping input, with an empty seen-point lookup.
func newFloatFastDedupeIterator(input FloatIterator) *floatFastDedupeIterator {
	itr := &floatFastDedupeIterator{input: input}
	itr.m = make(map[fastDedupeKey]struct{})
	return itr
}
// Stats returns stats from the input iterator; this wrapper adds none of its own.
func (itr *floatFastDedupeIterator) Stats() IteratorStats { return itr.input.Stats() }
// Close closes the iterator by closing its wrapped input iterator.
func (itr *floatFastDedupeIterator) Close() error { return itr.input.Close() }
// Next returns the next unique point from the input iterator. Points with no
// aux fields are skipped; points whose (name, first aux value) pair has
// already been emitted are suppressed.
func (itr *floatFastDedupeIterator) Next() (*FloatPoint, error) {
	for {
		// Pull the next point from the wrapped iterator.
		p, err := itr.input.Next()
		if err != nil {
			return nil, err
		}
		if p == nil {
			return nil, nil
		}

		// Points without aux fields cannot be keyed; skip them.
		if len(p.Aux) == 0 {
			continue
		}

		// Suppress the point if its key has already been emitted.
		key := fastDedupeKey{name: p.Name, value: p.Aux[0]}
		if _, seen := itr.m[key]; seen {
			continue
		}

		// Mark as emitted and return.
		itr.m[key] = struct{}{}
		return p, nil
	}
}
// fastDedupeKey identifies an emitted point by measurement name and its first
// aux field value. NOTE(review): the aux value is stored as interface{} and
// used as a map key, so it must be a comparable type — confirm with callers.
type fastDedupeKey struct {
name string
value interface{}
}
// reverseStringSlice adapts []string to sort.Interface — presumably for
// reverse-lexicographic ordering; Less/Swap are not visible in this hunk.
type reverseStringSlice []string
func (p reverseStringSlice) Len() int { return len(p) }

View File

@ -104,7 +104,12 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) (
// Filter out duplicate rows, if required.
if opt.Dedupe {
input = NewDedupeIterator(input)
// If there is no group by and it's a single field then fast dedupe.
if itr, ok := input.(FloatIterator); ok && len(fields) == 1 && len(opt.Dimensions) == 0 {
input = newFloatFastDedupeIterator(itr)
} else {
input = NewDedupeIterator(input)
}
}
// Apply limit & offset.

View File

@ -412,6 +412,7 @@ func (d *DatabaseIndex) Measurements() Measurements {
measurements = append(measurements, m)
}
d.mu.RUnlock()
return measurements
}
@ -521,6 +522,16 @@ func (m *Measurement) SeriesByID(id uint64) *Series {
return m.seriesByID[id]
}
// AppendSeriesKeysByID appends keys for a list of series ids to a buffer and
// returns the extended buffer. The measurement's read lock is held for the
// duration of the lookups. IDs with no matching series are skipped instead of
// dereferencing a nil map entry, which would panic.
func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, id := range ids {
		// Guard against ids that are no longer (or never were) in the index.
		if s := m.seriesByID[id]; s != nil {
			dst = append(dst, s.Key)
		}
	}
	return dst
}
// SeriesKeys returns the keys of every series in this measurement
func (m *Measurement) SeriesKeys() []string {
m.mu.RLock()

View File

@ -983,16 +983,18 @@ func (itr *MeasurementIterator) Next() (*influxql.FloatPoint, error) {
// seriesIterator emits series ids.
type seriesIterator struct {
keys []string // remaining series
fields []string // fields to emit (key)
mms Measurements
keys struct {
buf []string
i int
}
point influxql.FloatPoint // reusable point
opt influxql.IteratorOptions
}
// NewSeriesIterator returns a new instance of SeriesIterator.
func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
// Retrieve a list of all measurements.
mms := sh.index.Measurements()
sort.Sort(mms)
// Only equality operators are allowed.
var err error
influxql.WalkFunc(opt.Condition, func(n influxql.Node) {
@ -1010,22 +1012,16 @@ func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterat
return nil, err
}
// Generate a list of all series keys.
keys := newStringSet()
for _, mm := range mms {
ids, err := mm.seriesIDsAllOrByExpr(opt.Condition)
if err != nil {
return nil, err
}
for _, id := range ids {
keys.add(mm.SeriesByID(id).Key)
}
}
// Read and sort all measurements.
mms := sh.index.Measurements()
sort.Sort(mms)
return &seriesIterator{
keys: keys.list(),
fields: opt.Aux,
mms: mms,
point: influxql.FloatPoint{
Aux: make([]interface{}, len(opt.Aux)),
},
opt: opt,
}, nil
}
@ -1037,27 +1033,57 @@ func (itr *seriesIterator) Close() error { return nil }
// Next emits the next point in the iterator.
func (itr *seriesIterator) Next() (*influxql.FloatPoint, error) {
// If there are no more keys then return nil.
if len(itr.keys) == 0 {
return nil, nil
}
// Prepare auxiliary fields.
aux := make([]interface{}, len(itr.fields))
for i, f := range itr.fields {
switch f {
case "key":
aux[i] = itr.keys[0]
for {
// Load next measurement's keys if there are no more remaining.
if itr.keys.i >= len(itr.keys.buf) {
if err := itr.nextKeys(); err != nil {
return nil, err
}
if len(itr.keys.buf) == 0 {
return nil, nil
}
}
}
// Return next key.
p := &influxql.FloatPoint{
Aux: aux,
}
itr.keys = itr.keys[1:]
// Read the next key.
key := itr.keys.buf[itr.keys.i]
itr.keys.i++
return p, nil
// Write auxiliary fields.
for i, f := range itr.opt.Aux {
switch f {
case "key":
itr.point.Aux[i] = key
}
}
return &itr.point, nil
}
}
// nextKeys reads all keys for the next measurement.
//
// It refills itr.keys.buf with the sorted series keys of the next measurement
// that has at least one matching series, resetting the read cursor to zero.
// Measurements whose condition matches no series are skipped. When no
// measurements remain it returns nil with an empty buffer, which the caller
// treats as end-of-iteration.
func (itr *seriesIterator) nextKeys() error {
for {
// Ensure previous keys are cleared out.
itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0]
// Read next measurement; an empty list means iteration is done.
if len(itr.mms) == 0 {
return nil
}
mm := itr.mms[0]
itr.mms = itr.mms[1:]
// Read all series ids matching the iterator condition.
ids, err := mm.seriesIDsAllOrByExpr(itr.opt.Condition)
if err != nil {
return err
} else if len(ids) == 0 {
continue
}
// Materialize and sort the keys so output order is deterministic.
itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids)
sort.Strings(itr.keys.buf)
return nil
}
}
// NewTagKeysIterator returns a new instance of TagKeysIterator.