From 733d8428126d77a84c92f768114e3a98ce891c48 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 2 Mar 2018 10:59:40 -0600 Subject: [PATCH] Turn the ExecutionContext into a context.Context Along with modifying ExecutionContext to be a context and have the TaskManager return the context itself, this also creates a Monitor interface and exposes the Monitor through the Context. This way, we can access the monitor from within the query.Select method and keep all of the limits inside of the query package instead of leaking them into the statement executor. An eventual goal is to remove the InterruptCh from the IteratorOptions and use the Context instead, but for now, we'll just assign the done channel from the Context to the IteratorOptions so at least they refer to the same channel. --- coordinator/statement_executor.go | 112 ++++++++----------- query/compile.go | 11 +- query/execution_context.go | 113 ++++++++++++++++++++ query/executor.go | 81 +++----------- query/executor_test.go | 52 ++++----- query/iterator.go | 1 - query/iterator_test.go | 24 ++++- query/monitor.go | 27 ++++- query/monitor_test.go | 57 ++++++++++ query/select.go | 28 +++-- query/task_manager.go | 35 +++--- services/continuous_querier/service_test.go | 28 ++--- services/httpd/handler_test.go | 30 +++--- 13 files changed, 376 insertions(+), 223 deletions(-) create mode 100644 query/execution_context.go create mode 100644 query/monitor_test.go diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 61ee8e7179..07284fd1fb 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -56,10 +56,10 @@ type StatementExecutor struct { } // ExecuteStatement executes the given statement with the given execution context. -func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { +func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { // Select statements are handled separately so that they can be streamed. if stmt, ok := stmt.(*influxql.SelectStatement); ok { - return e.executeSelectStatement(context.Background(), stmt, &ctx) + return e.executeSelectStatement(stmt, ctx) } var rows models.Rows @@ -140,9 +140,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query. err = e.executeDropUserStatement(stmt) case *influxql.ExplainStatement: if stmt.Analyze { - rows, err = e.executeExplainAnalyzeStatement(stmt, &ctx) + rows, err = e.executeExplainAnalyzeStatement(stmt, ctx) } else { - rows, err = e.executeExplainStatement(stmt, &ctx) + rows, err = e.executeExplainStatement(stmt, ctx) } case *influxql.GrantStatement: if ctx.ReadOnly { @@ -167,13 +167,13 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query. case *influxql.ShowContinuousQueriesStatement: rows, err = e.executeShowContinuousQueriesStatement(stmt) case *influxql.ShowDatabasesStatement: - rows, err = e.executeShowDatabasesStatement(stmt, &ctx) + rows, err = e.executeShowDatabasesStatement(stmt, ctx) case *influxql.ShowDiagnosticsStatement: rows, err = e.executeShowDiagnosticsStatement(stmt) case *influxql.ShowGrantsForUserStatement: rows, err = e.executeShowGrantsForUserStatement(stmt) case *influxql.ShowMeasurementsStatement: - return e.executeShowMeasurementsStatement(stmt, &ctx) + return e.executeShowMeasurementsStatement(stmt, ctx) case *influxql.ShowMeasurementCardinalityStatement: rows, err = e.executeShowMeasurementCardinalityStatement(stmt) case *influxql.ShowRetentionPoliciesStatement: @@ -189,9 +189,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query. case *influxql.ShowSubscriptionsStatement: rows, err = e.executeShowSubscriptionsStatement(stmt) case *influxql.ShowTagKeysStatement: - return e.executeShowTagKeys(stmt, &ctx) + return e.executeShowTagKeys(stmt, ctx) case *influxql.ShowTagValuesStatement: - return e.executeShowTagValues(stmt, &ctx) + return e.executeShowTagValues(stmt, ctx) case *influxql.ShowUsersStatement: rows, err = e.executeShowUsersStatement(stmt) case *influxql.SetPasswordUserStatement: @@ -211,9 +211,8 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query. } return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Series: rows, - Messages: messages, + Series: rows, + Messages: messages, }) } @@ -407,13 +406,12 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme return e.MetaClient.DropUser(q.Name) } -func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) { +func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) { opt := query.SelectOptions{ - InterruptCh: ectx.InterruptCh, - NodeID: ectx.ExecutionOptions.NodeID, + NodeID: ctx.ExecutionOptions.NodeID, MaxSeriesN: e.MaxSelectSeriesN, MaxBucketsN: e.MaxSelectBucketsN, - Authorizer: ectx.Authorizer, + Authorizer: ctx.Authorizer, } // Prepare the query for execution, but do not actually execute it. @@ -442,13 +440,13 @@ func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) { stmt := q.Statement t, span := tracing.NewTrace("select") - ctx := tracing.NewContextWithTrace(context.Background(), t) + ctx := tracing.NewContextWithTrace(ectx, t) ctx = tracing.NewContextWithSpan(ctx, span) var aux query.Iterators ctx = query.NewContextWithIterators(ctx, &aux) start := time.Now() - itrs, columns, err := e.createIterators(ctx, stmt, ectx) + itrs, columns, err := e.createIterators(ctx, stmt, ectx.ExecutionOptions) if err != nil { return nil, err } @@ -474,8 +472,8 @@ func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainSt } else if row == nil { // Check if the query was interrupted while emitting. select { - case <-ectx.InterruptCh: - err = query.ErrQueryInterrupted + case <-ectx.Done(): + err = ectx.Err() goto CLEANUP default: } @@ -544,14 +542,14 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw return e.MetaClient.UpdateUser(q.Name, q.Password) } -func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) error { - itrs, columns, err := e.createIterators(ctx, stmt, ectx) +func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error { + itrs, columns, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions) if err != nil { return err } // Generate a row emitter from the iterator set. - em := query.NewEmitter(itrs, stmt.TimeAscending(), ectx.ChunkSize) + em := query.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize) em.Columns = columns if stmt.Location != nil { em.Location = stmt.Location @@ -576,8 +574,8 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in } else if row == nil { // Check if the query was interrupted while emitting. select { - case <-ectx.InterruptCh: - return query.ErrQueryInterrupted + case <-ctx.Done(): + return ctx.Err() default: } break @@ -593,13 +591,12 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in } result := &query.Result{ - StatementID: ectx.StatementID, - Series: []*models.Row{row}, - Partial: partial, + Series: []*models.Row{row}, + Partial: partial, } // Send results or exit if closing. - if err := ectx.Send(result); err != nil { + if err := ctx.Send(result); err != nil { return err } @@ -613,13 +610,12 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in } var messages []*query.Message - if ectx.ReadOnly { + if ctx.ReadOnly { messages = append(messages, query.ReadOnlyWarning(stmt.String())) } - return ectx.Send(&query.Result{ - StatementID: ectx.StatementID, - Messages: messages, + return ctx.Send(&query.Result{ + Messages: messages, Series: []*models.Row{{ Name: "result", Columns: []string{"time", "written"}, @@ -630,34 +626,28 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in // Always emit at least one result. if !emitted { - return ectx.Send(&query.Result{ - StatementID: ectx.StatementID, - Series: make([]*models.Row, 0), + return ctx.Send(&query.Result{ + Series: make([]*models.Row, 0), }) } return nil } -func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) ([]query.Iterator, []string, error) { - opt := query.SelectOptions{ - InterruptCh: ectx.InterruptCh, - NodeID: ectx.ExecutionOptions.NodeID, +func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, opt query.ExecutionOptions) ([]query.Iterator, []string, error) { + sopt := query.SelectOptions{ + NodeID: opt.NodeID, MaxSeriesN: e.MaxSelectSeriesN, + MaxPointN: e.MaxSelectPointN, MaxBucketsN: e.MaxSelectBucketsN, - Authorizer: ectx.Authorizer, + Authorizer: opt.Authorizer, } // Create a set of iterators from a selection. - itrs, columns, err := query.Select(ctx, stmt, e.ShardMapper, opt) + itrs, columns, err := query.Select(ctx, stmt, e.ShardMapper, sopt) if err != nil { return nil, nil, err } - - if e.MaxSelectPointN > 0 { - monitor := query.PointLimitMonitor(itrs, query.DefaultStatsInterval, e.MaxSelectPointN) - ectx.Query.Monitor(monitor) - } return itrs, columns, nil } @@ -738,8 +728,7 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition) if err != nil || len(names) == 0 { return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Err: err, + Err: err, }) } @@ -763,13 +752,10 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea } if len(values) == 0 { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - }) + return ctx.Send(&query.Result{}) } return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, Series: []*models.Row{{ Name: "measurements", Columns: []string{"name"}, @@ -975,8 +961,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement, tagKeys, err := e.TSDBStore.TagKeys(ctx.Authorizer, shardIDs, cond) if err != nil { return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Err: err, + Err: err, }) } @@ -1009,8 +994,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement, } if err := ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Series: []*models.Row{row}, + Series: []*models.Row{row}, }); err != nil { return err } @@ -1019,9 +1003,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement, // Ensure at least one result is emitted. if !emitted { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - }) + return ctx.Send(&query.Result{}) } return nil } @@ -1065,10 +1047,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, shardIDs, cond) if err != nil { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Err: err, - }) + return ctx.Send(&query.Result{Err: err}) } emitted := false @@ -1103,8 +1082,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem } if err := ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Series: []*models.Row{row}, + Series: []*models.Row{row}, }); err != nil { return err } @@ -1113,9 +1091,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem // Ensure at least one result is emitted. if !emitted { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - }) + return ctx.Send(&query.Result{}) } return nil } diff --git a/query/compile.go b/query/compile.go index 0b2a746c44..892ebd2a54 100644 --- a/query/compile.go +++ b/query/compile.go @@ -874,10 +874,11 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions) columns := stmt.ColumnNames() return &preparedStatement{ - stmt: stmt, - opt: opt, - ic: shards, - columns: columns, - now: c.Options.Now, + stmt: stmt, + opt: opt, + ic: shards, + columns: columns, + maxPointN: sopt.MaxPointN, + now: c.Options.Now, }, nil } diff --git a/query/execution_context.go b/query/execution_context.go new file mode 100644 index 0000000000..a3d7d26200 --- /dev/null +++ b/query/execution_context.go @@ -0,0 +1,113 @@ +package query + +import ( + "context" + "sync" +) + +// ExecutionContext contains state that the query is currently executing with. +type ExecutionContext struct { + context.Context + + // The statement ID of the executing query. + statementID int + + // The query ID of the executing query. + QueryID uint64 + + // The query task information available to the StatementExecutor. + task *Task + + // Output channel where results and errors should be sent. + Results chan *Result + + // Options used to start this query. + ExecutionOptions + + mu sync.RWMutex + done chan struct{} + err error +} + +func (ctx *ExecutionContext) watch() { + ctx.done = make(chan struct{}) + if ctx.err != nil { + close(ctx.done) + return + } + + go func() { + defer close(ctx.done) + + var taskCtx <-chan struct{} + if ctx.task != nil { + taskCtx = ctx.task.closing + } + + select { + case <-taskCtx: + ctx.err = ctx.task.Error() + if ctx.err == nil { + ctx.err = ErrQueryInterrupted + } + case <-ctx.AbortCh: + ctx.err = ErrQueryAborted + case <-ctx.Context.Done(): + ctx.err = ctx.Context.Err() + } + }() +} + +func (ctx *ExecutionContext) Done() <-chan struct{} { + ctx.mu.RLock() + if ctx.done != nil { + defer ctx.mu.RUnlock() + return ctx.done + } + ctx.mu.RUnlock() + + ctx.mu.Lock() + defer ctx.mu.Unlock() + if ctx.done == nil { + ctx.watch() + } + return ctx.done +} + +func (ctx *ExecutionContext) Err() error { + ctx.mu.RLock() + defer ctx.mu.RUnlock() + return ctx.err +} + +func (ctx *ExecutionContext) Value(key interface{}) interface{} { + switch key { + case monitorContextKey: + return ctx.task + } + return ctx.Context.Value(key) +} + +// send sends a Result to the Results channel and will exit if the query has +// been aborted. +func (ctx *ExecutionContext) send(result *Result) error { + result.StatementID = ctx.statementID + select { + case <-ctx.AbortCh: + return ErrQueryAborted + case ctx.Results <- result: + } + return nil +} + +// Send sends a Result to the Results channel and will exit if the query has +// been interrupted or aborted. +func (ctx *ExecutionContext) Send(result *Result) error { + result.StatementID = ctx.statementID + select { + case <-ctx.Done(): + return ctx.Err() + case ctx.Results <- result: + } + return nil +} diff --git a/query/executor.go b/query/executor.go index 3f0a288861..b80b45ec1f 100644 --- a/query/executor.go +++ b/query/executor.go @@ -139,55 +139,11 @@ type ExecutionOptions struct { AbortCh <-chan struct{} } -// ExecutionContext contains state that the query is currently executing with. -type ExecutionContext struct { - // The statement ID of the executing query. - StatementID int - - // The query ID of the executing query. - QueryID uint64 - - // The query task information available to the StatementExecutor. - Query *Task - - // Output channel where results and errors should be sent. - Results chan *Result - - // A channel that is closed when the query is interrupted. - InterruptCh <-chan struct{} - - // Options used to start this query. - ExecutionOptions -} - -// send sends a Result to the Results channel and will exit if the query has -// been aborted. -func (ctx *ExecutionContext) send(result *Result) error { - select { - case <-ctx.AbortCh: - return ErrQueryAborted - case ctx.Results <- result: - } - return nil -} - -// Send sends a Result to the Results channel and will exit if the query has -// been interrupted or aborted. -func (ctx *ExecutionContext) Send(result *Result) error { - select { - case <-ctx.InterruptCh: - return ErrQueryInterrupted - case <-ctx.AbortCh: - return ErrQueryAborted - case ctx.Results <- result: - } - return nil -} - type contextKey int const ( iteratorsContextKey contextKey = iota + monitorContextKey ) // NewContextWithIterators returns a new context.Context with the *Iterators slice added. @@ -208,7 +164,7 @@ func tryAddAuxIteratorToContext(ctx context.Context, itr AuxIterator) { type StatementExecutor interface { // ExecuteStatement executes a statement. Results should be sent to the // results channel in the ExecutionContext. - ExecuteStatement(stmt influxql.Statement, ctx ExecutionContext) error + ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error } // StatementNormalizer normalizes a statement before it is executed. @@ -298,7 +254,7 @@ func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, clo atomic.AddInt64(&e.stats.QueryExecutionDuration, time.Since(start).Nanoseconds()) }(time.Now()) - qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing) + ctx, detach, err := e.TaskManager.AttachQuery(query, opt, closing) if err != nil { select { case results <- &Result{Err: err}: @@ -306,21 +262,15 @@ func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, clo } return } - defer e.TaskManager.DetachQuery(qid) + defer detach() // Setup the execution context that will be used when executing statements. - ctx := ExecutionContext{ - QueryID: qid, - Query: task, - Results: results, - InterruptCh: task.closing, - ExecutionOptions: opt, - } + ctx.Results = results var i int LOOP: for ; i < len(query.Statements); i++ { - ctx.StatementID = i + ctx.statementID = i stmt := query.Statements[i] // If a default database wasn't passed in by the caller, check the statement. @@ -390,7 +340,7 @@ LOOP: if err == ErrQueryInterrupted { // Query was interrupted so retrieve the real interrupt error from // the query task if there is one. - if qerr := task.Error(); qerr != nil { + if qerr := ctx.Err(); qerr != nil { err = qerr } } @@ -409,13 +359,11 @@ LOOP: // Check if the query was interrupted during an uninterruptible statement. interrupted := false - if ctx.InterruptCh != nil { - select { - case <-ctx.InterruptCh: - interrupted = true - default: - // Query has not been interrupted. - } + select { + case <-ctx.Done(): + interrupted = true + default: + // Query has not been interrupted. } if interrupted { @@ -463,11 +411,6 @@ func (e *Executor) recover(query *influxql.Query, results chan *Result) { } } -// MonitorFunc is a function that will be called to check if a query -// is currently healthy. If the query needs to be interrupted for some reason, -// the error should be returned by this function. -type MonitorFunc func(<-chan struct{}) error - // Task is the internal data structure for managing queries. // For the public use data structure that gets returned, see Task. type Task struct { diff --git a/query/executor_test.go b/query/executor_test.go index 16940252b6..6f7f0c1afb 100644 --- a/query/executor_test.go +++ b/query/executor_test.go @@ -14,10 +14,10 @@ import ( var errUnexpected = errors.New("unexpected error") type StatementExecutor struct { - ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error + ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error } -func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { +func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { return e.ExecuteStatementFn(stmt, ctx) } @@ -33,7 +33,7 @@ func TestQueryExecutor_AttachQuery(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if ctx.QueryID != 1 { t.Errorf("incorrect query id: exp=1 got=%d", ctx.QueryID) } @@ -54,7 +54,7 @@ func TestQueryExecutor_KillQuery(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { switch stmt.(type) { case *influxql.KillQueryStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -62,8 +62,8 @@ func TestQueryExecutor_KillQuery(t *testing.T) { qid <- ctx.QueryID select { - case <-ctx.InterruptCh: - return query.ErrQueryInterrupted + case <-ctx.Done(): + return ctx.Err() case <-time.After(100 * time.Millisecond): t.Error("killing the query did not close the channel after 100 milliseconds") return errUnexpected @@ -95,7 +95,7 @@ func TestQueryExecutor_KillQuery_Zombie(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { switch stmt.(type) { case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -103,7 +103,7 @@ func TestQueryExecutor_KillQuery_Zombie(t *testing.T) { qid <- ctx.QueryID select { - case <-ctx.InterruptCh: + case <-ctx.Done(): select { case <-done: // Keep the query running until we run SHOW QUERIES. @@ -164,7 +164,7 @@ func TestQueryExecutor_KillQuery_CloseTaskManager(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { switch stmt.(type) { case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -221,7 +221,7 @@ func TestQueryExecutor_KillQuery_AlreadyKilled(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { switch stmt.(type) { case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -257,10 +257,10 @@ func TestQueryExecutor_Interrupt(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { select { - case <-ctx.InterruptCh: - return query.ErrQueryInterrupted + case <-ctx.Done(): + return ctx.Err() case <-time.After(100 * time.Millisecond): t.Error("killing the query did not close the channel after 100 milliseconds") return errUnexpected @@ -288,7 +288,7 @@ func TestQueryExecutor_Abort(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { <-ch1 if err := ctx.Send(&query.Result{Err: errUnexpected}); err != query.ErrQueryAborted { t.Errorf("unexpected error: %v", err) @@ -311,7 +311,7 @@ func TestQueryExecutor_Abort(t *testing.T) { func TestQueryExecutor_ShowQueries(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { switch stmt.(type) { case *influxql.ShowQueriesStatement: return e.TaskManager.ExecuteStatement(stmt, ctx) @@ -347,10 +347,10 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { select { - case <-ctx.InterruptCh: - return query.ErrQueryInterrupted + case <-ctx.Done(): + return ctx.Err() case <-time.After(time.Second): t.Errorf("timeout has not killed the query") return errUnexpected @@ -376,10 +376,10 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { qid <- ctx.QueryID - <-ctx.InterruptCh - return query.ErrQueryInterrupted + <-ctx.Done() + return ctx.Err() }, } e.TaskManager.MaxConcurrentQueries = 1 @@ -416,10 +416,10 @@ func TestQueryExecutor_Close(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { close(ch1) - <-ctx.InterruptCh - return query.ErrQueryInterrupted + <-ctx.Done() + return ctx.Err() }, } @@ -463,7 +463,7 @@ func TestQueryExecutor_Panic(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { panic("test error") }, } @@ -481,7 +481,7 @@ func TestQueryExecutor_Panic(t *testing.T) { func TestQueryExecutor_InvalidSource(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return errors.New("statement executed unexpectedly") }, } diff --git a/query/iterator.go b/query/iterator.go index e549e08a5c..8e22963012 100644 --- a/query/iterator.go +++ b/query/iterator.go @@ -791,7 +791,6 @@ func newIteratorOptionsStmt(stmt *influxql.SelectStatement, sopt SelectOptions) opt.Limit, opt.Offset = stmt.Limit, stmt.Offset opt.SLimit, opt.SOffset = stmt.SLimit, stmt.SOffset opt.MaxSeriesN = sopt.MaxSeriesN - opt.InterruptCh = sopt.InterruptCh opt.Authorizer = sopt.Authorizer return opt, nil diff --git a/query/iterator_test.go b/query/iterator_test.go index 0c19ecd74d..59ef8e3874 100644 --- a/query/iterator_test.go +++ b/query/iterator_test.go @@ -1554,9 +1554,11 @@ func TestIterator_EncodeDecode(t *testing.T) { // Test implementation of influxql.FloatIterator type FloatIterator struct { - Points []query.FloatPoint - Closed bool - stats query.IteratorStats + Context context.Context + Points []query.FloatPoint + Closed bool + Delay time.Duration + stats query.IteratorStats } func (itr *FloatIterator) Stats() query.IteratorStats { return itr.stats } @@ -1568,6 +1570,22 @@ func (itr *FloatIterator) Next() (*query.FloatPoint, error) { return nil, nil } + // If we have asked for a delay, then delay the returning of the point + // until either an (optional) context is done or the time has passed. + if itr.Delay > 0 { + var done <-chan struct{} + if itr.Context != nil { + done = itr.Context.Done() + } + + timer := time.NewTimer(itr.Delay) + select { + case <-timer.C: + case <-done: + timer.Stop() + return nil, itr.Context.Err() + } + } v := &itr.Points[0] itr.Points = itr.Points[1:] return v, nil diff --git a/query/monitor.go b/query/monitor.go index b6a3735bde..c3c254eea8 100644 --- a/query/monitor.go +++ b/query/monitor.go @@ -1,6 +1,31 @@ package query -import "time" +import ( + "context" + "time" +) + +// MonitorFunc is a function that will be called to check if a query +// is currently healthy. If the query needs to be interrupted for some reason, +// the error should be returned by this function. +type MonitorFunc func(<-chan struct{}) error + +// Monitor monitors the status of a query and returns whether the query should +// be aborted with an error. +type Monitor interface { + // Monitor starts a new goroutine that will monitor a query. The function + // will be passed in a channel to signal when the query has been finished + // normally. If the function returns with an error and the query is still + // running, the query will be terminated. + Monitor(fn MonitorFunc) +} + +// MonitorFromContext returns a Monitor embedded within the Context +// if one exists. +func MonitorFromContext(ctx context.Context) Monitor { + v, _ := ctx.Value(monitorContextKey).(Monitor) + return v +} // PointLimitMonitor is a query monitor that exits when the number of points // emitted exceeds a threshold. diff --git a/query/monitor_test.go b/query/monitor_test.go new file mode 100644 index 0000000000..74369a6b15 --- /dev/null +++ b/query/monitor_test.go @@ -0,0 +1,57 @@ +package query_test + +import ( + "context" + "testing" + "time" + + "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxql" +) + +func TestPointLimitMonitor(t *testing.T) { + t.Parallel() + + stmt := MustParseSelectStatement(`SELECT mean(value) FROM cpu`) + + // Create a new task manager so we can use the query task as a monitor. + taskManager := query.NewTaskManager() + ctx, detach, err := taskManager.AttachQuery(&influxql.Query{ + Statements: []influxql.Statement{stmt}, + }, query.ExecutionOptions{}, nil) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + defer detach() + + shardMapper := ShardMapper{ + MapShardsFn: func(sources influxql.Sources, t influxql.TimeRange) query.ShardGroup { + return &ShardGroup{ + CreateIteratorFn: func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { + return &FloatIterator{ + Points: []query.FloatPoint{ + {Name: "cpu", Value: 35}, + }, + Context: ctx, + Delay: 2 * time.Second, + stats: query.IteratorStats{ + PointN: 10, + }, + }, nil + }, + Fields: map[string]influxql.DataType{ + "value": influxql.Float, + }, + } + }, + } + + itrs, _, err := query.Select(ctx, stmt, &shardMapper, query.SelectOptions{ + MaxPointN: 1, + }) + if _, err := Iterators(itrs).ReadAll(); err == nil { + t.Fatalf("expected an error") + } else if got, want := err.Error(), "max-select-point limit exceeed: (10/1)"; got != want { + t.Fatalf("unexpected error: got=%v want=%v", got, want) + } +} diff --git a/query/select.go b/query/select.go index c071410eca..93d1253671 100644 --- a/query/select.go +++ b/query/select.go @@ -22,13 +22,14 @@ type SelectOptions struct { // If zero, all nodes are used. NodeID uint64 - // An optional channel that, if closed, signals that the select should be - // interrupted. - InterruptCh <-chan struct{} - // Maximum number of concurrent series. MaxSeriesN int + // Maximum number of points to read from the query. + // This requires the passed in context to have a Monitor that is + // created using WithMonitor. + MaxPointN int + // Maximum number of buckets for a statement. MaxBucketsN int } @@ -97,8 +98,9 @@ type preparedStatement struct { IteratorCreator io.Closer } - columns []string - now time.Time + columns []string + maxPointN int + now time.Time } func (p *preparedStatement) Select(ctx context.Context) ([]Iterator, []string, error) { @@ -106,10 +108,22 @@ func (p *preparedStatement) Select(ctx context.Context) ([]Iterator, []string, e // Each level of the query should use a time range discovered during // compilation, but that requires too large of a refactor at the moment. ctx = context.WithValue(ctx, "now", p.now) - itrs, err := buildIterators(ctx, p.stmt, p.ic, p.opt) + + opt := p.opt + opt.InterruptCh = ctx.Done() + itrs, err := buildIterators(ctx, p.stmt, p.ic, opt) if err != nil { return nil, nil, err } + + // If a monitor exists and we are told there is a maximum number of points, + // register the monitor function. + if m := MonitorFromContext(ctx); m != nil { + if p.maxPointN > 0 { + monitor := PointLimitMonitor(itrs, DefaultStatsInterval, p.maxPointN) + m.Monitor(monitor) + } + } return itrs, p.columns, nil } diff --git a/query/task_manager.go b/query/task_manager.go index a567f6bf6e..cbf15f794c 100644 --- a/query/task_manager.go +++ b/query/task_manager.go @@ -1,6 +1,7 @@ package query import ( + "context" "fmt" "sync" "time" @@ -71,7 +72,7 @@ func NewTaskManager() *TaskManager { } // ExecuteStatement executes a statement containing one of the task management queries. -func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionContext) error { +func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error { switch stmt := stmt.(type) { case *influxql.ShowQueriesStatement: rows, err := t.executeShowQueriesStatement(stmt) @@ -79,10 +80,9 @@ func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionCon return err } - ctx.Results <- &Result{ - StatementID: ctx.StatementID, - Series: rows, - } + ctx.Send(&Result{ + Series: rows, + }) case *influxql.KillQueryStatement: var messages []*Message if ctx.ReadOnly { @@ -92,10 +92,9 @@ func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionCon if err := t.executeKillQueryStatement(stmt); err != nil { return err } - ctx.Results <- &Result{ - StatementID: ctx.StatementID, - Messages: messages, - } + ctx.Send(&Result{ + Messages: messages, + }) default: return ErrInvalidQuery } @@ -150,22 +149,22 @@ func (t *TaskManager) queryError(qid uint64, err error) { // query finishes running. // // After a query finishes running, the system is free to reuse a query id. -func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt <-chan struct{}) (uint64, *Task, error) { +func (t *TaskManager) AttachQuery(q *influxql.Query, opt ExecutionOptions, interrupt <-chan struct{}) (*ExecutionContext, func(), error) { t.mu.Lock() defer t.mu.Unlock() if t.shutdown { - return 0, nil, ErrQueryEngineShutdown + return nil, nil, ErrQueryEngineShutdown } if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries { - return 0, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries) + return nil, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries) } qid := t.nextID query := &Task{ query: q.String(), - database: database, + database: opt.Database, status: RunningTask, startTime: time.Now(), closing: make(chan struct{}), @@ -189,7 +188,15 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt }) } t.nextID++ - return qid, query, nil + + ctx := &ExecutionContext{ + Context: context.Background(), + QueryID: qid, + task: query, + ExecutionOptions: opt, + } + ctx.watch() + return ctx, func() { t.DetachQuery(qid) }, nil } // KillQuery enters a query into the killed state and closes the channel diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index 0459431d57..e35a4b871f 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -50,7 +50,7 @@ func TestContinuousQueryService_Run(t *testing.T) { // Set a callback for ExecuteStatement. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { callCnt++ if callCnt >= expectCallCnt { done <- struct{}{} @@ -122,7 +122,7 @@ func TestContinuousQueryService_ResampleOptions(t *testing.T) { // Set a callback for ExecuteStatement. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { s := stmt.(*influxql.SelectStatement) valuer := &influxql.NowValuer{Location: s.Location} _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) @@ -204,7 +204,7 @@ func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) { // Set a callback for ExecuteQuery. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { s := stmt.(*influxql.SelectStatement) valuer := &influxql.NowValuer{Location: s.Location} _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) @@ -274,7 +274,7 @@ func TestContinuousQueryService_GroupByOffset(t *testing.T) { // Set a callback for ExecuteStatement. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { s := stmt.(*influxql.SelectStatement) valuer := &influxql.NowValuer{Location: s.Location} _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) @@ -315,7 +315,7 @@ func TestContinuousQueryService_NotLeader(t *testing.T) { done := make(chan struct{}) // Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { done <- struct{}{} ctx.Results <- &query.Result{Err: errUnexpected} return nil @@ -336,7 +336,7 @@ func TestContinuousQueryService_NotLeader(t *testing.T) { func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) { s := NewTestService(t) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return errUnexpected }, } @@ -435,7 +435,7 @@ func TestExecuteContinuousQuery_TimeRange(t *testing.T) { // Set a callback for ExecuteStatement. s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { s := stmt.(*influxql.SelectStatement) valuer := &influxql.NowValuer{Location: s.Location} _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) @@ -549,7 +549,7 @@ func TestExecuteContinuousQuery_TimeZone(t *testing.T) { // Set a callback for ExecuteStatement. tests := make(chan test, 1) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { test := <-tests s := stmt.(*influxql.SelectStatement) valuer := &influxql.NowValuer{Location: s.Location} @@ -592,7 +592,7 @@ func TestExecuteContinuousQuery_TimeZone(t *testing.T) { func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) { s := NewTestService(t) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return errExpected }, } @@ -612,7 +612,7 @@ func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) { const writeN = int64(50) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { ctx.Results <- &query.Result{ Series: []*models.Row{{ Name: "result", @@ -658,8 +658,8 @@ func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) { func TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault(t *testing.T) { s := NewTestService(t) s.QueryExecutor.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { - ctx.Results <- &query.Result{} + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Send(&query.Result{}) return nil }, } @@ -821,10 +821,10 @@ func (ms *MetaClient) CreateContinuousQuery(database, name, query string) error // StatementExecutor is a mock statement executor. type StatementExecutor struct { - ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error + ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error } -func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { +func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { return e.ExecuteStatementFn(stmt, ctx) } diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 0d67a0d28e..b707ea1141 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -31,7 +31,7 @@ import ( // Ensure the handler returns results from a query (including nil results). func TestHandler_Query(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -54,7 +54,7 @@ func TestHandler_Query(t *testing.T) { // Ensure the handler returns results from a query passed as a file. func TestHandler_Query_File(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -123,7 +123,7 @@ func TestHandler_Query_Auth(t *testing.T) { } // Set mock statement executor for handler to use. - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -238,7 +238,7 @@ func TestHandler_Query_Auth(t *testing.T) { // Ensure the handler returns results from a query (including nil results). func TestHandler_QueryRegex(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `test` { @@ -255,7 +255,7 @@ func TestHandler_QueryRegex(t *testing.T) { // Ensure the handler merges results from the same statement. func TestHandler_Query_MergeResults(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil @@ -273,7 +273,7 @@ func TestHandler_Query_MergeResults(t *testing.T) { // Ensure the handler merges results from the same statement. func TestHandler_Query_MergeEmptyResults(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}} ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} return nil @@ -291,7 +291,7 @@ func TestHandler_Query_MergeEmptyResults(t *testing.T) { // Ensure the handler can parse chunked and chunk size query parameters. func TestHandler_Query_Chunked(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if ctx.ChunkSize != 2 { t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize) } @@ -315,7 +315,7 @@ func TestHandler_Query_Chunked(t *testing.T) { func TestHandler_Query_Async(t *testing.T) { done := make(chan struct{}) h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT * FROM bar` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -449,7 +449,7 @@ func TestHandler_Query_ErrAuthorize(t *testing.T) { // Ensure the handler returns a status 200 if an error is returned in the result. func TestHandler_Query_ErrResult(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return errors.New("measurement not found") } @@ -470,9 +470,9 @@ func TestHandler_Query_CloseNotify(t *testing.T) { interrupted := make(chan struct{}) h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { select { - case <-ctx.InterruptCh: + case <-ctx.Done(): case <-done: } close(interrupted) @@ -614,7 +614,7 @@ func TestHandler_PromRead(t *testing.T) { b := bytes.NewReader(compressed) h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { if stmt.String() != `SELECT f64 FROM foo.._ WHERE eq = 'a' AND neq != 'b' AND regex =~ /c/ AND neqregex !~ /d/ AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.002Z' GROUP BY *` { t.Fatalf("unexpected query: %s", stmt.String()) } else if ctx.Database != `foo` { @@ -678,7 +678,7 @@ func TestHandler_Ping(t *testing.T) { // Ensure the handler returns the version correctly from the different endpoints. func TestHandler_Version(t *testing.T) { h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { return nil } tests := []struct { @@ -933,10 +933,10 @@ func NewHandler(requireAuthentication bool) *Handler { // HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor. type HandlerStatementExecutor struct { - ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error + ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error } -func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error { +func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { return e.ExecuteStatementFn(stmt, ctx) }