diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 820215d826..f7b7b166cc 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -7,6 +7,7 @@ import ( "io" "sort" "strconv" + "strings" "time" "github.com/influxdata/influxdb" @@ -403,6 +404,12 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw } func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error { + // Handle SHOW TAG VALUES separately so it can be optimized. + // https://github.com/influxdata/influxdb/issues/6233 + if source, ok := stmt.Sources[0].(*influxql.Measurement); ok && source.Name == "_tags" { + return e.executeShowTagValues(stmt, ctx) + } + // It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now` now := time.Now().UTC() opt := influxql.SelectOptions{InterruptCh: ctx.InterruptCh} @@ -597,6 +604,130 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt return e.TSDBStore.IteratorCreator(shards) } +func (e *StatementExecutor) executeShowTagValues(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error { + if stmt.Condition == nil { + return errors.New("a condition is required") + } + + source := stmt.Sources[0].(*influxql.Measurement) + index := e.TSDBStore.DatabaseIndex(source.Database) + if index == nil { + ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)} + return nil + } + + measurementExpr := influxql.CloneExpr(stmt.Condition) + measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || tag.Val != "_name" { + return nil + } + } + } + return e +
}), nil) + + mms, ok, err := index.MeasurementsByExpr(measurementExpr) + if err != nil { + return err + } else if !ok { + mms = index.Measurements() + sort.Sort(mms) + } + + // If there are no measurements, emit an empty result and return immediately. + if len(mms) == 0 { + ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)} + return nil + } + + filterExpr := influxql.CloneExpr(stmt.Condition) + filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || strings.HasPrefix(tag.Val, "_") { + return nil + } + } + } + return e + }), nil) + + var emitted bool + columns := stmt.ColumnNames() + for _, mm := range mms { + ids, err := mm.SeriesIDsAllOrByExpr(filterExpr) + if err != nil { + return err + } + ss := mm.SeriesByIDSlice(ids) + + // Determine the set of tag keys selected by the condition. + keySet, ok, err := mm.TagKeysByExpr(stmt.Condition) + if err != nil { + return err + } + + // Collect the distinct key/value pairs across all matching series. + m := make(map[keyValue]struct{}, len(ss)) + for _, series := range ss { + for key, value := range series.Tags { + if !ok { + // ok is false: skip key filtering and keep every tag. + } else if _, exists := keySet[key]; !exists { + continue + } + m[keyValue{key, value}] = struct{}{} + } + } + + // Move to next measurement if no key/values match. + if len(m) == 0 { + continue + } + + // Sort key/value set. + a := make([]keyValue, 0, len(m)) + for kv := range m { + a = append(a, kv) + } + sort.Sort(keyValues(a)) + + // Convert to result values; each row is a two-element window into one shared slab. + slab := make([]interface{}, len(a)*2) + values := make([][]interface{}, len(a)) + for i, elem := range a { + slab[i*2], slab[i*2+1] = elem.key, elem.value + values[i] = slab[i*2 : i*2+2] + } + + // Send one result per measurement to the client.
+ ctx.Results <- &influxql.Result{ + StatementID: ctx.StatementID, + Series: []*models.Row{&models.Row{ + Name: mm.Name, + Columns: columns, + Values: values, + }}, + } + emitted = true + } + + // Always emit at least one result, even if it carries no rows. + if !emitted { + ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)} + } + + return nil +} + func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) { dis := e.MetaClient.Databases() @@ -1008,6 +1139,7 @@ type TSDBStore interface { DeleteRetentionPolicy(database, name string) error DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error DeleteShard(id uint64) error + DatabaseIndex(name string) *tsdb.DatabaseIndex IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) ShardIteratorCreator(id uint64) influxql.IteratorCreator } @@ -1105,3 +1237,19 @@ type uint64Slice []uint64 func (a uint64Slice) Len() int { return len(a) } func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } + +type keyValue struct { + key, value string +} + +type keyValues []keyValue + +func (a keyValues) Len() int { return len(a) } +func (a keyValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a keyValues) Less(i, j int) bool { + ki, kj := a[i].key, a[j].key + if ki == kj { + return a[i].value < a[j].value + } + return ki < kj +} diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go index 9917ffaa26..59b7091dd3 100644 --- a/coordinator/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" + "github.com/influxdata/influxdb/tsdb" ) const ( @@ -200,6 +201,7 @@ type TSDBStore struct { DeleteRetentionPolicyFn
func(database, name string) error DeleteShardFn func(id uint64) error DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error + DatabaseIndexFn func(name string) *tsdb.DatabaseIndex ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator } @@ -267,6 +269,10 @@ func (s *TSDBStore) ShardIteratorCreator(id uint64) influxql.IteratorCreator { return s.ShardIteratorCreatorFn(id) } +func (s *TSDBStore) DatabaseIndex(name string) *tsdb.DatabaseIndex { + return s.DatabaseIndexFn(name) +} + // MustParseQuery parses s into a query. Panic on error. func MustParseQuery(s string) *influxql.Query { q, err := influxql.ParseQuery(s) diff --git a/tsdb/meta.go b/tsdb/meta.go index 1ff1e90830..c801c8e063 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -239,12 +239,18 @@ func (d *DatabaseIndex) TagsForSeries(key string) map[string]string { return ss.Tags } -// measurementsByExpr takes an expression containing only tags and returns a +// MeasurementsByExpr takes an expression containing only tags and returns a // list of matching *Measurement. The bool return argument returns if the // expression was a measurement expression. It is used to differentiate a list // of no measurements because all measurements were filtered out (when the bool // is true) against when there are no measurements because the expression // wasn't evaluated (when the bool is false). +func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error) { + d.mu.RLock() + defer d.mu.RUnlock() + return d.measurementsByExpr(expr) +} + func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bool, error) { if expr == nil { return nil, false, nil @@ -538,6 +544,17 @@ func (m *Measurement) SeriesByID(id uint64) *Series { return m.seriesByID[id] } +// SeriesByIDSlice returns the series for ids, in the same order as ids.
+func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series { + m.mu.RLock() + defer m.mu.RUnlock() + a := make([]*Series, len(ids)) + for i, id := range ids { + a[i] = m.seriesByID[id] + } + return a +} + // AppendSeriesKeysByID appends keys for a list of series ids to a buffer. func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string { m.mu.RLock() @@ -1121,8 +1138,14 @@ func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr, return exprs } -// seriesIDsAllOrByExpr walks an expressions for matching series IDs -// or, if no expressions is given, returns all series IDs for the measurement. +// SeriesIDsAllOrByExpr walks an expression for matching series IDs +// or, if no expression is given, returns all series IDs for the measurement. +func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.seriesIDsAllOrByExpr(expr) +} + func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) { // If no expression given or the measurement has no series, // we can take just return the ids or nil accordingly. @@ -1142,7 +1165,7 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error } -// tagKeysByExpr extracts the tag keys wanted by the expression. +// TagKeysByExpr extracts the tag keys wanted by the expression.
-func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) { +func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) { switch e := expr.(type) { case *influxql.BinaryExpr: switch e.Op { @@ -1175,12 +1198,12 @@ func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) } return m.tagKeysByFilter(tf.Op, tf.Value, tf.Regex), true, nil case influxql.AND, influxql.OR: - lhsKeys, lhsOk, err := m.tagKeysByExpr(e.LHS) + lhsKeys, lhsOk, err := m.TagKeysByExpr(e.LHS) if err != nil { return nil, false, err } - rhsKeys, rhsOk, err := m.tagKeysByExpr(e.RHS) + rhsKeys, rhsOk, err := m.TagKeysByExpr(e.RHS) if err != nil { return nil, false, err } @@ -1201,7 +1224,7 @@ func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) return nil, false, fmt.Errorf("invalid operator") } case *influxql.ParenExpr: - return m.tagKeysByExpr(e.Expr) + return m.TagKeysByExpr(e.Expr) } return nil, false, fmt.Errorf("%#v", expr) } diff --git a/tsdb/shard.go b/tsdb/shard.go index c300091dcc..5a07134cbf 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -12,7 +12,6 @@ import ( "os" "path/filepath" "sort" - "strings" "sync" "time" @@ -515,8 +514,6 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite return NewSeriesIterator(s, opt) case "_tagKeys": return NewTagKeysIterator(s, opt) - case "_tags": - return NewTagValuesIterator(s, opt) default: return nil, fmt.Errorf("unknown system source: %s", m.Name) } @@ -1325,134 +1322,6 @@ type tagValuesIterator struct { } } -// NewTagValuesIterator returns a new instance of TagValuesIterator. 
-func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) { - if opt.Condition == nil { - return nil, errors.New("a condition is required") - } - - measurementExpr := influxql.CloneExpr(opt.Condition) - measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr { - switch e := e.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok || tag.Val != "_name" { - return nil - } - } - } - return e - }), nil) - - mms, ok, err := sh.index.measurementsByExpr(measurementExpr) - if err != nil { - return nil, err - } else if !ok { - mms = sh.index.Measurements() - sort.Sort(mms) - } - - // If there are no measurements, return immediately. - if len(mms) == 0 { - return &tagValuesIterator{}, nil - } - - filterExpr := influxql.CloneExpr(opt.Condition) - filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { - switch e := e.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok || strings.HasPrefix(tag.Val, "_") { - return nil - } - } - } - return e - }), nil) - - var series []*Series - keys := newStringSet() - for _, mm := range mms { - ss, ok, err := mm.tagKeysByExpr(opt.Condition) - if err != nil { - return nil, err - } else if !ok { - keys.add(mm.TagKeys()...) - } else { - keys = keys.union(ss) - } - - ids, err := mm.seriesIDsAllOrByExpr(filterExpr) - if err != nil { - return nil, err - } - - for _, id := range ids { - series = append(series, mm.SeriesByID(id)) - } - } - - return &tagValuesIterator{ - series: series, - keys: keys.list(), - fields: influxql.VarRefs(opt.Aux).Strings(), - }, nil -} - -// Stats returns stats about the points processed. 
-func (itr *tagValuesIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} } - -// Close closes the iterator. -func (itr *tagValuesIterator) Close() error { return nil } - -// Next emits the next point in the iterator. -func (itr *tagValuesIterator) Next() (*influxql.FloatPoint, error) { - for { - // If there are no more values then move to the next key. - if len(itr.buf.keys) == 0 { - if len(itr.series) == 0 { - return nil, nil - } - - itr.buf.s = itr.series[0] - itr.buf.keys = itr.keys - itr.series = itr.series[1:] - continue - } - - key := itr.buf.keys[0] - value, ok := itr.buf.s.Tags[key] - if !ok { - itr.buf.keys = itr.buf.keys[1:] - continue - } - - // Prepare auxiliary fields. - auxFields := make([]interface{}, len(itr.fields)) - for i, f := range itr.fields { - switch f { - case "_tagKey": - auxFields[i] = key - case "value": - auxFields[i] = value - } - } - - // Return next key. - p := &influxql.FloatPoint{ - Name: itr.buf.s.measurement.Name, - Aux: auxFields, - } - itr.buf.keys = itr.buf.keys[1:] - - return p, nil - } -} - // measurementKeyFunc is the function called by measurementKeysIterator. type measurementKeyFunc func(m *Measurement) []string