Refactor the select call into three separate phases

The first call is to compile the query. This performs some initial
processing that can be done before having any access to the shards. At
the moment, it does very little, but it's intended to be changed to
eventually perform initial validations of the query and create an
internal graph structure for the execution of the query.

The second call is to prepare the query. This step has access to the
shard mapper. Right now, it just maps the shards and rewrites the fields
of the query for any wildcards. In the future, it is intended to do the
above, but also to prepare the final directed acyclical graph that will
execute the query.

The third call is to select the query. This step is intended to create
all of the iterators for processing the query. At the moment, much of
the work intended for the second step is performed in the third step.
pull/8744/head
Jonathan A. Sternberg 2017-08-23 13:20:51 -05:00
parent f7724b780a
commit 8738e72cf1
4 changed files with 142 additions and 77 deletions

View File

@ -525,7 +525,6 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
}
func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) ([]query.Iterator, []string, error) {
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
opt := query.SelectOptions{
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,

108
query/compile.go Normal file
View File

@ -0,0 +1,108 @@
package query
import (
"fmt"
"time"
"github.com/influxdata/influxdb/influxql"
)
// CompileOptions are the customization options for the compiler.
type CompileOptions struct {
Now time.Time
}
// Statement is a compiled query statement.
type Statement interface {
// Prepare prepares the statement by mapping shards and finishing the creation
// of the query plan.
Prepare(shardMapper ShardMapper, opt SelectOptions) (PreparedStatement, error)
}
func Compile(stmt *influxql.SelectStatement, opt CompileOptions) (Statement, error) {
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := opt.Now
if now.IsZero() {
now = time.Now().UTC()
}
// Evaluate the now() condition immediately so we do not have to deal with this.
nowValuer := influxql.NowValuer{Now: now, Location: stmt.Location}
stmt = stmt.Reduce(&nowValuer)
// Convert DISTINCT into a call.
stmt.RewriteDistinct()
// Remove "time" from fields list.
stmt.RewriteTimeFields()
// Rewrite time condition.
if err := stmt.RewriteTimeCondition(now); err != nil {
return nil, err
}
// Rewrite any regex conditions that could make use of the index.
stmt.RewriteRegexConditions()
return &compiledStatement{stmt: stmt}, nil
}
// compiledStatement represents a select statement that has undergone some initial processing to
// determine if it is valid and to have some initial modifications done on the AST.
type compiledStatement struct {
stmt *influxql.SelectStatement
}
func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions) (PreparedStatement, error) {
// Determine the time range spanned by the condition so we can map shards.
nowValuer := influxql.NowValuer{Location: c.stmt.Location}
_, timeRange, err := influxql.ConditionExpr(c.stmt.Condition, &nowValuer)
if err != nil {
return nil, err
}
// Create an iterator creator based on the shards in the cluster.
shards, err := shardMapper.MapShards(c.stmt.Sources, timeRange, sopt)
if err != nil {
return nil, err
}
defer shards.Close()
// Rewrite wildcards, if any exist.
stmt, err := c.stmt.RewriteFields(shards)
if err != nil {
return nil, err
}
// Determine base options for iterators.
opt, err := newIteratorOptionsStmt(stmt, sopt)
if err != nil {
return nil, err
}
if sopt.MaxBucketsN > 0 && !stmt.IsRawQuery {
interval, err := stmt.GroupByInterval()
if err != nil {
return nil, err
}
if interval > 0 {
// Determine the start and end time matched to the interval (may not match the actual times).
first, _ := opt.Window(opt.StartTime)
last, _ := opt.Window(opt.EndTime - 1)
// Determine the number of buckets by finding the time span and dividing by the interval.
buckets := (last - first + int64(interval)) / int64(interval)
if int(buckets) > sopt.MaxBucketsN {
return nil, fmt.Errorf("max-select-buckets limit exceeded: (%d/%d)", buckets, sopt.MaxBucketsN)
}
}
}
columns := stmt.ColumnNames()
return &preparedStatement{
stmt: stmt,
opt: opt,
ic: shards,
columns: columns,
}, nil
}

View File

@ -684,8 +684,7 @@ type IteratorOptions struct {
}
// newIteratorOptionsStmt creates the iterator options from stmt.
func newIteratorOptionsStmt(stmt *influxql.SelectStatement, sopt *SelectOptions) (opt IteratorOptions, err error) {
func newIteratorOptionsStmt(stmt *influxql.SelectStatement, sopt SelectOptions) (opt IteratorOptions, err error) {
// Determine time range from the condition.
valuer := &influxql.NowValuer{Location: stmt.Location}
condition, timeRange, err := influxql.ConditionExpr(stmt.Condition, valuer)
@ -747,17 +746,15 @@ func newIteratorOptionsStmt(stmt *influxql.SelectStatement, sopt *SelectOptions)
}
opt.Limit, opt.Offset = stmt.Limit, stmt.Offset
opt.SLimit, opt.SOffset = stmt.SLimit, stmt.SOffset
if sopt != nil {
opt.MaxSeriesN = sopt.MaxSeriesN
opt.InterruptCh = sopt.InterruptCh
opt.Authorizer = sopt.Authorizer
}
opt.MaxSeriesN = sopt.MaxSeriesN
opt.InterruptCh = sopt.InterruptCh
opt.Authorizer = sopt.Authorizer
return opt, nil
}
func newIteratorOptionsSubstatement(stmt *influxql.SelectStatement, opt IteratorOptions) (IteratorOptions, error) {
subOpt, err := newIteratorOptionsStmt(stmt, nil)
subOpt, err := newIteratorOptionsStmt(stmt, SelectOptions{})
if err != nil {
return IteratorOptions{}, err
}

View File

@ -6,7 +6,6 @@ import (
"io"
"math"
"sort"
"time"
"github.com/influxdata/influxdb/influxql"
)
@ -45,83 +44,45 @@ type ShardGroup interface {
io.Closer
}
// Select executes stmt against ic and returns a list of iterators to stream from.
//
// Statements should have all rewriting performed before calling select(). This
// includes wildcard and source expansion.
func Select(stmt *influxql.SelectStatement, shardMapper ShardMapper, sopt SelectOptions) ([]Iterator, []string, error) {
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
// Select is a prepared statement that is ready to be executed.
type PreparedStatement interface {
// Select creates the Iterators that will be used to read the query.
Select() ([]Iterator, []string, error)
}
// Evaluate the now() condition immediately so we do not have to deal with this.
nowValuer := influxql.NowValuer{Now: now, Location: stmt.Location}
stmt = stmt.Reduce(&nowValuer)
// Convert DISTINCT into a call.
stmt.RewriteDistinct()
// Remove "time" from fields list.
stmt.RewriteTimeFields()
// Rewrite time condition.
if err := stmt.RewriteTimeCondition(now); err != nil {
return nil, nil, err
// Prepare will compile the statement with the default compile options and
// then prepare the query.
func Prepare(stmt *influxql.SelectStatement, shardMapper ShardMapper, opt SelectOptions) (PreparedStatement, error) {
c, err := Compile(stmt, CompileOptions{})
if err != nil {
return nil, err
}
return c.Prepare(shardMapper, opt)
}
// Rewrite any regex conditions that could make use of the index.
stmt.RewriteRegexConditions()
// Determine the time range spanned by the condition so we can map shards.
_, timeRange, err := influxql.ConditionExpr(stmt.Condition, &nowValuer)
// Select compiles, prepares, and then initiates execution of the query using the
// default compile options.
func Select(stmt *influxql.SelectStatement, shardMapper ShardMapper, opt SelectOptions) ([]Iterator, []string, error) {
s, err := Prepare(stmt, shardMapper, opt)
if err != nil {
return nil, nil, err
}
return s.Select()
}
// Create an iterator creator based on the shards in the cluster.
shards, err := shardMapper.MapShards(stmt.Sources, timeRange, sopt)
type preparedStatement struct {
stmt *influxql.SelectStatement
opt IteratorOptions
ic IteratorCreator
columns []string
}
func (p *preparedStatement) Select() ([]Iterator, []string, error) {
itrs, err := buildIterators(p.stmt, p.ic, p.opt)
if err != nil {
return nil, nil, err
}
defer shards.Close()
// Rewrite wildcards, if any exist.
tmp, err := stmt.RewriteFields(shards)
if err != nil {
return nil, nil, err
}
stmt = tmp
// Determine base options for iterators.
opt, err := newIteratorOptionsStmt(stmt, &sopt)
if err != nil {
return nil, nil, err
}
if sopt.MaxBucketsN > 0 && !stmt.IsRawQuery {
interval, err := stmt.GroupByInterval()
if err != nil {
return nil, nil, err
}
if interval > 0 {
// Determine the start and end time matched to the interval (may not match the actual times).
first, _ := opt.Window(opt.StartTime)
last, _ := opt.Window(opt.EndTime - 1)
// Determine the number of buckets by finding the time span and dividing by the interval.
buckets := (last - first + int64(interval)) / int64(interval)
if int(buckets) > sopt.MaxBucketsN {
return nil, nil, fmt.Errorf("max-select-buckets limit exceeded: (%d/%d)", buckets, sopt.MaxBucketsN)
}
}
}
itrs, err := buildIterators(stmt, shards, opt)
if err != nil {
return nil, nil, err
}
columns := stmt.ColumnNames()
return itrs, columns, nil
return itrs, p.columns, nil
}
func buildIterators(stmt *influxql.SelectStatement, ic IteratorCreator, opt IteratorOptions) ([]Iterator, error) {