diff --git a/CHANGELOG.md b/CHANGELOG.md
index c0d0e11649..ba1f26befd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@
 - [#8572](https://github.com/influxdata/influxdb/issues/8668): InfluxDB now uses MIT licensed version of BurntSushi/toml.
 - [#8752](https://github.com/influxdata/influxdb/pull/8752): Use system cursors for measurement, series, and tag key meta queries.
 - [#6563](https://github.com/influxdata/influxdb/issues/6563): Support Ctrl+C to cancel a running query in the Influx CLI. Thanks @emluque!
+- [#8776](https://github.com/influxdata/influxdb/pull/8776): Initial implementation of explain plan.
 
 ### Bugfixes
 
diff --git a/coordinator/shard_mapper.go b/coordinator/shard_mapper.go
index ff28ae326a..961affc548 100644
--- a/coordinator/shard_mapper.go
+++ b/coordinator/shard_mapper.go
@@ -181,6 +181,36 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It
 	return sg.CreateIterator(m.Name, opt)
 }
 
+// IteratorCost returns the estimated cost of creating an iterator for the
+// measurement m against the shards mapped to its database and retention policy.
+// An unmapped source yields a zero cost; a regex source yields the combined
+// cost of every measurement the shard group returns for the regex.
+func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
+	source := Source{
+		Database:        m.Database,
+		RetentionPolicy: m.RetentionPolicy,
+	}
+
+	sg := a.ShardMap[source]
+	if sg == nil {
+		return query.IteratorCost{}, nil
+	}
+
+	if m.Regex != nil {
+		var costs query.IteratorCost
+		measurements := sg.MeasurementsByRegex(m.Regex.Val)
+		for _, measurement := range measurements {
+			cost, err := sg.IteratorCost(measurement, opt)
+			if err != nil {
+				return query.IteratorCost{}, err
+			}
+			costs = costs.Combine(cost)
+		}
+		return costs, nil
+	}
+	return sg.IteratorCost(m.Name, opt)
+}
+
 // Close clears out the list of mapped shards.
 func (a *LocalShardMapping) Close() error {
 	a.ShardMap = nil
diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go
index 0afe9fc843..06e8efff76 100644
--- a/coordinator/statement_executor.go
+++ b/coordinator/statement_executor.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"sort"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/influxdata/influxdb"
@@ -401,7 +402,39 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme
 }
 
 func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
-	return nil, errors.New("unimplemented")
+	if q.Analyze {
+		return nil, errors.New("analyze is currently unimplemented")
+	}
+
+	opt := query.SelectOptions{
+		InterruptCh: ctx.InterruptCh,
+		NodeID:      ctx.ExecutionOptions.NodeID,
+		MaxSeriesN:  e.MaxSelectSeriesN,
+		MaxBucketsN: e.MaxSelectBucketsN,
+		Authorizer:  ctx.Authorizer,
+	}
+
+	// Prepare the query for execution, but do not actually execute it.
+	// This should perform any needed substitutions.
+	p, err := query.Prepare(q.Statement, e.ShardMapper, opt)
+	if err != nil {
+		return nil, err
+	}
+	defer p.Close()
+
+	plan, err := p.Explain()
+	if err != nil {
+		return nil, err
+	}
+	plan = strings.TrimSpace(plan)
+
+	row := &models.Row{
+		Columns: []string{"QUERY PLAN"},
+	}
+	for _, s := range strings.Split(plan, "\n") {
+		row.Values = append(row.Values, []interface{}{s})
+	}
+	return models.Rows{row}, nil
 }
 
 func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go
index 893c190860..d301d17004 100644
--- a/coordinator/statement_executor_test.go
+++ b/coordinator/statement_executor_test.go
@@ -385,6 +385,7 @@ type MockShard struct {
 	Measurements      []string
 	FieldDimensionsFn func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
 	CreateIteratorFn  func(m string, opt query.IteratorOptions) (query.Iterator, error)
+	IteratorCostFn    func(m string, opt query.IteratorOptions) (query.IteratorCost, error)
 	ExpandSourcesFn   func(sources influxql.Sources) (influxql.Sources, error)
 }
 
@@ -420,6 +421,10 @@ func (sh *MockShard) CreateIterator(measurement string, opt query.IteratorOption
 	return sh.CreateIteratorFn(measurement, opt)
 }
 
+func (sh *MockShard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
+	return sh.IteratorCostFn(measurement, opt)
+}
+
 func (sh *MockShard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
 	return sh.ExpandSourcesFn(sources)
 }
diff --git a/query/explain.go b/query/explain.go
new file mode 100644
index 0000000000..71f99ed24f
--- /dev/null
+++ b/query/explain.go
@@ -0,0 +1,96 @@
+package query
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"strings"
+
+	"github.com/influxdata/influxdb/influxql"
+)
+
+// Explain returns a textual plan for this statement. It runs Select against a
+// wrapper that records the cost of each requested iterator instead of creating
+// it, then prints one section per recorded iterator separated by blank lines.
+func (p *preparedStatement) Explain() (string, error) {
+	// Determine the cost of all iterators created as part of this plan.
+	ic := &explainIteratorCreator{ic: p.ic}
+	p.ic = ic
+	itrs, _, err := p.Select()
+	// Restore the original iterator creator before inspecting the error.
+	p.ic = ic.ic
+
+	if err != nil {
+		return "", err
+	}
+	Iterators(itrs).Close()
+
+	var buf bytes.Buffer
+	for i, node := range ic.nodes {
+		if i > 0 {
+			buf.WriteString("\n")
+		}
+
+		expr := ""
+		if node.Expr != nil {
+			expr = node.Expr.String()
+		}
+		fmt.Fprintf(&buf, "EXPRESSION: %s\n", expr)
+		if len(node.Aux) != 0 {
+			refs := make([]string, len(node.Aux))
+			for i, ref := range node.Aux {
+				refs[i] = ref.String()
+			}
+			fmt.Fprintf(&buf, "AUXILIARY FIELDS: %s\n", strings.Join(refs, ", "))
+		}
+		fmt.Fprintf(&buf, "NUMBER OF SHARDS: %d\n", node.Cost.NumShards)
+		fmt.Fprintf(&buf, "NUMBER OF SERIES: %d\n", node.Cost.NumSeries)
+		fmt.Fprintf(&buf, "NUMBER OF FILES: %d\n", node.Cost.NumFiles)
+		fmt.Fprintf(&buf, "NUMBER OF BLOCKS: %d\n", node.Cost.BlocksRead)
+		fmt.Fprintf(&buf, "SIZE OF BLOCKS: %d\n", node.Cost.BlockSize)
+	}
+	return buf.String(), nil
+}
+
+// planNode is a single entry in the explain output: the expression and
+// auxiliary fields requested for an iterator and the cost reported for it.
+type planNode struct {
+	Expr influxql.Expr
+	Aux  []influxql.VarRef
+	Cost IteratorCost
+}
+
+// explainIteratorCreator wraps an IteratorCreator and records a planNode for
+// every CreateIterator call rather than building a real iterator.
+type explainIteratorCreator struct {
+	ic interface {
+		IteratorCreator
+		io.Closer
+	}
+	nodes []planNode
+}
+
+// CreateIterator records the cost of the requested iterator as a plan node and
+// returns a nilFloatIterator placeholder.
+func (e *explainIteratorCreator) CreateIterator(m *influxql.Measurement, opt IteratorOptions) (Iterator, error) {
+	cost, err := e.ic.IteratorCost(m, opt)
+	if err != nil {
+		return nil, err
+	}
+	e.nodes = append(e.nodes, planNode{
+		Expr: opt.Expr,
+		Aux:  opt.Aux,
+		Cost: cost,
+	})
+	return &nilFloatIterator{}, nil
+}
+
+// IteratorCost delegates to the wrapped iterator creator.
+func (e *explainIteratorCreator) IteratorCost(m *influxql.Measurement, opt IteratorOptions) (IteratorCost, error) {
+	return e.ic.IteratorCost(m, opt)
+}
+
+// Close closes the wrapped iterator creator.
+func (e *explainIteratorCreator) Close() error {
+	return e.ic.Close()
+}
diff --git a/query/iterator.go b/query/iterator.go
index 4b692dc83a..07767a529c 100644
--- a/query/iterator.go
+++ b/query/iterator.go
@@ -625,6 +625,9 @@ func NewReaderIterator(r io.Reader, typ influxql.DataType, stats IteratorStats)
 type IteratorCreator interface {
 	// Creates a simple iterator for use in an InfluxQL query.
 	CreateIterator(source *influxql.Measurement, opt IteratorOptions) (Iterator, error)
+
+	// Determines the potential cost for creating an iterator.
+	IteratorCost(source *influxql.Measurement, opt IteratorOptions) (IteratorCost, error)
 }
 
 // IteratorOptions is an object passed to CreateIterator to specify creation options.
@@ -1341,6 +1344,40 @@ func decodeIteratorStats(pb *internal.IteratorStats) IteratorStats {
 	}
 }
 
+// IteratorCost contains statistics retrieved for explaining what potential
+// cost may be incurred by instantiating an iterator.
+type IteratorCost struct {
+	// The total number of shards that are touched by this query.
+	NumShards int64
+
+	// The total number of non-unique series that are accessed by this query.
+	// This number matches the number of cursors created by the query since
+	// one cursor is created for every series.
+	NumSeries int64
+
+	// The total number of non-unique files that may be accessed by this query.
+	// This will count the number of files accessed by each series so files
+	// will likely be double counted.
+	NumFiles int64
+
+	// The number of blocks that had the potential to be accessed.
+	BlocksRead int64
+
+	// The amount of data that can be potentially read.
+	BlockSize int64
+}
+
+// Combine combines the results of two IteratorCost structures into one.
+func (c IteratorCost) Combine(other IteratorCost) IteratorCost {
+	return IteratorCost{
+		NumShards:  c.NumShards + other.NumShards,
+		NumSeries:  c.NumSeries + other.NumSeries,
+		NumFiles:   c.NumFiles + other.NumFiles,
+		BlocksRead: c.BlocksRead + other.BlocksRead,
+		BlockSize:  c.BlockSize + other.BlockSize,
+	}
+}
+
 // floatFastDedupeIterator outputs unique points where the point has a single aux field.
 type floatFastDedupeIterator struct {
 	input FloatIterator
diff --git a/query/select.go b/query/select.go
index d35f6962f8..ef76ac7561 100644
--- a/query/select.go
+++ b/query/select.go
@@ -56,6 +56,9 @@ type PreparedStatement interface {
 	// Select creates the Iterators that will be used to read the query.
 	Select() ([]Iterator, []string, error)
 
+	// Explain outputs the explain plan for this statement.
+	Explain() (string, error)
+
 	// Close closes the resources associated with this prepared statement.
 	// This must be called as the mapped shards may hold open resources such
 	// as network connections.
diff --git a/query/select_test.go b/query/select_test.go
index da08f800ed..733e43e4d7 100644
--- a/query/select_test.go
+++ b/query/select_test.go
@@ -2785,6 +2785,10 @@ func (sh *ShardGroup) CreateIterator(m *influxql.Measurement, opt query.Iterator
 	return sh.CreateIteratorFn(m, opt)
 }
 
+func (sh *ShardGroup) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
+	return query.IteratorCost{}, nil
+}
+
 func (sh *ShardGroup) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
 	fields = make(map[string]influxql.DataType)
 	dimensions = make(map[string]struct{})
diff --git a/tsdb/engine.go b/tsdb/engine.go
index cd66691b87..1976ca53ef 100644
--- a/tsdb/engine.go
+++ b/tsdb/engine.go
@@ -44,6 +44,7 @@ type Engine interface {
 	Import(r io.Reader, basePath string) error
 
 	CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error)
+	IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
 	WritePoints(points []models.Point) error
 
 	CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 829324b770..ce8edf60cd 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -2138,6 +2138,75 @@ func (e *Engine) buildBooleanCursor(measurement, seriesKey, field string, opt qu
 	return newBooleanCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
 }
 
+// IteratorCost estimates the cost of creating an iterator over measurement.
+// It returns a zero cost if the measurement does not exist. Otherwise it counts
+// one shard, every series key in the matching tag sets, and the file store cost
+// of each field referenced by the expression, auxiliary fields and filters.
+func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
+	// Determine if this measurement exists. If it does not, then no shards are
+	// accessed to begin with.
+	if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil {
+		return query.IteratorCost{}, err
+	} else if !exists {
+		return query.IteratorCost{}, nil
+	}
+
+	// Determine all of the tag sets for this query.
+	tagSets, err := e.index.TagSets([]byte(measurement), opt)
+	if err != nil {
+		return query.IteratorCost{}, err
+	}
+
+	// Attempt to retrieve the ref from the main expression (if it exists).
+	var ref *influxql.VarRef
+	if opt.Expr != nil {
+		if v, ok := opt.Expr.(*influxql.VarRef); ok {
+			ref = v
+		} else if call, ok := opt.Expr.(*influxql.Call); ok {
+			if len(call.Args) > 0 {
+				ref, _ = call.Args[0].(*influxql.VarRef)
+			}
+		}
+	}
+
+	// Count the number of series concatenated from the tag set.
+	cost := query.IteratorCost{NumShards: 1}
+	for _, t := range tagSets {
+		cost.NumSeries += int64(len(t.SeriesKeys))
+		for i, key := range t.SeriesKeys {
+			// Retrieve the cost for the main expression (if it exists).
+			if ref != nil {
+				k := SeriesFieldKey(key, ref.Val)
+				c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
+				cost = cost.Combine(c)
+			}
+
+			// Retrieve the cost for every auxiliary field since these are also
+			// iterators that we may have to look through.
+			// We may want to separate these though as we are unlikely to incur
+			// anywhere close to the full costs of the auxiliary iterators because
+			// many of the selected values are usually skipped.
+			for _, ref := range opt.Aux {
+				k := SeriesFieldKey(key, ref.Val)
+				c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
+				cost = cost.Combine(c)
+			}
+
+			// Retrieve the expression names in the condition (if there is a condition).
+			// We will also create cursors for these too.
+			if t.Filters[i] != nil {
+				refs := influxql.ExprNames(t.Filters[i])
+				for _, ref := range refs {
+					k := SeriesFieldKey(key, ref.Val)
+					c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
+					cost = cost.Combine(c)
+				}
+			}
+		}
+	}
+	return cost, nil
+}
+
 func (e *Engine) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) {
 	return e.index.SeriesPointIterator(opt)
 }
diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go
index e09a422eb1..0aee0588cd 100644
--- a/tsdb/engine/tsm1/file_store.go
+++ b/tsdb/engine/tsm1/file_store.go
@@ -15,6 +15,7 @@ import (
 	"time"
 
 	"github.com/influxdata/influxdb/models"
+	"github.com/influxdata/influxdb/query"
 	"github.com/uber-go/zap"
 )
 
@@ -472,6 +473,14 @@ func (f *FileStore) Read(key []byte, t int64) ([]Value, error) {
 	return nil, nil
 }
 
+// Cost returns the estimated cost of reading key between min and max across
+// the files in the FileStore. It holds the read-lock for the duration.
+func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
+	f.mu.RLock()
+	defer f.mu.RUnlock()
+	return f.cost(key, min, max)
+}
+
 // KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
 func (f *FileStore) KeyCursor(key []byte, t int64, ascending bool) *KeyCursor {
 	f.mu.RLock()
@@ -726,6 +735,47 @@ func (f *FileStore) walkFiles(fn func(f TSMFile) error) error {
 	return nil
 }
 
+// cost determines the files and blocks that may be accessed for key given the
+// time range. This function assumes the read-lock has been taken.
+func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost {
+	var entries []IndexEntry
+	cost := query.IteratorCost{}
+	for _, fd := range f.files {
+		minTime, maxTime := fd.TimeRange()
+		if !(maxTime > min && minTime < max) {
+			continue
+		}
+		skipped := true
+		tombstones := fd.TombstoneRange(key)
+
+		fd.ReadEntries(key, &entries)
+	ENTRIES:
+		for i := 0; i < len(entries); i++ {
+			ie := entries[i]
+
+			if !(ie.MaxTime > min && ie.MinTime < max) {
+				continue
+			}
+
+			// Skip any blocks that only contain values that are tombstoned.
+			for _, t := range tombstones {
+				if t.Min <= ie.MinTime && t.Max >= ie.MaxTime {
+					continue ENTRIES
+				}
+			}
+
+			cost.BlocksRead++
+			cost.BlockSize += int64(ie.Size)
+			skipped = false
+		}
+
+		if !skipped {
+			cost.NumFiles++
+		}
+	}
+	return cost
+}
+
 // locations returns the files and index blocks for a key and time. ascending indicates
 // whether the key will be scan in ascending time order or descenging time order.
 // This function assumes the read-lock has been taken.
@@ -735,7 +785,6 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
 	for _, fd := range f.files {
 		minTime, maxTime := fd.TimeRange()
 
-		tombstones := fd.TombstoneRange(key)
 		// If we ascending and the max time of the file is before where we want to start
 		// skip it.
 		if ascending && maxTime < t {
@@ -745,6 +794,7 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
 		} else if !ascending && minTime > t {
 			continue
 		}
+		tombstones := fd.TombstoneRange(key)
 
 		// This file could potential contain points we are looking for so find the blocks for
 		// the given key.
diff --git a/tsdb/shard.go b/tsdb/shard.go
index 1afc25a4d7..594a69815c 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -8,6 +8,7 @@ import (
 	"math"
 	"path/filepath"
 	"regexp"
+	"runtime"
 	"sort"
 	"strings"
 	"sync"
@@ -18,6 +19,7 @@ import (
 	"github.com/influxdata/influxdb/influxql"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/pkg/estimator"
+	"github.com/influxdata/influxdb/pkg/limiter"
 	"github.com/influxdata/influxdb/query"
 	internal "github.com/influxdata/influxdb/tsdb/internal"
 	"github.com/uber-go/zap"
@@ -783,6 +785,14 @@ func (s *Shard) createSeriesIterator(opt query.IteratorOptions) (query.Iterator,
 	return s.engine.SeriesPointIterator(opt)
 }
 
+// IteratorCost returns the estimated cost of constructing and reading an iterator.
+func (s *Shard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
+	if err := s.ready(); err != nil {
+		return query.IteratorCost{}, err
+	}
+	return s.engine.IteratorCost(measurement, opt)
+}
+
 // FieldDimensions returns unique sets of fields and dimensions across a list of sources.
 func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
 	if err := s.ready(); err != nil {
@@ -1018,6 +1028,7 @@ type ShardGroup interface {
 	FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
 	MapType(measurement, field string) influxql.DataType
 	CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error)
+	IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
 	ExpandSources(sources influxql.Sources) (influxql.Sources, error)
 }
 
@@ -1116,6 +1127,54 @@ func (a Shards) CreateIterator(measurement string, opt query.IteratorOptions) (q
 	return query.Iterators(itrs).Merge(opt)
 }
 
+// IteratorCost returns the combined estimated cost of creating an iterator for
+// measurement on every shard in a. Shards are costed in goroutines gated by a
+// limiter sized to GOMAXPROCS, and no new shard is started once one has failed;
+// the first error recorded is returned alongside whatever costs were combined.
+func (a Shards) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
+	var costs query.IteratorCost
+	var costerr error
+	var mu sync.RWMutex
+
+	limit := limiter.NewFixed(runtime.GOMAXPROCS(0))
+	var wg sync.WaitGroup
+	for _, sh := range a {
+		limit.Take()
+
+		mu.RLock()
+		failed := costerr != nil
+		mu.RUnlock()
+		if failed {
+			// Give the slot back and stop before registering with the wait
+			// group: no goroutine is started on this path, so a prior wg.Add
+			// would never be matched by wg.Done and wg.Wait would block forever.
+			limit.Release()
+			break
+		}
+
+		wg.Add(1)
+		go func(sh *Shard) {
+			defer limit.Release()
+			defer wg.Done()
+
+			cost, err := sh.IteratorCost(measurement, opt)
+
+			mu.Lock()
+			defer mu.Unlock()
+
+			if err != nil {
+				if costerr == nil {
+					costerr = err
+				}
+				return
+			}
+			costs = costs.Combine(cost)
+		}(sh)
+	}
+	wg.Wait()
+	return costs, costerr
+}
+
 func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
 	// Use a map as a set to prevent duplicates.
 	set := map[string]influxql.Source{}