Initial implementation of explain plan
It prints the statistics of each iterator that will access the storage engine. For each access of the storage engine, it will print the number of shards that will potentially be accessed, the number of files that may be accessed, the number of series that will be created, the number of blocks, and the size of those blocks.
pull/8776/head
parent
006c8193a9
commit
50d404e690
|
|
@ -23,6 +23,7 @@
|
|||
- [#8572](https://github.com/influxdata/influxdb/issues/8668): InfluxDB now uses MIT licensed version of BurntSushi/toml.
|
||||
- [#8752](https://github.com/influxdata/influxdb/pull/8752): Use system cursors for measurement, series, and tag key meta queries.
|
||||
- [#6563](https://github.com/influxdata/influxdb/issues/6563): Support Ctrl+C to cancel a running query in the Influx CLI. Thanks @emluque!
|
||||
- [#8776](https://github.com/influxdata/influxdb/pull/8776): Initial implementation of explain plan.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
|||
|
|
@ -181,6 +181,32 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It
|
|||
return sg.CreateIterator(m.Name, opt)
|
||||
}
|
||||
|
||||
func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
|
||||
source := Source{
|
||||
Database: m.Database,
|
||||
RetentionPolicy: m.RetentionPolicy,
|
||||
}
|
||||
|
||||
sg := a.ShardMap[source]
|
||||
if sg == nil {
|
||||
return query.IteratorCost{}, nil
|
||||
}
|
||||
|
||||
if m.Regex != nil {
|
||||
var costs query.IteratorCost
|
||||
measurements := sg.MeasurementsByRegex(m.Regex.Val)
|
||||
for _, measurement := range measurements {
|
||||
cost, err := sg.IteratorCost(measurement, opt)
|
||||
if err != nil {
|
||||
return query.IteratorCost{}, err
|
||||
}
|
||||
costs = costs.Combine(cost)
|
||||
}
|
||||
return costs, nil
|
||||
}
|
||||
return sg.IteratorCost(m.Name, opt)
|
||||
}
|
||||
|
||||
// Close clears out the list of mapped shards.
|
||||
func (a *LocalShardMapping) Close() error {
|
||||
a.ShardMap = nil
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"io"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
|
|
@ -401,7 +402,39 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme
|
|||
}
|
||||
|
||||
func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
|
||||
return nil, errors.New("unimplemented")
|
||||
if q.Analyze {
|
||||
return nil, errors.New("analyze is currently unimplemented")
|
||||
}
|
||||
|
||||
opt := query.SelectOptions{
|
||||
InterruptCh: ctx.InterruptCh,
|
||||
NodeID: ctx.ExecutionOptions.NodeID,
|
||||
MaxSeriesN: e.MaxSelectSeriesN,
|
||||
MaxBucketsN: e.MaxSelectBucketsN,
|
||||
Authorizer: ctx.Authorizer,
|
||||
}
|
||||
|
||||
// Prepare the query for execution, but do not actually execute it.
|
||||
// This should perform any needed substitutions.
|
||||
p, err := query.Prepare(q.Statement, e.ShardMapper, opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
plan, err := p.Explain()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
plan = strings.TrimSpace(plan)
|
||||
|
||||
row := &models.Row{
|
||||
Columns: []string{"QUERY PLAN"},
|
||||
}
|
||||
for _, s := range strings.Split(plan, "\n") {
|
||||
row.Values = append(row.Values, []interface{}{s})
|
||||
}
|
||||
return models.Rows{row}, nil
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
|
||||
|
|
|
|||
|
|
@ -385,6 +385,7 @@ type MockShard struct {
|
|||
Measurements []string
|
||||
FieldDimensionsFn func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
|
||||
CreateIteratorFn func(m string, opt query.IteratorOptions) (query.Iterator, error)
|
||||
IteratorCostFn func(m string, opt query.IteratorOptions) (query.IteratorCost, error)
|
||||
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
|
||||
}
|
||||
|
||||
|
|
@ -420,6 +421,10 @@ func (sh *MockShard) CreateIterator(measurement string, opt query.IteratorOption
|
|||
return sh.CreateIteratorFn(measurement, opt)
|
||||
}
|
||||
|
||||
func (sh *MockShard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
|
||||
return sh.IteratorCostFn(measurement, opt)
|
||||
}
|
||||
|
||||
func (sh *MockShard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
|
||||
return sh.ExpandSourcesFn(sources)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,84 @@
|
|||
package query
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
)
|
||||
|
||||
func (p *preparedStatement) Explain() (string, error) {
|
||||
// Determine the cost of all iterators created as part of this plan.
|
||||
ic := &explainIteratorCreator{ic: p.ic}
|
||||
p.ic = ic
|
||||
itrs, _, err := p.Select()
|
||||
p.ic = ic.ic
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
Iterators(itrs).Close()
|
||||
|
||||
var buf bytes.Buffer
|
||||
for i, node := range ic.nodes {
|
||||
if i > 0 {
|
||||
buf.WriteString("\n")
|
||||
}
|
||||
|
||||
expr := "<nil>"
|
||||
if node.Expr != nil {
|
||||
expr = node.Expr.String()
|
||||
}
|
||||
fmt.Fprintf(&buf, "EXPRESSION: %s\n", expr)
|
||||
if len(node.Aux) != 0 {
|
||||
refs := make([]string, len(node.Aux))
|
||||
for i, ref := range node.Aux {
|
||||
refs[i] = ref.String()
|
||||
}
|
||||
fmt.Fprintf(&buf, "AUXILIARY FIELDS: %s\n", strings.Join(refs, ", "))
|
||||
}
|
||||
fmt.Fprintf(&buf, "NUMBER OF SHARDS: %d\n", node.Cost.NumShards)
|
||||
fmt.Fprintf(&buf, "NUMBER OF SERIES: %d\n", node.Cost.NumSeries)
|
||||
fmt.Fprintf(&buf, "NUMBER OF FILES: %d\n", node.Cost.NumFiles)
|
||||
fmt.Fprintf(&buf, "NUMBER OF BLOCKS: %d\n", node.Cost.BlocksRead)
|
||||
fmt.Fprintf(&buf, "SIZE OF BLOCKS: %d\n", node.Cost.BlockSize)
|
||||
}
|
||||
return buf.String(), nil
|
||||
}
|
||||
|
||||
type planNode struct {
|
||||
Expr influxql.Expr
|
||||
Aux []influxql.VarRef
|
||||
Cost IteratorCost
|
||||
}
|
||||
|
||||
type explainIteratorCreator struct {
|
||||
ic interface {
|
||||
IteratorCreator
|
||||
io.Closer
|
||||
}
|
||||
nodes []planNode
|
||||
}
|
||||
|
||||
func (e *explainIteratorCreator) CreateIterator(m *influxql.Measurement, opt IteratorOptions) (Iterator, error) {
|
||||
cost, err := e.ic.IteratorCost(m, opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.nodes = append(e.nodes, planNode{
|
||||
Expr: opt.Expr,
|
||||
Aux: opt.Aux,
|
||||
Cost: cost,
|
||||
})
|
||||
return &nilFloatIterator{}, nil
|
||||
}
|
||||
|
||||
func (e *explainIteratorCreator) IteratorCost(m *influxql.Measurement, opt IteratorOptions) (IteratorCost, error) {
|
||||
return e.ic.IteratorCost(m, opt)
|
||||
}
|
||||
|
||||
func (e *explainIteratorCreator) Close() error {
|
||||
return e.ic.Close()
|
||||
}
|
||||
|
|
@ -625,6 +625,9 @@ func NewReaderIterator(r io.Reader, typ influxql.DataType, stats IteratorStats)
|
|||
type IteratorCreator interface {
|
||||
// Creates a simple iterator for use in an InfluxQL query.
|
||||
CreateIterator(source *influxql.Measurement, opt IteratorOptions) (Iterator, error)
|
||||
|
||||
// Determines the potential cost for creating an iterator.
|
||||
IteratorCost(source *influxql.Measurement, opt IteratorOptions) (IteratorCost, error)
|
||||
}
|
||||
|
||||
// IteratorOptions is an object passed to CreateIterator to specify creation options.
|
||||
|
|
@ -1341,6 +1344,40 @@ func decodeIteratorStats(pb *internal.IteratorStats) IteratorStats {
|
|||
}
|
||||
}
|
||||
|
||||
// IteratorCost holds the statistics used to explain what cost may be incurred
// by instantiating an iterator.
type IteratorCost struct {
	// NumShards is the total number of shards touched by the query.
	NumShards int64

	// NumSeries is the total number of non-unique series accessed by the
	// query. It equals the number of cursors the query creates, because one
	// cursor is created per series.
	NumSeries int64

	// NumFiles is the total number of non-unique files the query may access.
	// Files are counted once per series that accesses them, so the same file
	// is likely to be counted more than once.
	NumFiles int64

	// BlocksRead is the number of blocks that could potentially be accessed.
	BlocksRead int64

	// BlockSize is the amount of data that could potentially be read.
	BlockSize int64
}
|
||||
|
||||
// Combine combines the results of two IteratorCost structures into one.
|
||||
func (c IteratorCost) Combine(other IteratorCost) IteratorCost {
|
||||
return IteratorCost{
|
||||
NumShards: c.NumShards + other.NumShards,
|
||||
NumSeries: c.NumSeries + other.NumSeries,
|
||||
NumFiles: c.NumFiles + other.NumFiles,
|
||||
BlocksRead: c.BlocksRead + other.BlocksRead,
|
||||
BlockSize: c.BlockSize + other.BlockSize,
|
||||
}
|
||||
}
|
||||
|
||||
// floatFastDedupeIterator outputs unique points where the point has a single aux field.
|
||||
type floatFastDedupeIterator struct {
|
||||
input FloatIterator
|
||||
|
|
|
|||
|
|
@ -56,6 +56,9 @@ type PreparedStatement interface {
|
|||
// Select creates the Iterators that will be used to read the query.
|
||||
Select() ([]Iterator, []string, error)
|
||||
|
||||
// Explain outputs the explain plan for this statement.
|
||||
Explain() (string, error)
|
||||
|
||||
// Close closes the resources associated with this prepared statement.
|
||||
// This must be called as the mapped shards may hold open resources such
|
||||
// as network connections.
|
||||
|
|
|
|||
|
|
@ -2785,6 +2785,10 @@ func (sh *ShardGroup) CreateIterator(m *influxql.Measurement, opt query.Iterator
|
|||
return sh.CreateIteratorFn(m, opt)
|
||||
}
|
||||
|
||||
func (sh *ShardGroup) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
|
||||
return query.IteratorCost{}, nil
|
||||
}
|
||||
|
||||
func (sh *ShardGroup) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
fields = make(map[string]influxql.DataType)
|
||||
dimensions = make(map[string]struct{})
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ type Engine interface {
|
|||
Import(r io.Reader, basePath string) error
|
||||
|
||||
CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error)
|
||||
IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
|
||||
WritePoints(points []models.Point) error
|
||||
|
||||
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
|
||||
|
|
|
|||
|
|
@ -2138,6 +2138,72 @@ func (e *Engine) buildBooleanCursor(measurement, seriesKey, field string, opt qu
|
|||
return newBooleanCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
|
||||
}
|
||||
|
||||
// IteratorCost produces the cost of an iterator.
|
||||
func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
|
||||
// Determine if this measurement exists. If it does not, then no shards are
|
||||
// accessed to begin with.
|
||||
if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil {
|
||||
return query.IteratorCost{}, err
|
||||
} else if !exists {
|
||||
return query.IteratorCost{}, nil
|
||||
}
|
||||
|
||||
// Determine all of the tag sets for this query.
|
||||
tagSets, err := e.index.TagSets([]byte(measurement), opt)
|
||||
if err != nil {
|
||||
return query.IteratorCost{}, err
|
||||
}
|
||||
|
||||
// Attempt to retrieve the ref from the main expression (if it exists).
|
||||
var ref *influxql.VarRef
|
||||
if opt.Expr != nil {
|
||||
if v, ok := opt.Expr.(*influxql.VarRef); ok {
|
||||
ref = v
|
||||
} else if call, ok := opt.Expr.(*influxql.Call); ok {
|
||||
if len(call.Args) > 0 {
|
||||
ref, _ = call.Args[0].(*influxql.VarRef)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Count the number of series concatenated from the tag set.
|
||||
cost := query.IteratorCost{NumShards: 1}
|
||||
for _, t := range tagSets {
|
||||
cost.NumSeries += int64(len(t.SeriesKeys))
|
||||
for i, key := range t.SeriesKeys {
|
||||
// Retrieve the cost for the main expression (if it exists).
|
||||
if ref != nil {
|
||||
k := SeriesFieldKey(key, ref.Val)
|
||||
c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
|
||||
cost = cost.Combine(c)
|
||||
}
|
||||
|
||||
// Retrieve the cost for every auxiliary field since these are also
|
||||
// iterators that we may have to look through.
|
||||
// We may want to separate these though as we are unlikely to incur
|
||||
// anywhere close to the full costs of the auxiliary iterators because
|
||||
// many of the selected values are usually skipped.
|
||||
for _, ref := range opt.Aux {
|
||||
k := SeriesFieldKey(key, ref.Val)
|
||||
c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
|
||||
cost = cost.Combine(c)
|
||||
}
|
||||
|
||||
// Retrieve the expression names in the condition (if there is a condition).
|
||||
// We will also create cursors for these too.
|
||||
if t.Filters[i] != nil {
|
||||
refs := influxql.ExprNames(t.Filters[i])
|
||||
for _, ref := range refs {
|
||||
k := SeriesFieldKey(key, ref.Val)
|
||||
c := e.FileStore.Cost([]byte(k), opt.StartTime, opt.EndTime)
|
||||
cost = cost.Combine(c)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return cost, nil
|
||||
}
|
||||
|
||||
func (e *Engine) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) {
|
||||
return e.index.SeriesPointIterator(opt)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/uber-go/zap"
|
||||
)
|
||||
|
||||
|
|
@ -472,6 +473,12 @@ func (f *FileStore) Read(key []byte, t int64) ([]Value, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
return f.cost(key, min, max)
|
||||
}
|
||||
|
||||
// KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
|
||||
func (f *FileStore) KeyCursor(key []byte, t int64, ascending bool) *KeyCursor {
|
||||
f.mu.RLock()
|
||||
|
|
@ -726,6 +733,47 @@ func (f *FileStore) walkFiles(fn func(f TSMFile) error) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// We need to determine the possible files that may be accessed by this query given
|
||||
// the time range.
|
||||
func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost {
|
||||
var entries []IndexEntry
|
||||
cost := query.IteratorCost{}
|
||||
for _, fd := range f.files {
|
||||
minTime, maxTime := fd.TimeRange()
|
||||
if !(maxTime > min && minTime < max) {
|
||||
continue
|
||||
}
|
||||
skipped := true
|
||||
tombstones := fd.TombstoneRange(key)
|
||||
|
||||
fd.ReadEntries(key, &entries)
|
||||
ENTRIES:
|
||||
for i := 0; i < len(entries); i++ {
|
||||
ie := entries[i]
|
||||
|
||||
if !(ie.MaxTime > min && ie.MinTime < max) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip any blocks only contain values that are tombstoned.
|
||||
for _, t := range tombstones {
|
||||
if t.Min <= ie.MinTime && t.Max >= ie.MaxTime {
|
||||
continue ENTRIES
|
||||
}
|
||||
}
|
||||
|
||||
cost.BlocksRead++
|
||||
cost.BlockSize += int64(ie.Size)
|
||||
skipped = false
|
||||
}
|
||||
|
||||
if !skipped {
|
||||
cost.NumFiles++
|
||||
}
|
||||
}
|
||||
return cost
|
||||
}
|
||||
|
||||
// locations returns the files and index blocks for a key and time. ascending indicates
|
||||
// whether the key will be scanned in ascending time order or descending time order.
|
||||
// This function assumes the read-lock has been taken.
|
||||
|
|
@ -735,7 +783,6 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
|
|||
for _, fd := range f.files {
|
||||
minTime, maxTime := fd.TimeRange()
|
||||
|
||||
tombstones := fd.TombstoneRange(key)
|
||||
// If we are ascending and the max time of the file is before where we want to start
|
||||
// skip it.
|
||||
if ascending && maxTime < t {
|
||||
|
|
@ -745,6 +792,7 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
|
|||
} else if !ascending && minTime > t {
|
||||
continue
|
||||
}
|
||||
tombstones := fd.TombstoneRange(key)
|
||||
|
||||
// This file could potentially contain points we are looking for so find the blocks for
|
||||
// the given key.
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"math"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
|
@ -18,6 +19,7 @@ import (
|
|||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/estimator"
|
||||
"github.com/influxdata/influxdb/pkg/limiter"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
internal "github.com/influxdata/influxdb/tsdb/internal"
|
||||
"github.com/uber-go/zap"
|
||||
|
|
@ -776,6 +778,14 @@ func (s *Shard) createSeriesIterator(opt query.IteratorOptions) (query.Iterator,
|
|||
return s.engine.SeriesPointIterator(opt)
|
||||
}
|
||||
|
||||
// IteratorCost returns the estimated cost of constructing and reading an iterator.
|
||||
func (s *Shard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
|
||||
if err := s.ready(); err != nil {
|
||||
return query.IteratorCost{}, err
|
||||
}
|
||||
return s.engine.IteratorCost(measurement, opt)
|
||||
}
|
||||
|
||||
// FieldDimensions returns unique sets of fields and dimensions across a list of sources.
|
||||
func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
if err := s.ready(); err != nil {
|
||||
|
|
@ -1008,6 +1018,7 @@ type ShardGroup interface {
|
|||
FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
|
||||
MapType(measurement, field string) influxql.DataType
|
||||
CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error)
|
||||
IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
|
||||
ExpandSources(sources influxql.Sources) (influxql.Sources, error)
|
||||
}
|
||||
|
||||
|
|
@ -1106,6 +1117,46 @@ func (a Shards) CreateIterator(measurement string, opt query.IteratorOptions) (q
|
|||
return query.Iterators(itrs).Merge(opt)
|
||||
}
|
||||
|
||||
func (a Shards) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
|
||||
var costs query.IteratorCost
|
||||
var costerr error
|
||||
var mu sync.RWMutex
|
||||
|
||||
limit := limiter.NewFixed(runtime.GOMAXPROCS(0))
|
||||
var wg sync.WaitGroup
|
||||
for _, sh := range a {
|
||||
limit.Take()
|
||||
wg.Add(1)
|
||||
|
||||
mu.RLock()
|
||||
if costerr != nil {
|
||||
mu.RUnlock()
|
||||
break
|
||||
}
|
||||
mu.RUnlock()
|
||||
|
||||
go func(sh *Shard) {
|
||||
defer limit.Release()
|
||||
defer wg.Done()
|
||||
|
||||
cost, err := sh.IteratorCost(measurement, opt)
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
if costerr == nil {
|
||||
costerr = err
|
||||
}
|
||||
return
|
||||
}
|
||||
costs = costs.Combine(cost)
|
||||
}(sh)
|
||||
}
|
||||
wg.Wait()
|
||||
return costs, costerr
|
||||
}
|
||||
|
||||
func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
|
||||
// Use a map as a set to prevent duplicates.
|
||||
set := map[string]influxql.Source{}
|
||||
|
|
|
|||
Loading…
Reference in New Issue