From 1b94cd26862608e9f080aa2c6236d29a0eec7468 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 6 Jun 2016 13:53:54 -0600 Subject: [PATCH] optimize SHOW TAG VALUES MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit optimizes `SHOW TAG VALUES` so that it avoids the `SELECT` query engine execution and iterator creation. There are also optimizations to reduce individual memory allocations and to reduce in-memory heap size by only operating on one measurement at a time. Execution time has been reduced to approximately 900ms for 500,000 rows. This is about 2µs per row. Of this time, approximately 1µs is spent retrieving and sorting the row and 1µs is spent encoding into JSON and writing to the response body. --- coordinator/statement_executor.go | 148 +++++++++++++++++++++++++ coordinator/statement_executor_test.go | 6 + tsdb/meta.go | 35 +++++- tsdb/shard.go | 131 ---------------------- 4 files changed, 183 insertions(+), 137 deletions(-) diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 820215d826..f7b7b166cc 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -7,6 +7,7 @@ import ( "io" "sort" "strconv" + "strings" "time" "github.com/influxdata/influxdb" @@ -403,6 +404,12 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw } func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error { + // Handle SHOW TAG VALUES separately so it can be optimized. 
+ // https://github.com/influxdata/influxdb/issues/6233 + if source, ok := stmt.Sources[0].(*influxql.Measurement); ok && source.Name == "_tags" { + return e.executeShowTagValues(stmt, ctx) + } + // It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now` now := time.Now().UTC() opt := influxql.SelectOptions{InterruptCh: ctx.InterruptCh} @@ -597,6 +604,130 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt return e.TSDBStore.IteratorCreator(shards) } +func (e *StatementExecutor) executeShowTagValues(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error { + if stmt.Condition == nil { + return errors.New("a condition is required") + } + + source := stmt.Sources[0].(*influxql.Measurement) + index := e.TSDBStore.DatabaseIndex(source.Database) + if index == nil { + ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)} + return nil + } + + measurementExpr := influxql.CloneExpr(stmt.Condition) + measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || tag.Val != "_name" { + return nil + } + } + } + return e + }), nil) + + mms, ok, err := index.MeasurementsByExpr(measurementExpr) + if err != nil { + return err + } else if !ok { + mms = index.Measurements() + sort.Sort(mms) + } + + // If there are no measurements, return immediately. 
+ if len(mms) == 0 { + ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)} + return nil + } + + filterExpr := influxql.CloneExpr(stmt.Condition) + filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || strings.HasPrefix(tag.Val, "_") { + return nil + } + } + } + return e + }), nil) + + var emitted bool + columns := stmt.ColumnNames() + for _, mm := range mms { + ids, err := mm.SeriesIDsAllOrByExpr(filterExpr) + if err != nil { + return err + } + ss := mm.SeriesByIDSlice(ids) + + // Determine a list of keys from condition. + keySet, ok, err := mm.TagKeysByExpr(stmt.Condition) + if err != nil { + return err + } + + // Loop over all keys for each series. + m := make(map[keyValue]struct{}, len(ss)) + for _, series := range ss { + for key, value := range series.Tags { + if !ok { + // nop + } else if _, exists := keySet[key]; !exists { + continue + } + m[keyValue{key, value}] = struct{}{} + } + } + + // Move to next series if no key/values match. + if len(m) == 0 { + continue + } + + // Sort key/value set. + a := make([]keyValue, 0, len(m)) + for kv := range m { + a = append(a, kv) + } + sort.Sort(keyValues(a)) + + // Convert to result values. + slab := make([]interface{}, len(a)*2) + values := make([][]interface{}, len(a)) + for i, elem := range a { + slab[i*2], slab[i*2+1] = elem.key, elem.value + values[i] = slab[i*2 : i*2+2] + } + + // Send result to client. + ctx.Results <- &influxql.Result{ + StatementID: ctx.StatementID, + Series: []*models.Row{&models.Row{ + Name: mm.Name, + Columns: columns, + Values: values, + }}, + } + emitted = true + } + + // Always emit at least one row. 
+ if !emitted { + ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)} + } + + return nil +} + func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) { dis := e.MetaClient.Databases() @@ -1008,6 +1139,7 @@ type TSDBStore interface { DeleteRetentionPolicy(database, name string) error DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error DeleteShard(id uint64) error + DatabaseIndex(name string) *tsdb.DatabaseIndex IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) ShardIteratorCreator(id uint64) influxql.IteratorCreator } @@ -1105,3 +1237,19 @@ type uint64Slice []uint64 func (a uint64Slice) Len() int { return len(a) } func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } + +type keyValue struct { + key, value string +} + +type keyValues []keyValue + +func (a keyValues) Len() int { return len(a) } +func (a keyValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a keyValues) Less(i, j int) bool { + ki, kj := a[i].key, a[j].key + if ki == kj { + return a[i].value < a[j].value + } + return ki < kj +} diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go index 9917ffaa26..59b7091dd3 100644 --- a/coordinator/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" + "github.com/influxdata/influxdb/tsdb" ) const ( @@ -200,6 +201,7 @@ type TSDBStore struct { DeleteRetentionPolicyFn func(database, name string) error DeleteShardFn func(id uint64) error DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error + DatabaseIndexFn func(name string) *tsdb.DatabaseIndex 
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator } @@ -267,6 +269,10 @@ func (s *TSDBStore) ShardIteratorCreator(id uint64) influxql.IteratorCreator { return s.ShardIteratorCreatorFn(id) } +func (s *TSDBStore) DatabaseIndex(name string) *tsdb.DatabaseIndex { + return s.DatabaseIndexFn(name) +} + // MustParseQuery parses s into a query. Panic on error. func MustParseQuery(s string) *influxql.Query { q, err := influxql.ParseQuery(s) diff --git a/tsdb/meta.go b/tsdb/meta.go index 1ff1e90830..c801c8e063 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -239,12 +239,18 @@ func (d *DatabaseIndex) TagsForSeries(key string) map[string]string { return ss.Tags } -// measurementsByExpr takes an expression containing only tags and returns a +// MeasurementsByExpr takes an expression containing only tags and returns a // list of matching *Measurement. The bool return argument returns if the // expression was a measurement expression. It is used to differentiate a list // of no measurements because all measurements were filtered out (when the bool // is true) against when there are no measurements because the expression // wasn't evaluated (when the bool is false). +func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error) { + d.mu.RLock() + defer d.mu.RUnlock() + return d.measurementsByExpr(expr) +} + func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bool, error) { if expr == nil { return nil, false, nil @@ -538,6 +544,17 @@ func (m *Measurement) SeriesByID(id uint64) *Series { return m.seriesByID[id] } +// SeriesByIDSlice returns a list of series by identifiers. +func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series { + m.mu.RLock() + defer m.mu.RUnlock() + a := make([]*Series, len(ids)) + for i, id := range ids { + a[i] = m.seriesByID[id] + } + return a +} + // AppendSeriesKeysByID appends keys for a list of series ids to a buffer. 
func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string { m.mu.RLock() @@ -1121,8 +1138,14 @@ func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr, return exprs } -// seriesIDsAllOrByExpr walks an expressions for matching series IDs +// SeriesIDsAllOrByExpr walks an expressions for matching series IDs // or, if no expressions is given, returns all series IDs for the measurement. +func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.seriesIDsAllOrByExpr(expr) +} + func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) { // If no expression given or the measurement has no series, // we can take just return the ids or nil accordingly. @@ -1142,7 +1165,7 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error } // tagKeysByExpr extracts the tag keys wanted by the expression. -func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) { +func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) { switch e := expr.(type) { case *influxql.BinaryExpr: switch e.Op { @@ -1175,12 +1198,12 @@ func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) } return m.tagKeysByFilter(tf.Op, tf.Value, tf.Regex), true, nil case influxql.AND, influxql.OR: - lhsKeys, lhsOk, err := m.tagKeysByExpr(e.LHS) + lhsKeys, lhsOk, err := m.TagKeysByExpr(e.LHS) if err != nil { return nil, false, err } - rhsKeys, rhsOk, err := m.tagKeysByExpr(e.RHS) + rhsKeys, rhsOk, err := m.TagKeysByExpr(e.RHS) if err != nil { return nil, false, err } @@ -1201,7 +1224,7 @@ func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) return nil, false, fmt.Errorf("invalid operator") } case *influxql.ParenExpr: - return m.tagKeysByExpr(e.Expr) + return m.TagKeysByExpr(e.Expr) } return nil, false, fmt.Errorf("%#v", expr) } diff --git a/tsdb/shard.go 
b/tsdb/shard.go index c300091dcc..5a07134cbf 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -12,7 +12,6 @@ import ( "os" "path/filepath" "sort" - "strings" "sync" "time" @@ -515,8 +514,6 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite return NewSeriesIterator(s, opt) case "_tagKeys": return NewTagKeysIterator(s, opt) - case "_tags": - return NewTagValuesIterator(s, opt) default: return nil, fmt.Errorf("unknown system source: %s", m.Name) } @@ -1325,134 +1322,6 @@ type tagValuesIterator struct { } } -// NewTagValuesIterator returns a new instance of TagValuesIterator. -func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) { - if opt.Condition == nil { - return nil, errors.New("a condition is required") - } - - measurementExpr := influxql.CloneExpr(opt.Condition) - measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr { - switch e := e.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok || tag.Val != "_name" { - return nil - } - } - } - return e - }), nil) - - mms, ok, err := sh.index.measurementsByExpr(measurementExpr) - if err != nil { - return nil, err - } else if !ok { - mms = sh.index.Measurements() - sort.Sort(mms) - } - - // If there are no measurements, return immediately. 
- if len(mms) == 0 { - return &tagValuesIterator{}, nil - } - - filterExpr := influxql.CloneExpr(opt.Condition) - filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { - switch e := e.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok || strings.HasPrefix(tag.Val, "_") { - return nil - } - } - } - return e - }), nil) - - var series []*Series - keys := newStringSet() - for _, mm := range mms { - ss, ok, err := mm.tagKeysByExpr(opt.Condition) - if err != nil { - return nil, err - } else if !ok { - keys.add(mm.TagKeys()...) - } else { - keys = keys.union(ss) - } - - ids, err := mm.seriesIDsAllOrByExpr(filterExpr) - if err != nil { - return nil, err - } - - for _, id := range ids { - series = append(series, mm.SeriesByID(id)) - } - } - - return &tagValuesIterator{ - series: series, - keys: keys.list(), - fields: influxql.VarRefs(opt.Aux).Strings(), - }, nil -} - -// Stats returns stats about the points processed. -func (itr *tagValuesIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} } - -// Close closes the iterator. -func (itr *tagValuesIterator) Close() error { return nil } - -// Next emits the next point in the iterator. -func (itr *tagValuesIterator) Next() (*influxql.FloatPoint, error) { - for { - // If there are no more values then move to the next key. - if len(itr.buf.keys) == 0 { - if len(itr.series) == 0 { - return nil, nil - } - - itr.buf.s = itr.series[0] - itr.buf.keys = itr.keys - itr.series = itr.series[1:] - continue - } - - key := itr.buf.keys[0] - value, ok := itr.buf.s.Tags[key] - if !ok { - itr.buf.keys = itr.buf.keys[1:] - continue - } - - // Prepare auxiliary fields. 
- auxFields := make([]interface{}, len(itr.fields)) - for i, f := range itr.fields { - switch f { - case "_tagKey": - auxFields[i] = key - case "value": - auxFields[i] = value - } - } - - // Return next key. - p := &influxql.FloatPoint{ - Name: itr.buf.s.measurement.Name, - Aux: auxFields, - } - itr.buf.keys = itr.buf.keys[1:] - - return p, nil - } -} - // measurementKeyFunc is the function called by measurementKeysIterator. type measurementKeyFunc func(m *Measurement) []string