diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 61ee8e7179..07284fd1fb 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -56,10 +56,10 @@ type StatementExecutor struct { } // ExecuteStatement executes the given statement with the given execution context. -func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { +func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { // Select statements are handled separately so that they can be streamed. if stmt, ok := stmt.(*influxql.SelectStatement); ok { - return e.executeSelectStatement(context.Background(), stmt, &ctx) + return e.executeSelectStatement(stmt, ctx) } var rows models.Rows @@ -140,9 +140,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query. err = e.executeDropUserStatement(stmt) case *influxql.ExplainStatement: if stmt.Analyze { - rows, err = e.executeExplainAnalyzeStatement(stmt, &ctx) + rows, err = e.executeExplainAnalyzeStatement(stmt, ctx) } else { - rows, err = e.executeExplainStatement(stmt, &ctx) + rows, err = e.executeExplainStatement(stmt, ctx) } case *influxql.GrantStatement: if ctx.ReadOnly { @@ -167,13 +167,13 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query. case *influxql.ShowContinuousQueriesStatement: rows, err = e.executeShowContinuousQueriesStatement(stmt) case *influxql.ShowDatabasesStatement: - rows, err = e.executeShowDatabasesStatement(stmt, &ctx) + rows, err = e.executeShowDatabasesStatement(stmt, ctx) case *influxql.ShowDiagnosticsStatement: rows, err = e.executeShowDiagnosticsStatement(stmt) case *influxql.ShowGrantsForUserStatement: rows, err = e.executeShowGrantsForUserStatement(stmt) case *influxql.ShowMeasurementsStatement: - return e.executeShowMeasurementsStatement(stmt, &ctx) + return e.executeShowMeasurementsStatement(stmt, ctx) case *influxql.ShowMeasurementCardinalityStatement: rows, err = e.executeShowMeasurementCardinalityStatement(stmt) case *influxql.ShowRetentionPoliciesStatement: @@ -189,9 +189,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query. case *influxql.ShowSubscriptionsStatement: rows, err = e.executeShowSubscriptionsStatement(stmt) case *influxql.ShowTagKeysStatement: - return e.executeShowTagKeys(stmt, &ctx) + return e.executeShowTagKeys(stmt, ctx) case *influxql.ShowTagValuesStatement: - return e.executeShowTagValues(stmt, &ctx) + return e.executeShowTagValues(stmt, ctx) case *influxql.ShowUsersStatement: rows, err = e.executeShowUsersStatement(stmt) case *influxql.SetPasswordUserStatement: @@ -211,9 +211,8 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query. } return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Series: rows, - Messages: messages, + Series: rows, + Messages: messages, }) } @@ -407,13 +406,12 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme return e.MetaClient.DropUser(q.Name) } -func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) { +func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) { opt := query.SelectOptions{ - InterruptCh: ectx.InterruptCh, - NodeID: ectx.ExecutionOptions.NodeID, + NodeID: ctx.ExecutionOptions.NodeID, MaxSeriesN: e.MaxSelectSeriesN, MaxBucketsN: e.MaxSelectBucketsN, - Authorizer: ectx.Authorizer, + Authorizer: ctx.Authorizer, } // Prepare the query for execution, but do not actually execute it. @@ -442,13 +440,13 @@ func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) { stmt := q.Statement t, span := tracing.NewTrace("select") - ctx := tracing.NewContextWithTrace(context.Background(), t) + ctx := tracing.NewContextWithTrace(ectx, t) ctx = tracing.NewContextWithSpan(ctx, span) var aux query.Iterators ctx = query.NewContextWithIterators(ctx, &aux) start := time.Now() - itrs, columns, err := e.createIterators(ctx, stmt, ectx) + itrs, columns, err := e.createIterators(ctx, stmt, ectx.ExecutionOptions) if err != nil { return nil, err } @@ -474,8 +472,8 @@ func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainSt } else if row == nil { // Check if the query was interrupted while emitting. select { - case <-ectx.InterruptCh: - err = query.ErrQueryInterrupted + case <-ectx.Done(): + err = ectx.Err() goto CLEANUP default: } @@ -544,14 +542,14 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw return e.MetaClient.UpdateUser(q.Name, q.Password) } -func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) error { - itrs, columns, err := e.createIterators(ctx, stmt, ectx) +func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error { + itrs, columns, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions) if err != nil { return err } // Generate a row emitter from the iterator set. - em := query.NewEmitter(itrs, stmt.TimeAscending(), ectx.ChunkSize) + em := query.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize) em.Columns = columns if stmt.Location != nil { em.Location = stmt.Location @@ -576,8 +574,8 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in } else if row == nil { // Check if the query was interrupted while emitting. select { - case <-ectx.InterruptCh: - return query.ErrQueryInterrupted + case <-ctx.Done(): + return ctx.Err() default: } break @@ -593,13 +591,12 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in } result := &query.Result{ - StatementID: ectx.StatementID, - Series: []*models.Row{row}, - Partial: partial, + Series: []*models.Row{row}, + Partial: partial, } // Send results or exit if closing. - if err := ectx.Send(result); err != nil { + if err := ctx.Send(result); err != nil { return err } @@ -613,13 +610,12 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in } var messages []*query.Message - if ectx.ReadOnly { + if ctx.ReadOnly { messages = append(messages, query.ReadOnlyWarning(stmt.String())) } - return ectx.Send(&query.Result{ - StatementID: ectx.StatementID, - Messages: messages, + return ctx.Send(&query.Result{ + Messages: messages, Series: []*models.Row{{ Name: "result", Columns: []string{"time", "written"}, @@ -630,34 +626,28 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in // Always emit at least one result. if !emitted { - return ectx.Send(&query.Result{ - StatementID: ectx.StatementID, - Series: make([]*models.Row, 0), + return ctx.Send(&query.Result{ + Series: make([]*models.Row, 0), }) } return nil } -func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) ([]query.Iterator, []string, error) { - opt := query.SelectOptions{ - InterruptCh: ectx.InterruptCh, - NodeID: ectx.ExecutionOptions.NodeID, +func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, opt query.ExecutionOptions) ([]query.Iterator, []string, error) { + sopt := query.SelectOptions{ + NodeID: opt.NodeID, MaxSeriesN: e.MaxSelectSeriesN, + MaxPointN: e.MaxSelectPointN, MaxBucketsN: e.MaxSelectBucketsN, - Authorizer: ectx.Authorizer, + Authorizer: opt.Authorizer, } // Create a set of iterators from a selection. - itrs, columns, err := query.Select(ctx, stmt, e.ShardMapper, opt) + itrs, columns, err := query.Select(ctx, stmt, e.ShardMapper, sopt) if err != nil { return nil, nil, err } - - if e.MaxSelectPointN > 0 { - monitor := query.PointLimitMonitor(itrs, query.DefaultStatsInterval, e.MaxSelectPointN) - ectx.Query.Monitor(monitor) - } return itrs, columns, nil } @@ -738,8 +728,7 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition) if err != nil || len(names) == 0 { return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Err: err, + Err: err, }) } @@ -763,13 +752,10 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea } if len(values) == 0 { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - }) + return ctx.Send(&query.Result{}) } return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, Series: []*models.Row{{ Name: "measurements", Columns: []string{"name"}, @@ -975,8 +961,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement, tagKeys, err := e.TSDBStore.TagKeys(ctx.Authorizer, shardIDs, cond) if err != nil { return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Err: err, + Err: err, }) } @@ -1009,8 +994,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement, } if err := ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Series: []*models.Row{row}, + Series: []*models.Row{row}, }); err != nil { return err } @@ -1019,9 +1003,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement, // Ensure at least one result is emitted. if !emitted { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - }) + return ctx.Send(&query.Result{}) } return nil } @@ -1065,10 +1047,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, shardIDs, cond) if err != nil { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Err: err, - }) + return ctx.Send(&query.Result{Err: err}) } emitted := false @@ -1103,8 +1082,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem } if err := ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Series: []*models.Row{row}, + Series: []*models.Row{row}, }); err != nil { return err } @@ -1113,9 +1091,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem // Ensure at least one result is emitted. if !emitted { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - }) + return ctx.Send(&query.Result{}) } return nil } diff --git a/query/compile.go b/query/compile.go index 0b2a746c44..892ebd2a54 100644 --- a/query/compile.go +++ b/query/compile.go @@ -874,10 +874,11 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions) columns := stmt.ColumnNames() return &preparedStatement{ - stmt: stmt, - opt: opt, - ic: shards, - columns: columns, - now: c.Options.Now, + stmt: stmt, + opt: opt, + ic: shards, + columns: columns, + maxPointN: sopt.MaxPointN, + now: c.Options.Now, }, nil } diff --git a/query/execution_context.go b/query/execution_context.go new file mode 100644 index 0000000000..a3d7d26200 --- /dev/null +++ b/query/execution_context.go @@ -0,0 +1,113 @@ +package query + +import ( + "context" + "sync" +) + +// ExecutionContext contains state that the query is currently executing with. +type ExecutionContext struct { + context.Context + + // The statement ID of the executing query. + statementID int + + // The query ID of the executing query. + QueryID uint64 + + // The query task information available to the StatementExecutor. + task *Task + + // Output channel where results and errors should be sent. + Results chan *Result + + // Options used to start this query. + ExecutionOptions + + mu sync.RWMutex + done chan struct{} + err error +} + +func (ctx *ExecutionContext) watch() { + ctx.done = make(chan struct{}) + if ctx.err != nil { + close(ctx.done) + return + } + + go func() { + defer close(ctx.done) + + var taskCtx <-chan struct{} + if ctx.task != nil { + taskCtx = ctx.task.closing + } + + select { + case <-taskCtx: + ctx.err = ctx.task.Error() + if ctx.err == nil { + ctx.err = ErrQueryInterrupted + } + case <-ctx.AbortCh: + ctx.err = ErrQueryAborted + case <-ctx.Context.Done(): + ctx.err = ctx.Context.Err() + } + }() +} + +func (ctx *ExecutionContext) Done() <-chan struct{} { + ctx.mu.RLock() + if ctx.done != nil { + defer ctx.mu.RUnlock() + return ctx.done + } + ctx.mu.RUnlock() + + ctx.mu.Lock() + defer ctx.mu.Unlock() + if ctx.done == nil { + ctx.watch() + } + return ctx.done +} + +func (ctx *ExecutionContext) Err() error { + ctx.mu.RLock() + defer ctx.mu.RUnlock() + return ctx.err +} + +func (ctx *ExecutionContext) Value(key interface{}) interface{} { + switch key { + case monitorContextKey: + return ctx.task + } + return ctx.Context.Value(key) +} + +// send sends a Result to the Results channel and will exit if the query has +// been aborted. +func (ctx *ExecutionContext) send(result *Result) error { + result.StatementID = ctx.statementID + select { + case <-ctx.AbortCh: + return ErrQueryAborted + case ctx.Results <- result: + } + return nil +} + +// Send sends a Result to the Results channel and will exit if the query has +// been interrupted or aborted. +func (ctx *ExecutionContext) Send(result *Result) error { + result.StatementID = ctx.statementID + select { + case <-ctx.Done(): + return ctx.Err() + case ctx.Results <- result: + } + return nil +} diff --git a/query/executor.go b/query/executor.go index 3f0a288861..b80b45ec1f 100644 --- a/query/executor.go +++ b/query/executor.go @@ -139,55 +139,11 @@ type ExecutionOptions struct { AbortCh <-chan struct{} } -// ExecutionContext contains state that the query is currently executing with. -type ExecutionContext struct { - // The statement ID of the executing query. - StatementID int - - // The query ID of the executing query. - QueryID uint64 - - // The query task information available to the StatementExecutor. - Query *Task - - // Output channel where results and errors should be sent. - Results chan *Result - - // A channel that is closed when the query is interrupted. - InterruptCh <-chan struct{} - - // Options used to start this query. - ExecutionOptions -} - -// send sends a Result to the Results channel and will exit if the query has -// been aborted. -func (ctx *ExecutionContext) send(result *Result) error { - select { - case <-ctx.AbortCh: - return ErrQueryAborted - case ctx.Results <- result: - } - return nil -} - -// Send sends a Result to the Results channel and will exit if the query has -// been interrupted or aborted. -func (ctx *ExecutionContext) Send(result *Result) error { - select { - case <-ctx.InterruptCh: - return ErrQueryInterrupted - case <-ctx.AbortCh: - return ErrQueryAborted - case ctx.Results <- result: - } - return nil -} - type contextKey int const ( iteratorsContextKey contextKey = iota + monitorContextKey ) // NewContextWithIterators returns a new context.Context with the *Iterators slice added. @@ -208,7 +164,7 @@ func tryAddAuxIteratorToContext(ctx context.Context, itr AuxIterator) { type StatementExecutor interface { // ExecuteStatement executes a statement. Results should be sent to the // results channel in the ExecutionContext. - ExecuteStatement(stmt influxql.Statement, ctx ExecutionContext) error + ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error } // StatementNormalizer normalizes a statement before it is executed. @@ -298,7 +254,7 @@ func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, clo atomic.AddInt64(&e.stats.QueryExecutionDuration, time.Since(start).Nanoseconds()) }(time.Now()) - qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing) + ctx, detach, err := e.TaskManager.AttachQuery(query, opt, closing) if err != nil { select { case results <- &Result{Err: err}: @@ -306,21 +262,15 @@ func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, clo } return } - defer e.TaskManager.DetachQuery(qid) + defer detach() // Setup the execution context that will be used when executing statements. - ctx := ExecutionContext{ - QueryID: qid, - Query: task, - Results: results, - InterruptCh: task.closing, - ExecutionOptions: opt, - } + ctx.Results = results var i int LOOP: for ; i < len(query.Statements); i++ { - ctx.StatementID = i + ctx.statementID = i stmt := query.Statements[i] // If a default database wasn't passed in by the caller, check the statement. @@ -390,7 +340,7 @@ LOOP: if err == ErrQueryInterrupted { // Query was interrupted so retrieve the real interrupt error from // the query task if there is one. - if qerr := task.Error(); qerr != nil { + if qerr := ctx.Err(); qerr != nil { err = qerr } } @@ -409,13 +359,11 @@ LOOP: // Check if the query was interrupted during an uninterruptible statement. interrupted := false - if ctx.InterruptCh != nil { - select { - case <-ctx.InterruptCh: - interrupted = true - default: - // Query has not been interrupted. - } + select { + case <-ctx.Done(): + interrupted = true + default: + // Query has not been interrupted. } if interrupted { @@ -463,11 +411,6 @@ func (e *Executor) recover(query *influxql.Query, results chan *Result) { } } -// MonitorFunc is a function that will be called to check if a query -// is currently healthy. If the query needs to be interrupted for some reason, -// the error should be returned by this function. -type MonitorFunc func(<-chan struct{}) error - // Task is the internal data structure for managing queries. // For the public use data structure that gets returned, see Task. type Task struct { diff --git a/query/executor_test.go b/query/executor_test.go index 16940252b6..6f7f0c1afb 100644 --- a/query/executor_test.go +++ b/query/executor_test.go @@ -14,10 +14,10 @@ import ( var errUnexpected = errors.New("unexpected error") type StatementExecutor struct { - ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error + ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error } -func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { +func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { return e.ExecuteStatementFn(stmt, ctx) } @@ -33,7 +33,7 @@ func TestQueryExecutor_AttachQuery(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if ctx.QueryID != 1 { t.Errorf("incorrect query id: exp=1 got=%d", ctx.QueryID) } @@ -54,7 +54,7 @@ func TestQueryExecutor_KillQuery(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { switch stmt.(type) { case *influxql.KillQueryStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -62,8 +62,8 @@ func TestQueryExecutor_KillQuery(t *testing.T) { qid <- ctx.QueryID select { - case <-ctx.InterruptCh: - return query.ErrQueryInterrupted + case <-ctx.Done(): + return ctx.Err() case <-time.After(100 * time.Millisecond): t.Error("killing the query did not close the channel after 100 milliseconds") return errUnexpected @@ -95,7 +95,7 @@ func TestQueryExecutor_KillQuery_Zombie(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { switch stmt.(type) { case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -103,7 +103,7 @@ func TestQueryExecutor_KillQuery_Zombie(t *testing.T) { qid <- ctx.QueryID select { - case <-ctx.InterruptCh: + case <-ctx.Done(): select { case <-done: // Keep the query running until we run SHOW QUERIES. @@ -164,7 +164,7 @@ func TestQueryExecutor_KillQuery_CloseTaskManager(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { switch stmt.(type) { case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -221,7 +221,7 @@ func TestQueryExecutor_KillQuery_AlreadyKilled(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { switch stmt.(type) { case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -257,10 +257,10 @@ func TestQueryExecutor_Interrupt(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { select { - case <-ctx.InterruptCh: - return query.ErrQueryInterrupted + case <-ctx.Done(): + return ctx.Err() case <-time.After(100 * time.Millisecond): t.Error("killing the query did not close the channel after 100 milliseconds") return errUnexpected @@ -288,7 +288,7 @@ func TestQueryExecutor_Abort(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { <-ch1 if err := ctx.Send(&query.Result{Err: errUnexpected}); err != query.ErrQueryAborted { t.Errorf("unexpected error: %v", err) @@ -311,7 +311,7 @@ func TestQueryExecutor_Abort(t *testing.T) { func TestQueryExecutor_ShowQueries(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { switch stmt.(type) { case *influxql.ShowQueriesStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -347,10 +347,10 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { select { - case <-ctx.InterruptCh: - return query.ErrQueryInterrupted + case <-ctx.Done(): + return ctx.Err() case <-time.After(time.Second): t.Errorf("timeout has not killed the query") return errUnexpected @@ -376,10 +376,10 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { qid <- ctx.QueryID - <-ctx.InterruptCh - return query.ErrQueryInterrupted + <-ctx.Done() + return ctx.Err() }, } e.TaskManager.MaxConcurrentQueries = 1 @@ -416,10 +416,10 @@ func TestQueryExecutor_Close(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { close(ch1) - <-ctx.InterruptCh - return query.ErrQueryInterrupted + <-ctx.Done() + return ctx.Err() }, } @@ -463,7 +463,7 @@ func TestQueryExecutor_Panic(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { panic("test error") }, } @@ -481,7 +481,7 @@ func TestQueryExecutor_Panic(t *testing.T) { func TestQueryExecutor_InvalidSource(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return errors.New("statement executed unexpectedly") }, } diff --git a/query/iterator.go b/query/iterator.go index e549e08a5c..8e22963012 100644 --- a/query/iterator.go +++ b/query/iterator.go @@ -791,7 +791,6 @@ func newIteratorOptionsStmt(stmt *influxql.SelectStatement, sopt SelectOptions) opt.Limit, opt.Offset = stmt.Limit, stmt.Offset opt.SLimit, opt.SOffset = stmt.SLimit, stmt.SOffset opt.MaxSeriesN = sopt.MaxSeriesN - opt.InterruptCh = sopt.InterruptCh opt.Authorizer = sopt.Authorizer return opt, nil diff --git a/query/iterator_test.go b/query/iterator_test.go index 0c19ecd74d..59ef8e3874 100644 --- a/query/iterator_test.go +++ b/query/iterator_test.go @@ -1554,9 +1554,11 @@ func TestIterator_EncodeDecode(t *testing.T) { // Test implementation of influxql.FloatIterator type FloatIterator struct { - Points []query.FloatPoint - Closed bool - stats query.IteratorStats + Context context.Context + Points []query.FloatPoint + Closed bool + Delay time.Duration + stats query.IteratorStats } func (itr *FloatIterator) Stats() query.IteratorStats { return itr.stats } @@ -1568,6 +1570,22 @@ func (itr *FloatIterator) Next() (*query.FloatPoint, error) { return nil, nil } + // If we have asked for a delay, then delay the returning of the point + // until either an (optional) context is done or the time has passed. + if itr.Delay > 0 { + var done <-chan struct{} + if itr.Context != nil { + done = itr.Context.Done() + } + + timer := time.NewTimer(itr.Delay) + select { + case <-timer.C: + case <-done: + timer.Stop() + return nil, itr.Context.Err() + } + } v := &itr.Points[0] itr.Points = itr.Points[1:] return v, nil diff --git a/query/monitor.go b/query/monitor.go index b6a3735bde..c3c254eea8 100644 --- a/query/monitor.go +++ b/query/monitor.go @@ -1,6 +1,31 @@ package query -import "time" +import ( + "context" + "time" +) + +// MonitorFunc is a function that will be called to check if a query +// is currently healthy. If the query needs to be interrupted for some reason, +// the error should be returned by this function. +type MonitorFunc func(<-chan struct{}) error + +// Monitor monitors the status of a query and returns whether the query should +// be aborted with an error. +type Monitor interface { + // Monitor starts a new goroutine that will monitor a query. The function + // will be passed in a channel to signal when the query has been finished + // normally. If the function returns with an error and the query is still + // running, the query will be terminated. + Monitor(fn MonitorFunc) +} + +// MonitorFromContext returns a Monitor embedded within the Context +// if one exists. +func MonitorFromContext(ctx context.Context) Monitor { + v, _ := ctx.Value(monitorContextKey).(Monitor) + return v +} // PointLimitMonitor is a query monitor that exits when the number of points // emitted exceeds a threshold. diff --git a/query/monitor_test.go b/query/monitor_test.go new file mode 100644 index 0000000000..74369a6b15 --- /dev/null +++ b/query/monitor_test.go @@ -0,0 +1,57 @@ +package query_test + +import ( + "context" + "testing" + "time" + + "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxql" +) + +func TestPointLimitMonitor(t *testing.T) { + t.Parallel() + + stmt := MustParseSelectStatement(`SELECT mean(value) FROM cpu`) + + // Create a new task manager so we can use the query task as a monitor. + taskManager := query.NewTaskManager() + ctx, detach, err := taskManager.AttachQuery(&influxql.Query{ + Statements: []influxql.Statement{stmt}, + }, query.ExecutionOptions{}, nil) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + defer detach() + + shardMapper := ShardMapper{ + MapShardsFn: func(sources influxql.Sources, t influxql.TimeRange) query.ShardGroup { + return &ShardGroup{ + CreateIteratorFn: func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { + return &FloatIterator{ + Points: []query.FloatPoint{ + {Name: "cpu", Value: 35}, + }, + Context: ctx, + Delay: 2 * time.Second, + stats: query.IteratorStats{ + PointN: 10, + }, + }, nil + }, + Fields: map[string]influxql.DataType{ + "value": influxql.Float, + }, + } + }, + } + + itrs, _, err := query.Select(ctx, stmt, &shardMapper, query.SelectOptions{ + MaxPointN: 1, + }) + if _, err := Iterators(itrs).ReadAll(); err == nil { + t.Fatalf("expected an error") + } else if got, want := err.Error(), "max-select-point limit exceeed: (10/1)"; got != want { + t.Fatalf("unexpected error: got=%v want=%v", got, want) + } +} diff --git a/query/select.go b/query/select.go index c071410eca..93d1253671 100644 --- a/query/select.go +++ b/query/select.go @@ -22,13 +22,14 @@ type SelectOptions struct { // If zero, all nodes are used. NodeID uint64 - // An optional channel that, if closed, signals that the select should be - // interrupted. - InterruptCh <-chan struct{} - // Maximum number of concurrent series. MaxSeriesN int + // Maximum number of points to read from the query. + // This requires the passed in context to have a Monitor that is + // created using WithMonitor. + MaxPointN int + // Maximum number of buckets for a statement. MaxBucketsN int } @@ -97,8 +98,9 @@ type preparedStatement struct { IteratorCreator io.Closer } - columns []string - now time.Time + columns []string + maxPointN int + now time.Time } func (p *preparedStatement) Select(ctx context.Context) ([]Iterator, []string, error) { @@ -106,10 +108,22 @@ func (p *preparedStatement) Select(ctx context.Context) ([]Iterator, []string, e // Each level of the query should use a time range discovered during // compilation, but that requires too large of a refactor at the moment. ctx = context.WithValue(ctx, "now", p.now) - itrs, err := buildIterators(ctx, p.stmt, p.ic, p.opt) + + opt := p.opt + opt.InterruptCh = ctx.Done() + itrs, err := buildIterators(ctx, p.stmt, p.ic, opt) if err != nil { return nil, nil, err } + + // If a monitor exists and we are told there is a maximum number of points, + // register the monitor function. + if m := MonitorFromContext(ctx); m != nil { + if p.maxPointN > 0 { + monitor := PointLimitMonitor(itrs, DefaultStatsInterval, p.maxPointN) + m.Monitor(monitor) + } + } return itrs, p.columns, nil } diff --git a/query/task_manager.go b/query/task_manager.go index a567f6bf6e..cbf15f794c 100644 --- a/query/task_manager.go +++ b/query/task_manager.go @@ -1,6 +1,7 @@ package query import ( + "context" "fmt" "sync" "time" @@ -71,7 +72,7 @@ func NewTaskManager() *TaskManager { } // ExecuteStatement executes a statement containing one of the task management queries. -func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionContext) error { +func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error { switch stmt := stmt.(type) { case *influxql.ShowQueriesStatement: rows, err := t.executeShowQueriesStatement(stmt) @@ -79,10 +80,9 @@ func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionCon return err } - ctx.Results <- &Result{ - StatementID: ctx.StatementID, - Series: rows, - } + ctx.Send(&Result{ + Series: rows, + }) case *influxql.KillQueryStatement: var messages []*Message if ctx.ReadOnly { @@ -92,10 +92,9 @@ func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionCon if err := t.executeKillQueryStatement(stmt); err != nil { return err } - ctx.Results <- &Result{ - StatementID: ctx.StatementID, - Messages: messages, - } + ctx.Send(&Result{ + Messages: messages, + }) default: return ErrInvalidQuery } @@ -150,22 +149,22 @@ func (t *TaskManager) queryError(qid uint64, err error) { // query finishes running. // // After a query finishes running, the system is free to reuse a query id. -func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt <-chan struct{}) (uint64, *Task, error) { +func (t *TaskManager) AttachQuery(q *influxql.Query, opt ExecutionOptions, interrupt <-chan struct{}) (*ExecutionContext, func(), error) { t.mu.Lock() defer t.mu.Unlock() if t.shutdown { - return 0, nil, ErrQueryEngineShutdown + return nil, nil, ErrQueryEngineShutdown } if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries { - return 0, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries) + return nil, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries) } qid := t.nextID query := &Task{ query: q.String(), - database: database, + database: opt.Database, status: RunningTask, startTime: time.Now(), closing: make(chan struct{}), @@ -189,7 +188,15 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt }) } t.nextID++ - return qid, query, nil + + ctx := &ExecutionContext{ + Context: context.Background(), + QueryID: qid, + task: query, + ExecutionOptions: opt, + } + ctx.watch() + return ctx, func() { t.DetachQuery(qid) }, nil } // KillQuery enters a query into the killed state and closes the channel diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index 0459431d57..e35a4b871f 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -50,7 +50,7 @@ func TestContinuousQueryService_Run(t *testing.T) { // Set a callback for ExecuteStatement. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { callCnt++ if callCnt >= expectCallCnt { done <- struct{}{} @@ -122,7 +122,7 @@ func TestContinuousQueryService_ResampleOptions(t *testing.T) { // Set a callback for ExecuteStatement. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { s := stmt.(*influxql.SelectStatement) valuer := &influxql.NowValuer{Location: s.Location} _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) @@ -204,7 +204,7 @@ func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) { // Set a callback for ExecuteQuery. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { s := stmt.(*influxql.SelectStatement) valuer := &influxql.NowValuer{Location: s.Location} _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) @@ -274,7 +274,7 @@ func TestContinuousQueryService_GroupByOffset(t *testing.T) { // Set a callback for ExecuteStatement. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { s := stmt.(*influxql.SelectStatement) valuer := &influxql.NowValuer{Location: s.Location} _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) @@ -315,7 +315,7 @@ func TestContinuousQueryService_NotLeader(t *testing.T) { done := make(chan struct{}) // Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { done <- struct{}{} ctx.Results <- &query.Result{Err: errUnexpected} return nil @@ -336,7 +336,7 @@ func TestContinuousQueryService_NotLeader(t *testing.T) { func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) { s := NewTestService(t) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return errUnexpected }, } @@ -435,7 +435,7 @@ func TestExecuteContinuousQuery_TimeRange(t *testing.T) { // Set a callback for ExecuteStatement. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { s := stmt.(*influxql.SelectStatement) valuer := &influxql.NowValuer{Location: s.Location} _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) @@ -549,7 +549,7 @@ func TestExecuteContinuousQuery_TimeZone(t *testing.T) { // Set a callback for ExecuteStatement. tests := make(chan test, 1) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { test := <-tests s := stmt.(*influxql.SelectStatement) valuer := &influxql.NowValuer{Location: s.Location} @@ -592,7 +592,7 @@ func TestExecuteContinuousQuery_TimeZone(t *testing.T) { func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) { s := NewTestService(t) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return errExpected }, } @@ -612,7 +612,7 @@ func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) { const writeN = int64(50) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { ctx.Results <- &query.Result{ Series: []*models.Row{{ Name: "result", @@ -658,8 +658,8 @@ func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) { func TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault(t *testing.T) { s := NewTestService(t) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { - ctx.Results <- &query.Result{} + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Send(&query.Result{}) return nil }, } @@ -821,10 +821,10 @@ func (ms *MetaClient) CreateContinuousQuery(database, name, query string) error // StatementExecutor is a mock statement executor. type StatementExecutor struct { - ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error + ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error } -func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { +func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { return e.ExecuteStatementFn(stmt, ctx) } diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 0d67a0d28e..b707ea1141 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -31,7 +31,7 @@ import ( // Ensure the handler returns results from a query (including nil results). func TestHandler_Query(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -54,7 +54,7 @@ func TestHandler_Query(t *testing.T) { // Ensure the handler returns results from a query passed as a file. func TestHandler_Query_File(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -123,7 +123,7 @@ func TestHandler_Query_Auth(t *testing.T) { } // Set mock statement executor for handler to use. - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -238,7 +238,7 @@ func TestHandler_Query_Auth(t *testing.T) { // Ensure the handler returns results from a query (including nil results). func TestHandler_QueryRegex(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `test` { @@ -255,7 +255,7 @@ func TestHandler_QueryRegex(t *testing.T) { // Ensure the handler merges results from the same statement. func TestHandler_Query_MergeResults(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil @@ -273,7 +273,7 @@ func TestHandler_Query_MergeResults(t *testing.T) { // Ensure the handler merges results from the same statement. func TestHandler_Query_MergeEmptyResults(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil @@ -291,7 +291,7 @@ func TestHandler_Query_MergeEmptyResults(t *testing.T) { // Ensure the handler can parse chunked and chunk size query parameters. func TestHandler_Query_Chunked(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if ctx.ChunkSize != 2 { t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize) } @@ -315,7 +315,7 @@ func TestHandler_Query_Chunked(t *testing.T) { func TestHandler_Query_Async(t *testing.T) { done := make(chan struct{}) h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -449,7 +449,7 @@ func TestHandler_Query_ErrAuthorize(t *testing.T) { // Ensure the handler returns a status 200 if an error is returned in the result. func TestHandler_Query_ErrResult(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return errors.New("measurement not found") } @@ -470,9 +470,9 @@ func TestHandler_Query_CloseNotify(t *testing.T) { interrupted := make(chan struct{}) h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { select { - case <-ctx.InterruptCh: + case <-ctx.Done(): case <-done: } close(interrupted) @@ -614,7 +614,7 @@ func TestHandler_PromRead(t *testing.T) { b := bytes.NewReader(compressed) h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT f64 FROM foo.._ WHERE eq = 'a' AND neq != 'b' AND regex =~ /c/ AND neqregex !~ /d/ AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.002Z' GROUP BY *` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -678,7 +678,7 @@ func TestHandler_Ping(t *testing.T) { // Ensure the handler returns the version correctly from the different endpoints. func TestHandler_Version(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return nil } tests := []struct { @@ -933,10 +933,10 @@ func NewHandler(requireAuthentication bool) *Handler { // HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor. type HandlerStatementExecutor struct { - ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error + ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error } -func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { +func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { return e.ExecuteStatementFn(stmt, ctx) }