Merge pull request #9525 from influxdata/js-execution-context

Turn the ExecutionContext into a context.Context
pull/9536/head
Jonathan A. Sternberg 2018-03-08 14:26:19 -06:00 committed by GitHub
commit 46a5a66136
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 376 additions and 223 deletions

View File

@ -56,10 +56,10 @@ type StatementExecutor struct {
} }
// ExecuteStatement executes the given statement with the given execution context. // ExecuteStatement executes the given statement with the given execution context.
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
// Select statements are handled separately so that they can be streamed. // Select statements are handled separately so that they can be streamed.
if stmt, ok := stmt.(*influxql.SelectStatement); ok { if stmt, ok := stmt.(*influxql.SelectStatement); ok {
return e.executeSelectStatement(context.Background(), stmt, &ctx) return e.executeSelectStatement(stmt, ctx)
} }
var rows models.Rows var rows models.Rows
@ -140,9 +140,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
err = e.executeDropUserStatement(stmt) err = e.executeDropUserStatement(stmt)
case *influxql.ExplainStatement: case *influxql.ExplainStatement:
if stmt.Analyze { if stmt.Analyze {
rows, err = e.executeExplainAnalyzeStatement(stmt, &ctx) rows, err = e.executeExplainAnalyzeStatement(stmt, ctx)
} else { } else {
rows, err = e.executeExplainStatement(stmt, &ctx) rows, err = e.executeExplainStatement(stmt, ctx)
} }
case *influxql.GrantStatement: case *influxql.GrantStatement:
if ctx.ReadOnly { if ctx.ReadOnly {
@ -167,13 +167,13 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
case *influxql.ShowContinuousQueriesStatement: case *influxql.ShowContinuousQueriesStatement:
rows, err = e.executeShowContinuousQueriesStatement(stmt) rows, err = e.executeShowContinuousQueriesStatement(stmt)
case *influxql.ShowDatabasesStatement: case *influxql.ShowDatabasesStatement:
rows, err = e.executeShowDatabasesStatement(stmt, &ctx) rows, err = e.executeShowDatabasesStatement(stmt, ctx)
case *influxql.ShowDiagnosticsStatement: case *influxql.ShowDiagnosticsStatement:
rows, err = e.executeShowDiagnosticsStatement(stmt) rows, err = e.executeShowDiagnosticsStatement(stmt)
case *influxql.ShowGrantsForUserStatement: case *influxql.ShowGrantsForUserStatement:
rows, err = e.executeShowGrantsForUserStatement(stmt) rows, err = e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowMeasurementsStatement: case *influxql.ShowMeasurementsStatement:
return e.executeShowMeasurementsStatement(stmt, &ctx) return e.executeShowMeasurementsStatement(stmt, ctx)
case *influxql.ShowMeasurementCardinalityStatement: case *influxql.ShowMeasurementCardinalityStatement:
rows, err = e.executeShowMeasurementCardinalityStatement(stmt) rows, err = e.executeShowMeasurementCardinalityStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement: case *influxql.ShowRetentionPoliciesStatement:
@ -189,9 +189,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
case *influxql.ShowSubscriptionsStatement: case *influxql.ShowSubscriptionsStatement:
rows, err = e.executeShowSubscriptionsStatement(stmt) rows, err = e.executeShowSubscriptionsStatement(stmt)
case *influxql.ShowTagKeysStatement: case *influxql.ShowTagKeysStatement:
return e.executeShowTagKeys(stmt, &ctx) return e.executeShowTagKeys(stmt, ctx)
case *influxql.ShowTagValuesStatement: case *influxql.ShowTagValuesStatement:
return e.executeShowTagValues(stmt, &ctx) return e.executeShowTagValues(stmt, ctx)
case *influxql.ShowUsersStatement: case *influxql.ShowUsersStatement:
rows, err = e.executeShowUsersStatement(stmt) rows, err = e.executeShowUsersStatement(stmt)
case *influxql.SetPasswordUserStatement: case *influxql.SetPasswordUserStatement:
@ -211,9 +211,8 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
} }
return ctx.Send(&query.Result{ return ctx.Send(&query.Result{
StatementID: ctx.StatementID, Series: rows,
Series: rows, Messages: messages,
Messages: messages,
}) })
} }
@ -407,13 +406,12 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme
return e.MetaClient.DropUser(q.Name) return e.MetaClient.DropUser(q.Name)
} }
func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) { func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
opt := query.SelectOptions{ opt := query.SelectOptions{
InterruptCh: ectx.InterruptCh, NodeID: ctx.ExecutionOptions.NodeID,
NodeID: ectx.ExecutionOptions.NodeID,
MaxSeriesN: e.MaxSelectSeriesN, MaxSeriesN: e.MaxSelectSeriesN,
MaxBucketsN: e.MaxSelectBucketsN, MaxBucketsN: e.MaxSelectBucketsN,
Authorizer: ectx.Authorizer, Authorizer: ctx.Authorizer,
} }
// Prepare the query for execution, but do not actually execute it. // Prepare the query for execution, but do not actually execute it.
@ -442,13 +440,13 @@ func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement
func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) { func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) {
stmt := q.Statement stmt := q.Statement
t, span := tracing.NewTrace("select") t, span := tracing.NewTrace("select")
ctx := tracing.NewContextWithTrace(context.Background(), t) ctx := tracing.NewContextWithTrace(ectx, t)
ctx = tracing.NewContextWithSpan(ctx, span) ctx = tracing.NewContextWithSpan(ctx, span)
var aux query.Iterators var aux query.Iterators
ctx = query.NewContextWithIterators(ctx, &aux) ctx = query.NewContextWithIterators(ctx, &aux)
start := time.Now() start := time.Now()
itrs, columns, err := e.createIterators(ctx, stmt, ectx) itrs, columns, err := e.createIterators(ctx, stmt, ectx.ExecutionOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -474,8 +472,8 @@ func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainSt
} else if row == nil { } else if row == nil {
// Check if the query was interrupted while emitting. // Check if the query was interrupted while emitting.
select { select {
case <-ectx.InterruptCh: case <-ectx.Done():
err = query.ErrQueryInterrupted err = ectx.Err()
goto CLEANUP goto CLEANUP
default: default:
} }
@ -544,14 +542,14 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw
return e.MetaClient.UpdateUser(q.Name, q.Password) return e.MetaClient.UpdateUser(q.Name, q.Password)
} }
func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) error { func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {
itrs, columns, err := e.createIterators(ctx, stmt, ectx) itrs, columns, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions)
if err != nil { if err != nil {
return err return err
} }
// Generate a row emitter from the iterator set. // Generate a row emitter from the iterator set.
em := query.NewEmitter(itrs, stmt.TimeAscending(), ectx.ChunkSize) em := query.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize)
em.Columns = columns em.Columns = columns
if stmt.Location != nil { if stmt.Location != nil {
em.Location = stmt.Location em.Location = stmt.Location
@ -576,8 +574,8 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in
} else if row == nil { } else if row == nil {
// Check if the query was interrupted while emitting. // Check if the query was interrupted while emitting.
select { select {
case <-ectx.InterruptCh: case <-ctx.Done():
return query.ErrQueryInterrupted return ctx.Err()
default: default:
} }
break break
@ -593,13 +591,12 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in
} }
result := &query.Result{ result := &query.Result{
StatementID: ectx.StatementID, Series: []*models.Row{row},
Series: []*models.Row{row}, Partial: partial,
Partial: partial,
} }
// Send results or exit if closing. // Send results or exit if closing.
if err := ectx.Send(result); err != nil { if err := ctx.Send(result); err != nil {
return err return err
} }
@ -613,13 +610,12 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in
} }
var messages []*query.Message var messages []*query.Message
if ectx.ReadOnly { if ctx.ReadOnly {
messages = append(messages, query.ReadOnlyWarning(stmt.String())) messages = append(messages, query.ReadOnlyWarning(stmt.String()))
} }
return ectx.Send(&query.Result{ return ctx.Send(&query.Result{
StatementID: ectx.StatementID, Messages: messages,
Messages: messages,
Series: []*models.Row{{ Series: []*models.Row{{
Name: "result", Name: "result",
Columns: []string{"time", "written"}, Columns: []string{"time", "written"},
@ -630,34 +626,28 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in
// Always emit at least one result. // Always emit at least one result.
if !emitted { if !emitted {
return ectx.Send(&query.Result{ return ctx.Send(&query.Result{
StatementID: ectx.StatementID, Series: make([]*models.Row, 0),
Series: make([]*models.Row, 0),
}) })
} }
return nil return nil
} }
func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) ([]query.Iterator, []string, error) { func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, opt query.ExecutionOptions) ([]query.Iterator, []string, error) {
opt := query.SelectOptions{ sopt := query.SelectOptions{
InterruptCh: ectx.InterruptCh, NodeID: opt.NodeID,
NodeID: ectx.ExecutionOptions.NodeID,
MaxSeriesN: e.MaxSelectSeriesN, MaxSeriesN: e.MaxSelectSeriesN,
MaxPointN: e.MaxSelectPointN,
MaxBucketsN: e.MaxSelectBucketsN, MaxBucketsN: e.MaxSelectBucketsN,
Authorizer: ectx.Authorizer, Authorizer: opt.Authorizer,
} }
// Create a set of iterators from a selection. // Create a set of iterators from a selection.
itrs, columns, err := query.Select(ctx, stmt, e.ShardMapper, opt) itrs, columns, err := query.Select(ctx, stmt, e.ShardMapper, sopt)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if e.MaxSelectPointN > 0 {
monitor := query.PointLimitMonitor(itrs, query.DefaultStatsInterval, e.MaxSelectPointN)
ectx.Query.Monitor(monitor)
}
return itrs, columns, nil return itrs, columns, nil
} }
@ -738,8 +728,7 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition) names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition)
if err != nil || len(names) == 0 { if err != nil || len(names) == 0 {
return ctx.Send(&query.Result{ return ctx.Send(&query.Result{
StatementID: ctx.StatementID, Err: err,
Err: err,
}) })
} }
@ -763,13 +752,10 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
} }
if len(values) == 0 { if len(values) == 0 {
return ctx.Send(&query.Result{ return ctx.Send(&query.Result{})
StatementID: ctx.StatementID,
})
} }
return ctx.Send(&query.Result{ return ctx.Send(&query.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{{ Series: []*models.Row{{
Name: "measurements", Name: "measurements",
Columns: []string{"name"}, Columns: []string{"name"},
@ -975,8 +961,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement,
tagKeys, err := e.TSDBStore.TagKeys(ctx.Authorizer, shardIDs, cond) tagKeys, err := e.TSDBStore.TagKeys(ctx.Authorizer, shardIDs, cond)
if err != nil { if err != nil {
return ctx.Send(&query.Result{ return ctx.Send(&query.Result{
StatementID: ctx.StatementID, Err: err,
Err: err,
}) })
} }
@ -1009,8 +994,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement,
} }
if err := ctx.Send(&query.Result{ if err := ctx.Send(&query.Result{
StatementID: ctx.StatementID, Series: []*models.Row{row},
Series: []*models.Row{row},
}); err != nil { }); err != nil {
return err return err
} }
@ -1019,9 +1003,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement,
// Ensure at least one result is emitted. // Ensure at least one result is emitted.
if !emitted { if !emitted {
return ctx.Send(&query.Result{ return ctx.Send(&query.Result{})
StatementID: ctx.StatementID,
})
} }
return nil return nil
} }
@ -1065,10 +1047,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, shardIDs, cond) tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, shardIDs, cond)
if err != nil { if err != nil {
return ctx.Send(&query.Result{ return ctx.Send(&query.Result{Err: err})
StatementID: ctx.StatementID,
Err: err,
})
} }
emitted := false emitted := false
@ -1103,8 +1082,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
} }
if err := ctx.Send(&query.Result{ if err := ctx.Send(&query.Result{
StatementID: ctx.StatementID, Series: []*models.Row{row},
Series: []*models.Row{row},
}); err != nil { }); err != nil {
return err return err
} }
@ -1113,9 +1091,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
// Ensure at least one result is emitted. // Ensure at least one result is emitted.
if !emitted { if !emitted {
return ctx.Send(&query.Result{ return ctx.Send(&query.Result{})
StatementID: ctx.StatementID,
})
} }
return nil return nil
} }

View File

@ -874,10 +874,11 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions)
columns := stmt.ColumnNames() columns := stmt.ColumnNames()
return &preparedStatement{ return &preparedStatement{
stmt: stmt, stmt: stmt,
opt: opt, opt: opt,
ic: shards, ic: shards,
columns: columns, columns: columns,
now: c.Options.Now, maxPointN: sopt.MaxPointN,
now: c.Options.Now,
}, nil }, nil
} }

113
query/execution_context.go Normal file
View File

@ -0,0 +1,113 @@
package query
import (
"context"
"sync"
)
// ExecutionContext contains state that the query is currently executing with.
type ExecutionContext struct {
context.Context
// The statement ID of the executing query.
statementID int
// The query ID of the executing query.
QueryID uint64
// The query task information available to the StatementExecutor.
task *Task
// Output channel where results and errors should be sent.
Results chan *Result
// Options used to start this query.
ExecutionOptions
mu sync.RWMutex
done chan struct{}
err error
}
func (ctx *ExecutionContext) watch() {
ctx.done = make(chan struct{})
if ctx.err != nil {
close(ctx.done)
return
}
go func() {
defer close(ctx.done)
var taskCtx <-chan struct{}
if ctx.task != nil {
taskCtx = ctx.task.closing
}
select {
case <-taskCtx:
ctx.err = ctx.task.Error()
if ctx.err == nil {
ctx.err = ErrQueryInterrupted
}
case <-ctx.AbortCh:
ctx.err = ErrQueryAborted
case <-ctx.Context.Done():
ctx.err = ctx.Context.Err()
}
}()
}
func (ctx *ExecutionContext) Done() <-chan struct{} {
ctx.mu.RLock()
if ctx.done != nil {
defer ctx.mu.RUnlock()
return ctx.done
}
ctx.mu.RUnlock()
ctx.mu.Lock()
defer ctx.mu.Unlock()
if ctx.done == nil {
ctx.watch()
}
return ctx.done
}
func (ctx *ExecutionContext) Err() error {
ctx.mu.RLock()
defer ctx.mu.RUnlock()
return ctx.err
}
func (ctx *ExecutionContext) Value(key interface{}) interface{} {
switch key {
case monitorContextKey:
return ctx.task
}
return ctx.Context.Value(key)
}
// send sends a Result to the Results channel and will exit if the query has
// been aborted.
func (ctx *ExecutionContext) send(result *Result) error {
result.StatementID = ctx.statementID
select {
case <-ctx.AbortCh:
return ErrQueryAborted
case ctx.Results <- result:
}
return nil
}
// Send sends a Result to the Results channel and will exit if the query has
// been interrupted or aborted.
func (ctx *ExecutionContext) Send(result *Result) error {
result.StatementID = ctx.statementID
select {
case <-ctx.Done():
return ctx.Err()
case ctx.Results <- result:
}
return nil
}

View File

@ -139,55 +139,11 @@ type ExecutionOptions struct {
AbortCh <-chan struct{} AbortCh <-chan struct{}
} }
// ExecutionContext contains state that the query is currently executing with.
type ExecutionContext struct {
// The statement ID of the executing query.
StatementID int
// The query ID of the executing query.
QueryID uint64
// The query task information available to the StatementExecutor.
Query *Task
// Output channel where results and errors should be sent.
Results chan *Result
// A channel that is closed when the query is interrupted.
InterruptCh <-chan struct{}
// Options used to start this query.
ExecutionOptions
}
// send sends a Result to the Results channel and will exit if the query has
// been aborted.
func (ctx *ExecutionContext) send(result *Result) error {
select {
case <-ctx.AbortCh:
return ErrQueryAborted
case ctx.Results <- result:
}
return nil
}
// Send sends a Result to the Results channel and will exit if the query has
// been interrupted or aborted.
func (ctx *ExecutionContext) Send(result *Result) error {
select {
case <-ctx.InterruptCh:
return ErrQueryInterrupted
case <-ctx.AbortCh:
return ErrQueryAborted
case ctx.Results <- result:
}
return nil
}
type contextKey int type contextKey int
const ( const (
iteratorsContextKey contextKey = iota iteratorsContextKey contextKey = iota
monitorContextKey
) )
// NewContextWithIterators returns a new context.Context with the *Iterators slice added. // NewContextWithIterators returns a new context.Context with the *Iterators slice added.
@ -208,7 +164,7 @@ func tryAddAuxIteratorToContext(ctx context.Context, itr AuxIterator) {
type StatementExecutor interface { type StatementExecutor interface {
// ExecuteStatement executes a statement. Results should be sent to the // ExecuteStatement executes a statement. Results should be sent to the
// results channel in the ExecutionContext. // results channel in the ExecutionContext.
ExecuteStatement(stmt influxql.Statement, ctx ExecutionContext) error ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error
} }
// StatementNormalizer normalizes a statement before it is executed. // StatementNormalizer normalizes a statement before it is executed.
@ -298,7 +254,7 @@ func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, clo
atomic.AddInt64(&e.stats.QueryExecutionDuration, time.Since(start).Nanoseconds()) atomic.AddInt64(&e.stats.QueryExecutionDuration, time.Since(start).Nanoseconds())
}(time.Now()) }(time.Now())
qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing) ctx, detach, err := e.TaskManager.AttachQuery(query, opt, closing)
if err != nil { if err != nil {
select { select {
case results <- &Result{Err: err}: case results <- &Result{Err: err}:
@ -306,21 +262,15 @@ func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, clo
} }
return return
} }
defer e.TaskManager.DetachQuery(qid) defer detach()
// Setup the execution context that will be used when executing statements. // Setup the execution context that will be used when executing statements.
ctx := ExecutionContext{ ctx.Results = results
QueryID: qid,
Query: task,
Results: results,
InterruptCh: task.closing,
ExecutionOptions: opt,
}
var i int var i int
LOOP: LOOP:
for ; i < len(query.Statements); i++ { for ; i < len(query.Statements); i++ {
ctx.StatementID = i ctx.statementID = i
stmt := query.Statements[i] stmt := query.Statements[i]
// If a default database wasn't passed in by the caller, check the statement. // If a default database wasn't passed in by the caller, check the statement.
@ -390,7 +340,7 @@ LOOP:
if err == ErrQueryInterrupted { if err == ErrQueryInterrupted {
// Query was interrupted so retrieve the real interrupt error from // Query was interrupted so retrieve the real interrupt error from
// the query task if there is one. // the query task if there is one.
if qerr := task.Error(); qerr != nil { if qerr := ctx.Err(); qerr != nil {
err = qerr err = qerr
} }
} }
@ -409,13 +359,11 @@ LOOP:
// Check if the query was interrupted during an uninterruptible statement. // Check if the query was interrupted during an uninterruptible statement.
interrupted := false interrupted := false
if ctx.InterruptCh != nil { select {
select { case <-ctx.Done():
case <-ctx.InterruptCh: interrupted = true
interrupted = true default:
default: // Query has not been interrupted.
// Query has not been interrupted.
}
} }
if interrupted { if interrupted {
@ -463,11 +411,6 @@ func (e *Executor) recover(query *influxql.Query, results chan *Result) {
} }
} }
// MonitorFunc is a function that will be called to check if a query
// is currently healthy. If the query needs to be interrupted for some reason,
// the error should be returned by this function.
type MonitorFunc func(<-chan struct{}) error
// Task is the internal data structure for managing queries. // Task is the internal data structure for managing queries.
// For the public use data structure that gets returned, see Task. // For the public use data structure that gets returned, see Task.
type Task struct { type Task struct {

View File

@ -14,10 +14,10 @@ import (
var errUnexpected = errors.New("unexpected error") var errUnexpected = errors.New("unexpected error")
type StatementExecutor struct { type StatementExecutor struct {
ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
} }
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
return e.ExecuteStatementFn(stmt, ctx) return e.ExecuteStatementFn(stmt, ctx)
} }
@ -33,7 +33,7 @@ func TestQueryExecutor_AttachQuery(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
if ctx.QueryID != 1 { if ctx.QueryID != 1 {
t.Errorf("incorrect query id: exp=1 got=%d", ctx.QueryID) t.Errorf("incorrect query id: exp=1 got=%d", ctx.QueryID)
} }
@ -54,7 +54,7 @@ func TestQueryExecutor_KillQuery(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
switch stmt.(type) { switch stmt.(type) {
case *influxql.KillQueryStatement: case *influxql.KillQueryStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx) return e.TaskManager.ExecuteStatement(stmt, ctx)
@ -62,8 +62,8 @@ func TestQueryExecutor_KillQuery(t *testing.T) {
qid <- ctx.QueryID qid <- ctx.QueryID
select { select {
case <-ctx.InterruptCh: case <-ctx.Done():
return query.ErrQueryInterrupted return ctx.Err()
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
t.Error("killing the query did not close the channel after 100 milliseconds") t.Error("killing the query did not close the channel after 100 milliseconds")
return errUnexpected return errUnexpected
@ -95,7 +95,7 @@ func TestQueryExecutor_KillQuery_Zombie(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
switch stmt.(type) { switch stmt.(type) {
case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement: case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx) return e.TaskManager.ExecuteStatement(stmt, ctx)
@ -103,7 +103,7 @@ func TestQueryExecutor_KillQuery_Zombie(t *testing.T) {
qid <- ctx.QueryID qid <- ctx.QueryID
select { select {
case <-ctx.InterruptCh: case <-ctx.Done():
select { select {
case <-done: case <-done:
// Keep the query running until we run SHOW QUERIES. // Keep the query running until we run SHOW QUERIES.
@ -164,7 +164,7 @@ func TestQueryExecutor_KillQuery_CloseTaskManager(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
switch stmt.(type) { switch stmt.(type) {
case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement: case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx) return e.TaskManager.ExecuteStatement(stmt, ctx)
@ -221,7 +221,7 @@ func TestQueryExecutor_KillQuery_AlreadyKilled(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
switch stmt.(type) { switch stmt.(type) {
case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement: case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx) return e.TaskManager.ExecuteStatement(stmt, ctx)
@ -257,10 +257,10 @@ func TestQueryExecutor_Interrupt(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
select { select {
case <-ctx.InterruptCh: case <-ctx.Done():
return query.ErrQueryInterrupted return ctx.Err()
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
t.Error("killing the query did not close the channel after 100 milliseconds") t.Error("killing the query did not close the channel after 100 milliseconds")
return errUnexpected return errUnexpected
@ -288,7 +288,7 @@ func TestQueryExecutor_Abort(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
<-ch1 <-ch1
if err := ctx.Send(&query.Result{Err: errUnexpected}); err != query.ErrQueryAborted { if err := ctx.Send(&query.Result{Err: errUnexpected}); err != query.ErrQueryAborted {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@ -311,7 +311,7 @@ func TestQueryExecutor_Abort(t *testing.T) {
func TestQueryExecutor_ShowQueries(t *testing.T) { func TestQueryExecutor_ShowQueries(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
switch stmt.(type) { switch stmt.(type) {
case *influxql.ShowQueriesStatement: case *influxql.ShowQueriesStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx) return e.TaskManager.ExecuteStatement(stmt, ctx)
@ -347,10 +347,10 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
select { select {
case <-ctx.InterruptCh: case <-ctx.Done():
return query.ErrQueryInterrupted return ctx.Err()
case <-time.After(time.Second): case <-time.After(time.Second):
t.Errorf("timeout has not killed the query") t.Errorf("timeout has not killed the query")
return errUnexpected return errUnexpected
@ -376,10 +376,10 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
qid <- ctx.QueryID qid <- ctx.QueryID
<-ctx.InterruptCh <-ctx.Done()
return query.ErrQueryInterrupted return ctx.Err()
}, },
} }
e.TaskManager.MaxConcurrentQueries = 1 e.TaskManager.MaxConcurrentQueries = 1
@ -416,10 +416,10 @@ func TestQueryExecutor_Close(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
close(ch1) close(ch1)
<-ctx.InterruptCh <-ctx.Done()
return query.ErrQueryInterrupted return ctx.Err()
}, },
} }
@ -463,7 +463,7 @@ func TestQueryExecutor_Panic(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
panic("test error") panic("test error")
}, },
} }
@ -481,7 +481,7 @@ func TestQueryExecutor_Panic(t *testing.T) {
func TestQueryExecutor_InvalidSource(t *testing.T) { func TestQueryExecutor_InvalidSource(t *testing.T) {
e := NewQueryExecutor() e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{ e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
return errors.New("statement executed unexpectedly") return errors.New("statement executed unexpectedly")
}, },
} }

View File

@ -791,7 +791,6 @@ func newIteratorOptionsStmt(stmt *influxql.SelectStatement, sopt SelectOptions)
opt.Limit, opt.Offset = stmt.Limit, stmt.Offset opt.Limit, opt.Offset = stmt.Limit, stmt.Offset
opt.SLimit, opt.SOffset = stmt.SLimit, stmt.SOffset opt.SLimit, opt.SOffset = stmt.SLimit, stmt.SOffset
opt.MaxSeriesN = sopt.MaxSeriesN opt.MaxSeriesN = sopt.MaxSeriesN
opt.InterruptCh = sopt.InterruptCh
opt.Authorizer = sopt.Authorizer opt.Authorizer = sopt.Authorizer
return opt, nil return opt, nil

View File

@ -1554,9 +1554,11 @@ func TestIterator_EncodeDecode(t *testing.T) {
// Test implementation of influxql.FloatIterator // Test implementation of influxql.FloatIterator
type FloatIterator struct { type FloatIterator struct {
Points []query.FloatPoint Context context.Context
Closed bool Points []query.FloatPoint
stats query.IteratorStats Closed bool
Delay time.Duration
stats query.IteratorStats
} }
func (itr *FloatIterator) Stats() query.IteratorStats { return itr.stats } func (itr *FloatIterator) Stats() query.IteratorStats { return itr.stats }
@ -1568,6 +1570,22 @@ func (itr *FloatIterator) Next() (*query.FloatPoint, error) {
return nil, nil return nil, nil
} }
// If we have asked for a delay, then delay the returning of the point
// until either an (optional) context is done or the time has passed.
if itr.Delay > 0 {
var done <-chan struct{}
if itr.Context != nil {
done = itr.Context.Done()
}
timer := time.NewTimer(itr.Delay)
select {
case <-timer.C:
case <-done:
timer.Stop()
return nil, itr.Context.Err()
}
}
v := &itr.Points[0] v := &itr.Points[0]
itr.Points = itr.Points[1:] itr.Points = itr.Points[1:]
return v, nil return v, nil

View File

@ -1,6 +1,31 @@
package query package query
import "time" import (
"context"
"time"
)
// MonitorFunc is a function that will be called to check if a query
// is currently healthy. If the query needs to be interrupted for some reason,
// the error should be returned by this function.
type MonitorFunc func(<-chan struct{}) error
// Monitor monitors the status of a query and returns whether the query should
// be aborted with an error.
type Monitor interface {
// Monitor starts a new goroutine that will monitor a query. The function
// will be passed in a channel to signal when the query has been finished
// normally. If the function returns with an error and the query is still
// running, the query will be terminated.
Monitor(fn MonitorFunc)
}
// MonitorFromContext returns a Monitor embedded within the Context
// if one exists.
func MonitorFromContext(ctx context.Context) Monitor {
v, _ := ctx.Value(monitorContextKey).(Monitor)
return v
}
// PointLimitMonitor is a query monitor that exits when the number of points // PointLimitMonitor is a query monitor that exits when the number of points
// emitted exceeds a threshold. // emitted exceeds a threshold.

57
query/monitor_test.go Normal file
View File

@ -0,0 +1,57 @@
package query_test
import (
"context"
"testing"
"time"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
)
func TestPointLimitMonitor(t *testing.T) {
t.Parallel()
stmt := MustParseSelectStatement(`SELECT mean(value) FROM cpu`)
// Create a new task manager so we can use the query task as a monitor.
taskManager := query.NewTaskManager()
ctx, detach, err := taskManager.AttachQuery(&influxql.Query{
Statements: []influxql.Statement{stmt},
}, query.ExecutionOptions{}, nil)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
defer detach()
shardMapper := ShardMapper{
MapShardsFn: func(sources influxql.Sources, t influxql.TimeRange) query.ShardGroup {
return &ShardGroup{
CreateIteratorFn: func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
return &FloatIterator{
Points: []query.FloatPoint{
{Name: "cpu", Value: 35},
},
Context: ctx,
Delay: 2 * time.Second,
stats: query.IteratorStats{
PointN: 10,
},
}, nil
},
Fields: map[string]influxql.DataType{
"value": influxql.Float,
},
}
},
}
itrs, _, err := query.Select(ctx, stmt, &shardMapper, query.SelectOptions{
MaxPointN: 1,
})
if _, err := Iterators(itrs).ReadAll(); err == nil {
t.Fatalf("expected an error")
} else if got, want := err.Error(), "max-select-point limit exceeed: (10/1)"; got != want {
t.Fatalf("unexpected error: got=%v want=%v", got, want)
}
}

View File

@ -22,13 +22,14 @@ type SelectOptions struct {
// If zero, all nodes are used. // If zero, all nodes are used.
NodeID uint64 NodeID uint64
// An optional channel that, if closed, signals that the select should be
// interrupted.
InterruptCh <-chan struct{}
// Maximum number of concurrent series. // Maximum number of concurrent series.
MaxSeriesN int MaxSeriesN int
// Maximum number of points to read from the query.
// This requires the passed in context to have a Monitor that is
// created using WithMonitor.
MaxPointN int
// Maximum number of buckets for a statement. // Maximum number of buckets for a statement.
MaxBucketsN int MaxBucketsN int
} }
@ -97,8 +98,9 @@ type preparedStatement struct {
IteratorCreator IteratorCreator
io.Closer io.Closer
} }
columns []string columns []string
now time.Time maxPointN int
now time.Time
} }
func (p *preparedStatement) Select(ctx context.Context) ([]Iterator, []string, error) { func (p *preparedStatement) Select(ctx context.Context) ([]Iterator, []string, error) {
@ -106,10 +108,22 @@ func (p *preparedStatement) Select(ctx context.Context) ([]Iterator, []string, e
// Each level of the query should use a time range discovered during // Each level of the query should use a time range discovered during
// compilation, but that requires too large of a refactor at the moment. // compilation, but that requires too large of a refactor at the moment.
ctx = context.WithValue(ctx, "now", p.now) ctx = context.WithValue(ctx, "now", p.now)
itrs, err := buildIterators(ctx, p.stmt, p.ic, p.opt)
opt := p.opt
opt.InterruptCh = ctx.Done()
itrs, err := buildIterators(ctx, p.stmt, p.ic, opt)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
// If a monitor exists and we are told there is a maximum number of points,
// register the monitor function.
if m := MonitorFromContext(ctx); m != nil {
if p.maxPointN > 0 {
monitor := PointLimitMonitor(itrs, DefaultStatsInterval, p.maxPointN)
m.Monitor(monitor)
}
}
return itrs, p.columns, nil return itrs, p.columns, nil
} }

View File

@ -1,6 +1,7 @@
package query package query
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -71,7 +72,7 @@ func NewTaskManager() *TaskManager {
} }
// ExecuteStatement executes a statement containing one of the task management queries. // ExecuteStatement executes a statement containing one of the task management queries.
func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionContext) error { func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error {
switch stmt := stmt.(type) { switch stmt := stmt.(type) {
case *influxql.ShowQueriesStatement: case *influxql.ShowQueriesStatement:
rows, err := t.executeShowQueriesStatement(stmt) rows, err := t.executeShowQueriesStatement(stmt)
@ -79,10 +80,9 @@ func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionCon
return err return err
} }
ctx.Results <- &Result{ ctx.Send(&Result{
StatementID: ctx.StatementID, Series: rows,
Series: rows, })
}
case *influxql.KillQueryStatement: case *influxql.KillQueryStatement:
var messages []*Message var messages []*Message
if ctx.ReadOnly { if ctx.ReadOnly {
@ -92,10 +92,9 @@ func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionCon
if err := t.executeKillQueryStatement(stmt); err != nil { if err := t.executeKillQueryStatement(stmt); err != nil {
return err return err
} }
ctx.Results <- &Result{ ctx.Send(&Result{
StatementID: ctx.StatementID, Messages: messages,
Messages: messages, })
}
default: default:
return ErrInvalidQuery return ErrInvalidQuery
} }
@ -150,22 +149,22 @@ func (t *TaskManager) queryError(qid uint64, err error) {
// query finishes running. // query finishes running.
// //
// After a query finishes running, the system is free to reuse a query id. // After a query finishes running, the system is free to reuse a query id.
func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt <-chan struct{}) (uint64, *Task, error) { func (t *TaskManager) AttachQuery(q *influxql.Query, opt ExecutionOptions, interrupt <-chan struct{}) (*ExecutionContext, func(), error) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
if t.shutdown { if t.shutdown {
return 0, nil, ErrQueryEngineShutdown return nil, nil, ErrQueryEngineShutdown
} }
if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries { if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries {
return 0, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries) return nil, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries)
} }
qid := t.nextID qid := t.nextID
query := &Task{ query := &Task{
query: q.String(), query: q.String(),
database: database, database: opt.Database,
status: RunningTask, status: RunningTask,
startTime: time.Now(), startTime: time.Now(),
closing: make(chan struct{}), closing: make(chan struct{}),
@ -189,7 +188,15 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt
}) })
} }
t.nextID++ t.nextID++
return qid, query, nil
ctx := &ExecutionContext{
Context: context.Background(),
QueryID: qid,
task: query,
ExecutionOptions: opt,
}
ctx.watch()
return ctx, func() { t.DetachQuery(qid) }, nil
} }
// KillQuery enters a query into the killed state and closes the channel // KillQuery enters a query into the killed state and closes the channel

View File

@ -50,7 +50,7 @@ func TestContinuousQueryService_Run(t *testing.T) {
// Set a callback for ExecuteStatement. // Set a callback for ExecuteStatement.
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
callCnt++ callCnt++
if callCnt >= expectCallCnt { if callCnt >= expectCallCnt {
done <- struct{}{} done <- struct{}{}
@ -122,7 +122,7 @@ func TestContinuousQueryService_ResampleOptions(t *testing.T) {
// Set a callback for ExecuteStatement. // Set a callback for ExecuteStatement.
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
s := stmt.(*influxql.SelectStatement) s := stmt.(*influxql.SelectStatement)
valuer := &influxql.NowValuer{Location: s.Location} valuer := &influxql.NowValuer{Location: s.Location}
_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
@ -204,7 +204,7 @@ func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) {
// Set a callback for ExecuteQuery. // Set a callback for ExecuteQuery.
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
s := stmt.(*influxql.SelectStatement) s := stmt.(*influxql.SelectStatement)
valuer := &influxql.NowValuer{Location: s.Location} valuer := &influxql.NowValuer{Location: s.Location}
_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
@ -274,7 +274,7 @@ func TestContinuousQueryService_GroupByOffset(t *testing.T) {
// Set a callback for ExecuteStatement. // Set a callback for ExecuteStatement.
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
s := stmt.(*influxql.SelectStatement) s := stmt.(*influxql.SelectStatement)
valuer := &influxql.NowValuer{Location: s.Location} valuer := &influxql.NowValuer{Location: s.Location}
_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
@ -315,7 +315,7 @@ func TestContinuousQueryService_NotLeader(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
// Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader. // Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader.
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
done <- struct{}{} done <- struct{}{}
ctx.Results <- &query.Result{Err: errUnexpected} ctx.Results <- &query.Result{Err: errUnexpected}
return nil return nil
@ -336,7 +336,7 @@ func TestContinuousQueryService_NotLeader(t *testing.T) {
func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) { func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
s := NewTestService(t) s := NewTestService(t)
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
return errUnexpected return errUnexpected
}, },
} }
@ -435,7 +435,7 @@ func TestExecuteContinuousQuery_TimeRange(t *testing.T) {
// Set a callback for ExecuteStatement. // Set a callback for ExecuteStatement.
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
s := stmt.(*influxql.SelectStatement) s := stmt.(*influxql.SelectStatement)
valuer := &influxql.NowValuer{Location: s.Location} valuer := &influxql.NowValuer{Location: s.Location}
_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
@ -549,7 +549,7 @@ func TestExecuteContinuousQuery_TimeZone(t *testing.T) {
// Set a callback for ExecuteStatement. // Set a callback for ExecuteStatement.
tests := make(chan test, 1) tests := make(chan test, 1)
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
test := <-tests test := <-tests
s := stmt.(*influxql.SelectStatement) s := stmt.(*influxql.SelectStatement)
valuer := &influxql.NowValuer{Location: s.Location} valuer := &influxql.NowValuer{Location: s.Location}
@ -592,7 +592,7 @@ func TestExecuteContinuousQuery_TimeZone(t *testing.T) {
func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) { func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
s := NewTestService(t) s := NewTestService(t)
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
return errExpected return errExpected
}, },
} }
@ -612,7 +612,7 @@ func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) {
const writeN = int64(50) const writeN = int64(50)
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
ctx.Results <- &query.Result{ ctx.Results <- &query.Result{
Series: []*models.Row{{ Series: []*models.Row{{
Name: "result", Name: "result",
@ -658,8 +658,8 @@ func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) {
func TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault(t *testing.T) { func TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault(t *testing.T) {
s := NewTestService(t) s := NewTestService(t)
s.QueryExecutor.StatementExecutor = &StatementExecutor{ s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
ctx.Results <- &query.Result{} ctx.Send(&query.Result{})
return nil return nil
}, },
} }
@ -821,10 +821,10 @@ func (ms *MetaClient) CreateContinuousQuery(database, name, query string) error
// StatementExecutor is a mock statement executor. // StatementExecutor is a mock statement executor.
type StatementExecutor struct { type StatementExecutor struct {
ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
} }
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
return e.ExecuteStatementFn(stmt, ctx) return e.ExecuteStatementFn(stmt, ctx)
} }

View File

@ -31,7 +31,7 @@ import (
// Ensure the handler returns results from a query (including nil results). // Ensure the handler returns results from a query (including nil results).
func TestHandler_Query(t *testing.T) { func TestHandler_Query(t *testing.T) {
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
if stmt.String() != `SELECT * FROM bar` { if stmt.String() != `SELECT * FROM bar` {
t.Fatalf("unexpected query: %s", stmt.String()) t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `foo` { } else if ctx.Database != `foo` {
@ -54,7 +54,7 @@ func TestHandler_Query(t *testing.T) {
// Ensure the handler returns results from a query passed as a file. // Ensure the handler returns results from a query passed as a file.
func TestHandler_Query_File(t *testing.T) { func TestHandler_Query_File(t *testing.T) {
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
if stmt.String() != `SELECT * FROM bar` { if stmt.String() != `SELECT * FROM bar` {
t.Fatalf("unexpected query: %s", stmt.String()) t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `foo` { } else if ctx.Database != `foo` {
@ -123,7 +123,7 @@ func TestHandler_Query_Auth(t *testing.T) {
} }
// Set mock statement executor for handler to use. // Set mock statement executor for handler to use.
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
if stmt.String() != `SELECT * FROM bar` { if stmt.String() != `SELECT * FROM bar` {
t.Fatalf("unexpected query: %s", stmt.String()) t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `foo` { } else if ctx.Database != `foo` {
@ -238,7 +238,7 @@ func TestHandler_Query_Auth(t *testing.T) {
// Ensure the handler returns results from a query (including nil results). // Ensure the handler returns results from a query (including nil results).
func TestHandler_QueryRegex(t *testing.T) { func TestHandler_QueryRegex(t *testing.T) {
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` { if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` {
t.Fatalf("unexpected query: %s", stmt.String()) t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `test` { } else if ctx.Database != `test` {
@ -255,7 +255,7 @@ func TestHandler_QueryRegex(t *testing.T) {
// Ensure the handler merges results from the same statement. // Ensure the handler merges results from the same statement.
func TestHandler_Query_MergeResults(t *testing.T) { func TestHandler_Query_MergeResults(t *testing.T) {
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
return nil return nil
@ -273,7 +273,7 @@ func TestHandler_Query_MergeResults(t *testing.T) {
// Ensure the handler merges results from the same statement. // Ensure the handler merges results from the same statement.
func TestHandler_Query_MergeEmptyResults(t *testing.T) { func TestHandler_Query_MergeEmptyResults(t *testing.T) {
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}}
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
return nil return nil
@ -291,7 +291,7 @@ func TestHandler_Query_MergeEmptyResults(t *testing.T) {
// Ensure the handler can parse chunked and chunk size query parameters. // Ensure the handler can parse chunked and chunk size query parameters.
func TestHandler_Query_Chunked(t *testing.T) { func TestHandler_Query_Chunked(t *testing.T) {
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
if ctx.ChunkSize != 2 { if ctx.ChunkSize != 2 {
t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize) t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize)
} }
@ -315,7 +315,7 @@ func TestHandler_Query_Chunked(t *testing.T) {
func TestHandler_Query_Async(t *testing.T) { func TestHandler_Query_Async(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
if stmt.String() != `SELECT * FROM bar` { if stmt.String() != `SELECT * FROM bar` {
t.Fatalf("unexpected query: %s", stmt.String()) t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `foo` { } else if ctx.Database != `foo` {
@ -449,7 +449,7 @@ func TestHandler_Query_ErrAuthorize(t *testing.T) {
// Ensure the handler returns a status 200 if an error is returned in the result. // Ensure the handler returns a status 200 if an error is returned in the result.
func TestHandler_Query_ErrResult(t *testing.T) { func TestHandler_Query_ErrResult(t *testing.T) {
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
return errors.New("measurement not found") return errors.New("measurement not found")
} }
@ -470,9 +470,9 @@ func TestHandler_Query_CloseNotify(t *testing.T) {
interrupted := make(chan struct{}) interrupted := make(chan struct{})
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
select { select {
case <-ctx.InterruptCh: case <-ctx.Done():
case <-done: case <-done:
} }
close(interrupted) close(interrupted)
@ -614,7 +614,7 @@ func TestHandler_PromRead(t *testing.T) {
b := bytes.NewReader(compressed) b := bytes.NewReader(compressed)
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
if stmt.String() != `SELECT f64 FROM foo.._ WHERE eq = 'a' AND neq != 'b' AND regex =~ /c/ AND neqregex !~ /d/ AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.002Z' GROUP BY *` { if stmt.String() != `SELECT f64 FROM foo.._ WHERE eq = 'a' AND neq != 'b' AND regex =~ /c/ AND neqregex !~ /d/ AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.002Z' GROUP BY *` {
t.Fatalf("unexpected query: %s", stmt.String()) t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `foo` { } else if ctx.Database != `foo` {
@ -678,7 +678,7 @@ func TestHandler_Ping(t *testing.T) {
// Ensure the handler returns the version correctly from the different endpoints. // Ensure the handler returns the version correctly from the different endpoints.
func TestHandler_Version(t *testing.T) { func TestHandler_Version(t *testing.T) {
h := NewHandler(false) h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
return nil return nil
} }
tests := []struct { tests := []struct {
@ -933,10 +933,10 @@ func NewHandler(requireAuthentication bool) *Handler {
// HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor. // HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor.
type HandlerStatementExecutor struct { type HandlerStatementExecutor struct {
ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
} }
func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
return e.ExecuteStatementFn(stmt, ctx) return e.ExecuteStatementFn(stmt, ctx)
} }