Turn the ExecutionContext into a context.Context
Along with modifying ExecutionContext to be a context and have the TaskManager return the context itself, this also creates a Monitor interface and exposes the Monitor through the Context. This way, we can access the monitor from within the query.Select method and keep all of the limits inside of the query package instead of leaking them into the statement executor. An eventual goal is to remove the InterruptCh from the IteratorOptions and use the Context instead, but for now, we'll just assign the done channel from the Context to the IteratorOptions so at least they refer to the same channel.pull/9525/head
parent
ed27a00255
commit
733d842812
|
@ -56,10 +56,10 @@ type StatementExecutor struct {
|
|||
}
|
||||
|
||||
// ExecuteStatement executes the given statement with the given execution context.
|
||||
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
// Select statements are handled separately so that they can be streamed.
|
||||
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
|
||||
return e.executeSelectStatement(context.Background(), stmt, &ctx)
|
||||
return e.executeSelectStatement(stmt, ctx)
|
||||
}
|
||||
|
||||
var rows models.Rows
|
||||
|
@ -140,9 +140,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
|
|||
err = e.executeDropUserStatement(stmt)
|
||||
case *influxql.ExplainStatement:
|
||||
if stmt.Analyze {
|
||||
rows, err = e.executeExplainAnalyzeStatement(stmt, &ctx)
|
||||
rows, err = e.executeExplainAnalyzeStatement(stmt, ctx)
|
||||
} else {
|
||||
rows, err = e.executeExplainStatement(stmt, &ctx)
|
||||
rows, err = e.executeExplainStatement(stmt, ctx)
|
||||
}
|
||||
case *influxql.GrantStatement:
|
||||
if ctx.ReadOnly {
|
||||
|
@ -167,13 +167,13 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
|
|||
case *influxql.ShowContinuousQueriesStatement:
|
||||
rows, err = e.executeShowContinuousQueriesStatement(stmt)
|
||||
case *influxql.ShowDatabasesStatement:
|
||||
rows, err = e.executeShowDatabasesStatement(stmt, &ctx)
|
||||
rows, err = e.executeShowDatabasesStatement(stmt, ctx)
|
||||
case *influxql.ShowDiagnosticsStatement:
|
||||
rows, err = e.executeShowDiagnosticsStatement(stmt)
|
||||
case *influxql.ShowGrantsForUserStatement:
|
||||
rows, err = e.executeShowGrantsForUserStatement(stmt)
|
||||
case *influxql.ShowMeasurementsStatement:
|
||||
return e.executeShowMeasurementsStatement(stmt, &ctx)
|
||||
return e.executeShowMeasurementsStatement(stmt, ctx)
|
||||
case *influxql.ShowMeasurementCardinalityStatement:
|
||||
rows, err = e.executeShowMeasurementCardinalityStatement(stmt)
|
||||
case *influxql.ShowRetentionPoliciesStatement:
|
||||
|
@ -189,9 +189,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
|
|||
case *influxql.ShowSubscriptionsStatement:
|
||||
rows, err = e.executeShowSubscriptionsStatement(stmt)
|
||||
case *influxql.ShowTagKeysStatement:
|
||||
return e.executeShowTagKeys(stmt, &ctx)
|
||||
return e.executeShowTagKeys(stmt, ctx)
|
||||
case *influxql.ShowTagValuesStatement:
|
||||
return e.executeShowTagValues(stmt, &ctx)
|
||||
return e.executeShowTagValues(stmt, ctx)
|
||||
case *influxql.ShowUsersStatement:
|
||||
rows, err = e.executeShowUsersStatement(stmt)
|
||||
case *influxql.SetPasswordUserStatement:
|
||||
|
@ -211,7 +211,6 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
|
|||
}
|
||||
|
||||
return ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Series: rows,
|
||||
Messages: messages,
|
||||
})
|
||||
|
@ -407,13 +406,12 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme
|
|||
return e.MetaClient.DropUser(q.Name)
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) {
|
||||
func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
|
||||
opt := query.SelectOptions{
|
||||
InterruptCh: ectx.InterruptCh,
|
||||
NodeID: ectx.ExecutionOptions.NodeID,
|
||||
NodeID: ctx.ExecutionOptions.NodeID,
|
||||
MaxSeriesN: e.MaxSelectSeriesN,
|
||||
MaxBucketsN: e.MaxSelectBucketsN,
|
||||
Authorizer: ectx.Authorizer,
|
||||
Authorizer: ctx.Authorizer,
|
||||
}
|
||||
|
||||
// Prepare the query for execution, but do not actually execute it.
|
||||
|
@ -442,13 +440,13 @@ func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement
|
|||
func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) {
|
||||
stmt := q.Statement
|
||||
t, span := tracing.NewTrace("select")
|
||||
ctx := tracing.NewContextWithTrace(context.Background(), t)
|
||||
ctx := tracing.NewContextWithTrace(ectx, t)
|
||||
ctx = tracing.NewContextWithSpan(ctx, span)
|
||||
var aux query.Iterators
|
||||
ctx = query.NewContextWithIterators(ctx, &aux)
|
||||
start := time.Now()
|
||||
|
||||
itrs, columns, err := e.createIterators(ctx, stmt, ectx)
|
||||
itrs, columns, err := e.createIterators(ctx, stmt, ectx.ExecutionOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -474,8 +472,8 @@ func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainSt
|
|||
} else if row == nil {
|
||||
// Check if the query was interrupted while emitting.
|
||||
select {
|
||||
case <-ectx.InterruptCh:
|
||||
err = query.ErrQueryInterrupted
|
||||
case <-ectx.Done():
|
||||
err = ectx.Err()
|
||||
goto CLEANUP
|
||||
default:
|
||||
}
|
||||
|
@ -544,14 +542,14 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw
|
|||
return e.MetaClient.UpdateUser(q.Name, q.Password)
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) error {
|
||||
itrs, columns, err := e.createIterators(ctx, stmt, ectx)
|
||||
func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {
|
||||
itrs, columns, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Generate a row emitter from the iterator set.
|
||||
em := query.NewEmitter(itrs, stmt.TimeAscending(), ectx.ChunkSize)
|
||||
em := query.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize)
|
||||
em.Columns = columns
|
||||
if stmt.Location != nil {
|
||||
em.Location = stmt.Location
|
||||
|
@ -576,8 +574,8 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in
|
|||
} else if row == nil {
|
||||
// Check if the query was interrupted while emitting.
|
||||
select {
|
||||
case <-ectx.InterruptCh:
|
||||
return query.ErrQueryInterrupted
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
break
|
||||
|
@ -593,13 +591,12 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in
|
|||
}
|
||||
|
||||
result := &query.Result{
|
||||
StatementID: ectx.StatementID,
|
||||
Series: []*models.Row{row},
|
||||
Partial: partial,
|
||||
}
|
||||
|
||||
// Send results or exit if closing.
|
||||
if err := ectx.Send(result); err != nil {
|
||||
if err := ctx.Send(result); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -613,12 +610,11 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in
|
|||
}
|
||||
|
||||
var messages []*query.Message
|
||||
if ectx.ReadOnly {
|
||||
if ctx.ReadOnly {
|
||||
messages = append(messages, query.ReadOnlyWarning(stmt.String()))
|
||||
}
|
||||
|
||||
return ectx.Send(&query.Result{
|
||||
StatementID: ectx.StatementID,
|
||||
return ctx.Send(&query.Result{
|
||||
Messages: messages,
|
||||
Series: []*models.Row{{
|
||||
Name: "result",
|
||||
|
@ -630,8 +626,7 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in
|
|||
|
||||
// Always emit at least one result.
|
||||
if !emitted {
|
||||
return ectx.Send(&query.Result{
|
||||
StatementID: ectx.StatementID,
|
||||
return ctx.Send(&query.Result{
|
||||
Series: make([]*models.Row, 0),
|
||||
})
|
||||
}
|
||||
|
@ -639,25 +634,20 @@ func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *in
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) ([]query.Iterator, []string, error) {
|
||||
opt := query.SelectOptions{
|
||||
InterruptCh: ectx.InterruptCh,
|
||||
NodeID: ectx.ExecutionOptions.NodeID,
|
||||
func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, opt query.ExecutionOptions) ([]query.Iterator, []string, error) {
|
||||
sopt := query.SelectOptions{
|
||||
NodeID: opt.NodeID,
|
||||
MaxSeriesN: e.MaxSelectSeriesN,
|
||||
MaxPointN: e.MaxSelectPointN,
|
||||
MaxBucketsN: e.MaxSelectBucketsN,
|
||||
Authorizer: ectx.Authorizer,
|
||||
Authorizer: opt.Authorizer,
|
||||
}
|
||||
|
||||
// Create a set of iterators from a selection.
|
||||
itrs, columns, err := query.Select(ctx, stmt, e.ShardMapper, opt)
|
||||
itrs, columns, err := query.Select(ctx, stmt, e.ShardMapper, sopt)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if e.MaxSelectPointN > 0 {
|
||||
monitor := query.PointLimitMonitor(itrs, query.DefaultStatsInterval, e.MaxSelectPointN)
|
||||
ectx.Query.Monitor(monitor)
|
||||
}
|
||||
return itrs, columns, nil
|
||||
}
|
||||
|
||||
|
@ -738,7 +728,6 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
|
|||
names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition)
|
||||
if err != nil || len(names) == 0 {
|
||||
return ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Err: err,
|
||||
})
|
||||
}
|
||||
|
@ -763,13 +752,10 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
|
|||
}
|
||||
|
||||
if len(values) == 0 {
|
||||
return ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
})
|
||||
return ctx.Send(&query.Result{})
|
||||
}
|
||||
|
||||
return ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Series: []*models.Row{{
|
||||
Name: "measurements",
|
||||
Columns: []string{"name"},
|
||||
|
@ -975,7 +961,6 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement,
|
|||
tagKeys, err := e.TSDBStore.TagKeys(ctx.Authorizer, shardIDs, cond)
|
||||
if err != nil {
|
||||
return ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Err: err,
|
||||
})
|
||||
}
|
||||
|
@ -1009,7 +994,6 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement,
|
|||
}
|
||||
|
||||
if err := ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Series: []*models.Row{row},
|
||||
}); err != nil {
|
||||
return err
|
||||
|
@ -1019,9 +1003,7 @@ func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement,
|
|||
|
||||
// Ensure at least one result is emitted.
|
||||
if !emitted {
|
||||
return ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
})
|
||||
return ctx.Send(&query.Result{})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1065,10 +1047,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
|
|||
|
||||
tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, shardIDs, cond)
|
||||
if err != nil {
|
||||
return ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Err: err,
|
||||
})
|
||||
return ctx.Send(&query.Result{Err: err})
|
||||
}
|
||||
|
||||
emitted := false
|
||||
|
@ -1103,7 +1082,6 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
|
|||
}
|
||||
|
||||
if err := ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Series: []*models.Row{row},
|
||||
}); err != nil {
|
||||
return err
|
||||
|
@ -1113,9 +1091,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
|
|||
|
||||
// Ensure at least one result is emitted.
|
||||
if !emitted {
|
||||
return ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
})
|
||||
return ctx.Send(&query.Result{})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -878,6 +878,7 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions)
|
|||
opt: opt,
|
||||
ic: shards,
|
||||
columns: columns,
|
||||
maxPointN: sopt.MaxPointN,
|
||||
now: c.Options.Now,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
package query
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// ExecutionContext contains state that the query is currently executing with.
|
||||
type ExecutionContext struct {
|
||||
context.Context
|
||||
|
||||
// The statement ID of the executing query.
|
||||
statementID int
|
||||
|
||||
// The query ID of the executing query.
|
||||
QueryID uint64
|
||||
|
||||
// The query task information available to the StatementExecutor.
|
||||
task *Task
|
||||
|
||||
// Output channel where results and errors should be sent.
|
||||
Results chan *Result
|
||||
|
||||
// Options used to start this query.
|
||||
ExecutionOptions
|
||||
|
||||
mu sync.RWMutex
|
||||
done chan struct{}
|
||||
err error
|
||||
}
|
||||
|
||||
func (ctx *ExecutionContext) watch() {
|
||||
ctx.done = make(chan struct{})
|
||||
if ctx.err != nil {
|
||||
close(ctx.done)
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(ctx.done)
|
||||
|
||||
var taskCtx <-chan struct{}
|
||||
if ctx.task != nil {
|
||||
taskCtx = ctx.task.closing
|
||||
}
|
||||
|
||||
select {
|
||||
case <-taskCtx:
|
||||
ctx.err = ctx.task.Error()
|
||||
if ctx.err == nil {
|
||||
ctx.err = ErrQueryInterrupted
|
||||
}
|
||||
case <-ctx.AbortCh:
|
||||
ctx.err = ErrQueryAborted
|
||||
case <-ctx.Context.Done():
|
||||
ctx.err = ctx.Context.Err()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (ctx *ExecutionContext) Done() <-chan struct{} {
|
||||
ctx.mu.RLock()
|
||||
if ctx.done != nil {
|
||||
defer ctx.mu.RUnlock()
|
||||
return ctx.done
|
||||
}
|
||||
ctx.mu.RUnlock()
|
||||
|
||||
ctx.mu.Lock()
|
||||
defer ctx.mu.Unlock()
|
||||
if ctx.done == nil {
|
||||
ctx.watch()
|
||||
}
|
||||
return ctx.done
|
||||
}
|
||||
|
||||
func (ctx *ExecutionContext) Err() error {
|
||||
ctx.mu.RLock()
|
||||
defer ctx.mu.RUnlock()
|
||||
return ctx.err
|
||||
}
|
||||
|
||||
func (ctx *ExecutionContext) Value(key interface{}) interface{} {
|
||||
switch key {
|
||||
case monitorContextKey:
|
||||
return ctx.task
|
||||
}
|
||||
return ctx.Context.Value(key)
|
||||
}
|
||||
|
||||
// send sends a Result to the Results channel and will exit if the query has
|
||||
// been aborted.
|
||||
func (ctx *ExecutionContext) send(result *Result) error {
|
||||
result.StatementID = ctx.statementID
|
||||
select {
|
||||
case <-ctx.AbortCh:
|
||||
return ErrQueryAborted
|
||||
case ctx.Results <- result:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send sends a Result to the Results channel and will exit if the query has
|
||||
// been interrupted or aborted.
|
||||
func (ctx *ExecutionContext) Send(result *Result) error {
|
||||
result.StatementID = ctx.statementID
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case ctx.Results <- result:
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -139,55 +139,11 @@ type ExecutionOptions struct {
|
|||
AbortCh <-chan struct{}
|
||||
}
|
||||
|
||||
// ExecutionContext contains state that the query is currently executing with.
|
||||
type ExecutionContext struct {
|
||||
// The statement ID of the executing query.
|
||||
StatementID int
|
||||
|
||||
// The query ID of the executing query.
|
||||
QueryID uint64
|
||||
|
||||
// The query task information available to the StatementExecutor.
|
||||
Query *Task
|
||||
|
||||
// Output channel where results and errors should be sent.
|
||||
Results chan *Result
|
||||
|
||||
// A channel that is closed when the query is interrupted.
|
||||
InterruptCh <-chan struct{}
|
||||
|
||||
// Options used to start this query.
|
||||
ExecutionOptions
|
||||
}
|
||||
|
||||
// send sends a Result to the Results channel and will exit if the query has
|
||||
// been aborted.
|
||||
func (ctx *ExecutionContext) send(result *Result) error {
|
||||
select {
|
||||
case <-ctx.AbortCh:
|
||||
return ErrQueryAborted
|
||||
case ctx.Results <- result:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send sends a Result to the Results channel and will exit if the query has
|
||||
// been interrupted or aborted.
|
||||
func (ctx *ExecutionContext) Send(result *Result) error {
|
||||
select {
|
||||
case <-ctx.InterruptCh:
|
||||
return ErrQueryInterrupted
|
||||
case <-ctx.AbortCh:
|
||||
return ErrQueryAborted
|
||||
case ctx.Results <- result:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type contextKey int
|
||||
|
||||
const (
|
||||
iteratorsContextKey contextKey = iota
|
||||
monitorContextKey
|
||||
)
|
||||
|
||||
// NewContextWithIterators returns a new context.Context with the *Iterators slice added.
|
||||
|
@ -208,7 +164,7 @@ func tryAddAuxIteratorToContext(ctx context.Context, itr AuxIterator) {
|
|||
type StatementExecutor interface {
|
||||
// ExecuteStatement executes a statement. Results should be sent to the
|
||||
// results channel in the ExecutionContext.
|
||||
ExecuteStatement(stmt influxql.Statement, ctx ExecutionContext) error
|
||||
ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error
|
||||
}
|
||||
|
||||
// StatementNormalizer normalizes a statement before it is executed.
|
||||
|
@ -298,7 +254,7 @@ func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, clo
|
|||
atomic.AddInt64(&e.stats.QueryExecutionDuration, time.Since(start).Nanoseconds())
|
||||
}(time.Now())
|
||||
|
||||
qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing)
|
||||
ctx, detach, err := e.TaskManager.AttachQuery(query, opt, closing)
|
||||
if err != nil {
|
||||
select {
|
||||
case results <- &Result{Err: err}:
|
||||
|
@ -306,21 +262,15 @@ func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, clo
|
|||
}
|
||||
return
|
||||
}
|
||||
defer e.TaskManager.DetachQuery(qid)
|
||||
defer detach()
|
||||
|
||||
// Setup the execution context that will be used when executing statements.
|
||||
ctx := ExecutionContext{
|
||||
QueryID: qid,
|
||||
Query: task,
|
||||
Results: results,
|
||||
InterruptCh: task.closing,
|
||||
ExecutionOptions: opt,
|
||||
}
|
||||
ctx.Results = results
|
||||
|
||||
var i int
|
||||
LOOP:
|
||||
for ; i < len(query.Statements); i++ {
|
||||
ctx.StatementID = i
|
||||
ctx.statementID = i
|
||||
stmt := query.Statements[i]
|
||||
|
||||
// If a default database wasn't passed in by the caller, check the statement.
|
||||
|
@ -390,7 +340,7 @@ LOOP:
|
|||
if err == ErrQueryInterrupted {
|
||||
// Query was interrupted so retrieve the real interrupt error from
|
||||
// the query task if there is one.
|
||||
if qerr := task.Error(); qerr != nil {
|
||||
if qerr := ctx.Err(); qerr != nil {
|
||||
err = qerr
|
||||
}
|
||||
}
|
||||
|
@ -409,14 +359,12 @@ LOOP:
|
|||
|
||||
// Check if the query was interrupted during an uninterruptible statement.
|
||||
interrupted := false
|
||||
if ctx.InterruptCh != nil {
|
||||
select {
|
||||
case <-ctx.InterruptCh:
|
||||
case <-ctx.Done():
|
||||
interrupted = true
|
||||
default:
|
||||
// Query has not been interrupted.
|
||||
}
|
||||
}
|
||||
|
||||
if interrupted {
|
||||
break
|
||||
|
@ -463,11 +411,6 @@ func (e *Executor) recover(query *influxql.Query, results chan *Result) {
|
|||
}
|
||||
}
|
||||
|
||||
// MonitorFunc is a function that will be called to check if a query
|
||||
// is currently healthy. If the query needs to be interrupted for some reason,
|
||||
// the error should be returned by this function.
|
||||
type MonitorFunc func(<-chan struct{}) error
|
||||
|
||||
// Task is the internal data structure for managing queries.
|
||||
// For the public use data structure that gets returned, see Task.
|
||||
type Task struct {
|
||||
|
|
|
@ -14,10 +14,10 @@ import (
|
|||
var errUnexpected = errors.New("unexpected error")
|
||||
|
||||
type StatementExecutor struct {
|
||||
ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error
|
||||
ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
return e.ExecuteStatementFn(stmt, ctx)
|
||||
}
|
||||
|
||||
|
@ -33,7 +33,7 @@ func TestQueryExecutor_AttachQuery(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
if ctx.QueryID != 1 {
|
||||
t.Errorf("incorrect query id: exp=1 got=%d", ctx.QueryID)
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ func TestQueryExecutor_KillQuery(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
switch stmt.(type) {
|
||||
case *influxql.KillQueryStatement:
|
||||
return e.TaskManager.ExecuteStatement(stmt, ctx)
|
||||
|
@ -62,8 +62,8 @@ func TestQueryExecutor_KillQuery(t *testing.T) {
|
|||
|
||||
qid <- ctx.QueryID
|
||||
select {
|
||||
case <-ctx.InterruptCh:
|
||||
return query.ErrQueryInterrupted
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("killing the query did not close the channel after 100 milliseconds")
|
||||
return errUnexpected
|
||||
|
@ -95,7 +95,7 @@ func TestQueryExecutor_KillQuery_Zombie(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
switch stmt.(type) {
|
||||
case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement:
|
||||
return e.TaskManager.ExecuteStatement(stmt, ctx)
|
||||
|
@ -103,7 +103,7 @@ func TestQueryExecutor_KillQuery_Zombie(t *testing.T) {
|
|||
|
||||
qid <- ctx.QueryID
|
||||
select {
|
||||
case <-ctx.InterruptCh:
|
||||
case <-ctx.Done():
|
||||
select {
|
||||
case <-done:
|
||||
// Keep the query running until we run SHOW QUERIES.
|
||||
|
@ -164,7 +164,7 @@ func TestQueryExecutor_KillQuery_CloseTaskManager(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
switch stmt.(type) {
|
||||
case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement:
|
||||
return e.TaskManager.ExecuteStatement(stmt, ctx)
|
||||
|
@ -221,7 +221,7 @@ func TestQueryExecutor_KillQuery_AlreadyKilled(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
switch stmt.(type) {
|
||||
case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement:
|
||||
return e.TaskManager.ExecuteStatement(stmt, ctx)
|
||||
|
@ -257,10 +257,10 @@ func TestQueryExecutor_Interrupt(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
select {
|
||||
case <-ctx.InterruptCh:
|
||||
return query.ErrQueryInterrupted
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("killing the query did not close the channel after 100 milliseconds")
|
||||
return errUnexpected
|
||||
|
@ -288,7 +288,7 @@ func TestQueryExecutor_Abort(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
<-ch1
|
||||
if err := ctx.Send(&query.Result{Err: errUnexpected}); err != query.ErrQueryAborted {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
@ -311,7 +311,7 @@ func TestQueryExecutor_Abort(t *testing.T) {
|
|||
func TestQueryExecutor_ShowQueries(t *testing.T) {
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
switch stmt.(type) {
|
||||
case *influxql.ShowQueriesStatement:
|
||||
return e.TaskManager.ExecuteStatement(stmt, ctx)
|
||||
|
@ -347,10 +347,10 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
select {
|
||||
case <-ctx.InterruptCh:
|
||||
return query.ErrQueryInterrupted
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(time.Second):
|
||||
t.Errorf("timeout has not killed the query")
|
||||
return errUnexpected
|
||||
|
@ -376,10 +376,10 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
qid <- ctx.QueryID
|
||||
<-ctx.InterruptCh
|
||||
return query.ErrQueryInterrupted
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
},
|
||||
}
|
||||
e.TaskManager.MaxConcurrentQueries = 1
|
||||
|
@ -416,10 +416,10 @@ func TestQueryExecutor_Close(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
close(ch1)
|
||||
<-ctx.InterruptCh
|
||||
return query.ErrQueryInterrupted
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -463,7 +463,7 @@ func TestQueryExecutor_Panic(t *testing.T) {
|
|||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
panic("test error")
|
||||
},
|
||||
}
|
||||
|
@ -481,7 +481,7 @@ func TestQueryExecutor_Panic(t *testing.T) {
|
|||
func TestQueryExecutor_InvalidSource(t *testing.T) {
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
return errors.New("statement executed unexpectedly")
|
||||
},
|
||||
}
|
||||
|
|
|
@ -791,7 +791,6 @@ func newIteratorOptionsStmt(stmt *influxql.SelectStatement, sopt SelectOptions)
|
|||
opt.Limit, opt.Offset = stmt.Limit, stmt.Offset
|
||||
opt.SLimit, opt.SOffset = stmt.SLimit, stmt.SOffset
|
||||
opt.MaxSeriesN = sopt.MaxSeriesN
|
||||
opt.InterruptCh = sopt.InterruptCh
|
||||
opt.Authorizer = sopt.Authorizer
|
||||
|
||||
return opt, nil
|
||||
|
|
|
@ -1554,8 +1554,10 @@ func TestIterator_EncodeDecode(t *testing.T) {
|
|||
|
||||
// Test implementation of influxql.FloatIterator
|
||||
type FloatIterator struct {
|
||||
Context context.Context
|
||||
Points []query.FloatPoint
|
||||
Closed bool
|
||||
Delay time.Duration
|
||||
stats query.IteratorStats
|
||||
}
|
||||
|
||||
|
@ -1568,6 +1570,22 @@ func (itr *FloatIterator) Next() (*query.FloatPoint, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// If we have asked for a delay, then delay the returning of the point
|
||||
// until either an (optional) context is done or the time has passed.
|
||||
if itr.Delay > 0 {
|
||||
var done <-chan struct{}
|
||||
if itr.Context != nil {
|
||||
done = itr.Context.Done()
|
||||
}
|
||||
|
||||
timer := time.NewTimer(itr.Delay)
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-done:
|
||||
timer.Stop()
|
||||
return nil, itr.Context.Err()
|
||||
}
|
||||
}
|
||||
v := &itr.Points[0]
|
||||
itr.Points = itr.Points[1:]
|
||||
return v, nil
|
||||
|
|
|
@ -1,6 +1,31 @@
|
|||
package query
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
// MonitorFunc is a function that will be called to check if a query
|
||||
// is currently healthy. If the query needs to be interrupted for some reason,
|
||||
// the error should be returned by this function.
|
||||
type MonitorFunc func(<-chan struct{}) error
|
||||
|
||||
// Monitor monitors the status of a query and returns whether the query should
|
||||
// be aborted with an error.
|
||||
type Monitor interface {
|
||||
// Monitor starts a new goroutine that will monitor a query. The function
|
||||
// will be passed in a channel to signal when the query has been finished
|
||||
// normally. If the function returns with an error and the query is still
|
||||
// running, the query will be terminated.
|
||||
Monitor(fn MonitorFunc)
|
||||
}
|
||||
|
||||
// MonitorFromContext returns a Monitor embedded within the Context
|
||||
// if one exists.
|
||||
func MonitorFromContext(ctx context.Context) Monitor {
|
||||
v, _ := ctx.Value(monitorContextKey).(Monitor)
|
||||
return v
|
||||
}
|
||||
|
||||
// PointLimitMonitor is a query monitor that exits when the number of points
|
||||
// emitted exceeds a threshold.
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
package query_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
func TestPointLimitMonitor(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
stmt := MustParseSelectStatement(`SELECT mean(value) FROM cpu`)
|
||||
|
||||
// Create a new task manager so we can use the query task as a monitor.
|
||||
taskManager := query.NewTaskManager()
|
||||
ctx, detach, err := taskManager.AttachQuery(&influxql.Query{
|
||||
Statements: []influxql.Statement{stmt},
|
||||
}, query.ExecutionOptions{}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
defer detach()
|
||||
|
||||
shardMapper := ShardMapper{
|
||||
MapShardsFn: func(sources influxql.Sources, t influxql.TimeRange) query.ShardGroup {
|
||||
return &ShardGroup{
|
||||
CreateIteratorFn: func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
|
||||
return &FloatIterator{
|
||||
Points: []query.FloatPoint{
|
||||
{Name: "cpu", Value: 35},
|
||||
},
|
||||
Context: ctx,
|
||||
Delay: 2 * time.Second,
|
||||
stats: query.IteratorStats{
|
||||
PointN: 10,
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
Fields: map[string]influxql.DataType{
|
||||
"value": influxql.Float,
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
itrs, _, err := query.Select(ctx, stmt, &shardMapper, query.SelectOptions{
|
||||
MaxPointN: 1,
|
||||
})
|
||||
if _, err := Iterators(itrs).ReadAll(); err == nil {
|
||||
t.Fatalf("expected an error")
|
||||
} else if got, want := err.Error(), "max-select-point limit exceeed: (10/1)"; got != want {
|
||||
t.Fatalf("unexpected error: got=%v want=%v", got, want)
|
||||
}
|
||||
}
|
|
@ -22,13 +22,14 @@ type SelectOptions struct {
|
|||
// If zero, all nodes are used.
|
||||
NodeID uint64
|
||||
|
||||
// An optional channel that, if closed, signals that the select should be
|
||||
// interrupted.
|
||||
InterruptCh <-chan struct{}
|
||||
|
||||
// Maximum number of concurrent series.
|
||||
MaxSeriesN int
|
||||
|
||||
// Maximum number of points to read from the query.
|
||||
// This requires the passed in context to have a Monitor that is
|
||||
// created using WithMonitor.
|
||||
MaxPointN int
|
||||
|
||||
// Maximum number of buckets for a statement.
|
||||
MaxBucketsN int
|
||||
}
|
||||
|
@ -98,6 +99,7 @@ type preparedStatement struct {
|
|||
io.Closer
|
||||
}
|
||||
columns []string
|
||||
maxPointN int
|
||||
now time.Time
|
||||
}
|
||||
|
||||
|
@ -106,10 +108,22 @@ func (p *preparedStatement) Select(ctx context.Context) ([]Iterator, []string, e
|
|||
// Each level of the query should use a time range discovered during
|
||||
// compilation, but that requires too large of a refactor at the moment.
|
||||
ctx = context.WithValue(ctx, "now", p.now)
|
||||
itrs, err := buildIterators(ctx, p.stmt, p.ic, p.opt)
|
||||
|
||||
opt := p.opt
|
||||
opt.InterruptCh = ctx.Done()
|
||||
itrs, err := buildIterators(ctx, p.stmt, p.ic, opt)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// If a monitor exists and we are told there is a maximum number of points,
|
||||
// register the monitor function.
|
||||
if m := MonitorFromContext(ctx); m != nil {
|
||||
if p.maxPointN > 0 {
|
||||
monitor := PointLimitMonitor(itrs, DefaultStatsInterval, p.maxPointN)
|
||||
m.Monitor(monitor)
|
||||
}
|
||||
}
|
||||
return itrs, p.columns, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package query
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -71,7 +72,7 @@ func NewTaskManager() *TaskManager {
|
|||
}
|
||||
|
||||
// ExecuteStatement executes a statement containing one of the task management queries.
|
||||
func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionContext) error {
|
||||
func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error {
|
||||
switch stmt := stmt.(type) {
|
||||
case *influxql.ShowQueriesStatement:
|
||||
rows, err := t.executeShowQueriesStatement(stmt)
|
||||
|
@ -79,10 +80,9 @@ func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionCon
|
|||
return err
|
||||
}
|
||||
|
||||
ctx.Results <- &Result{
|
||||
StatementID: ctx.StatementID,
|
||||
ctx.Send(&Result{
|
||||
Series: rows,
|
||||
}
|
||||
})
|
||||
case *influxql.KillQueryStatement:
|
||||
var messages []*Message
|
||||
if ctx.ReadOnly {
|
||||
|
@ -92,10 +92,9 @@ func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionCon
|
|||
if err := t.executeKillQueryStatement(stmt); err != nil {
|
||||
return err
|
||||
}
|
||||
ctx.Results <- &Result{
|
||||
StatementID: ctx.StatementID,
|
||||
ctx.Send(&Result{
|
||||
Messages: messages,
|
||||
}
|
||||
})
|
||||
default:
|
||||
return ErrInvalidQuery
|
||||
}
|
||||
|
@ -150,22 +149,22 @@ func (t *TaskManager) queryError(qid uint64, err error) {
|
|||
// query finishes running.
|
||||
//
|
||||
// After a query finishes running, the system is free to reuse a query id.
|
||||
func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt <-chan struct{}) (uint64, *Task, error) {
|
||||
func (t *TaskManager) AttachQuery(q *influxql.Query, opt ExecutionOptions, interrupt <-chan struct{}) (*ExecutionContext, func(), error) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
if t.shutdown {
|
||||
return 0, nil, ErrQueryEngineShutdown
|
||||
return nil, nil, ErrQueryEngineShutdown
|
||||
}
|
||||
|
||||
if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries {
|
||||
return 0, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries)
|
||||
return nil, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries)
|
||||
}
|
||||
|
||||
qid := t.nextID
|
||||
query := &Task{
|
||||
query: q.String(),
|
||||
database: database,
|
||||
database: opt.Database,
|
||||
status: RunningTask,
|
||||
startTime: time.Now(),
|
||||
closing: make(chan struct{}),
|
||||
|
@ -189,7 +188,15 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt
|
|||
})
|
||||
}
|
||||
t.nextID++
|
||||
return qid, query, nil
|
||||
|
||||
ctx := &ExecutionContext{
|
||||
Context: context.Background(),
|
||||
QueryID: qid,
|
||||
task: query,
|
||||
ExecutionOptions: opt,
|
||||
}
|
||||
ctx.watch()
|
||||
return ctx, func() { t.DetachQuery(qid) }, nil
|
||||
}
|
||||
|
||||
// KillQuery enters a query into the killed state and closes the channel
|
||||
|
|
|
@ -50,7 +50,7 @@ func TestContinuousQueryService_Run(t *testing.T) {
|
|||
|
||||
// Set a callback for ExecuteStatement.
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
callCnt++
|
||||
if callCnt >= expectCallCnt {
|
||||
done <- struct{}{}
|
||||
|
@ -122,7 +122,7 @@ func TestContinuousQueryService_ResampleOptions(t *testing.T) {
|
|||
|
||||
// Set a callback for ExecuteStatement.
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
s := stmt.(*influxql.SelectStatement)
|
||||
valuer := &influxql.NowValuer{Location: s.Location}
|
||||
_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
|
||||
|
@ -204,7 +204,7 @@ func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) {
|
|||
|
||||
// Set a callback for ExecuteQuery.
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
s := stmt.(*influxql.SelectStatement)
|
||||
valuer := &influxql.NowValuer{Location: s.Location}
|
||||
_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
|
||||
|
@ -274,7 +274,7 @@ func TestContinuousQueryService_GroupByOffset(t *testing.T) {
|
|||
|
||||
// Set a callback for ExecuteStatement.
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
s := stmt.(*influxql.SelectStatement)
|
||||
valuer := &influxql.NowValuer{Location: s.Location}
|
||||
_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
|
||||
|
@ -315,7 +315,7 @@ func TestContinuousQueryService_NotLeader(t *testing.T) {
|
|||
done := make(chan struct{})
|
||||
// Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader.
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
done <- struct{}{}
|
||||
ctx.Results <- &query.Result{Err: errUnexpected}
|
||||
return nil
|
||||
|
@ -336,7 +336,7 @@ func TestContinuousQueryService_NotLeader(t *testing.T) {
|
|||
func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
|
||||
s := NewTestService(t)
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
return errUnexpected
|
||||
},
|
||||
}
|
||||
|
@ -435,7 +435,7 @@ func TestExecuteContinuousQuery_TimeRange(t *testing.T) {
|
|||
|
||||
// Set a callback for ExecuteStatement.
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
s := stmt.(*influxql.SelectStatement)
|
||||
valuer := &influxql.NowValuer{Location: s.Location}
|
||||
_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
|
||||
|
@ -549,7 +549,7 @@ func TestExecuteContinuousQuery_TimeZone(t *testing.T) {
|
|||
// Set a callback for ExecuteStatement.
|
||||
tests := make(chan test, 1)
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
test := <-tests
|
||||
s := stmt.(*influxql.SelectStatement)
|
||||
valuer := &influxql.NowValuer{Location: s.Location}
|
||||
|
@ -592,7 +592,7 @@ func TestExecuteContinuousQuery_TimeZone(t *testing.T) {
|
|||
func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
|
||||
s := NewTestService(t)
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
return errExpected
|
||||
},
|
||||
}
|
||||
|
@ -612,7 +612,7 @@ func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) {
|
|||
const writeN = int64(50)
|
||||
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
ctx.Results <- &query.Result{
|
||||
Series: []*models.Row{{
|
||||
Name: "result",
|
||||
|
@ -658,8 +658,8 @@ func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) {
|
|||
func TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault(t *testing.T) {
|
||||
s := NewTestService(t)
|
||||
s.QueryExecutor.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
ctx.Results <- &query.Result{}
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
ctx.Send(&query.Result{})
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
@ -821,10 +821,10 @@ func (ms *MetaClient) CreateContinuousQuery(database, name, query string) error
|
|||
|
||||
// StatementExecutor is a mock statement executor.
|
||||
type StatementExecutor struct {
|
||||
ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error
|
||||
ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
return e.ExecuteStatementFn(stmt, ctx)
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ import (
|
|||
// Ensure the handler returns results from a query (including nil results).
|
||||
func TestHandler_Query(t *testing.T) {
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
if stmt.String() != `SELECT * FROM bar` {
|
||||
t.Fatalf("unexpected query: %s", stmt.String())
|
||||
} else if ctx.Database != `foo` {
|
||||
|
@ -54,7 +54,7 @@ func TestHandler_Query(t *testing.T) {
|
|||
// Ensure the handler returns results from a query passed as a file.
|
||||
func TestHandler_Query_File(t *testing.T) {
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
if stmt.String() != `SELECT * FROM bar` {
|
||||
t.Fatalf("unexpected query: %s", stmt.String())
|
||||
} else if ctx.Database != `foo` {
|
||||
|
@ -123,7 +123,7 @@ func TestHandler_Query_Auth(t *testing.T) {
|
|||
}
|
||||
|
||||
// Set mock statement executor for handler to use.
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
if stmt.String() != `SELECT * FROM bar` {
|
||||
t.Fatalf("unexpected query: %s", stmt.String())
|
||||
} else if ctx.Database != `foo` {
|
||||
|
@ -238,7 +238,7 @@ func TestHandler_Query_Auth(t *testing.T) {
|
|||
// Ensure the handler returns results from a query (including nil results).
|
||||
func TestHandler_QueryRegex(t *testing.T) {
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` {
|
||||
t.Fatalf("unexpected query: %s", stmt.String())
|
||||
} else if ctx.Database != `test` {
|
||||
|
@ -255,7 +255,7 @@ func TestHandler_QueryRegex(t *testing.T) {
|
|||
// Ensure the handler merges results from the same statement.
|
||||
func TestHandler_Query_MergeResults(t *testing.T) {
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
|
||||
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
|
||||
return nil
|
||||
|
@ -273,7 +273,7 @@ func TestHandler_Query_MergeResults(t *testing.T) {
|
|||
// Ensure the handler merges results from the same statement.
|
||||
func TestHandler_Query_MergeEmptyResults(t *testing.T) {
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}}
|
||||
ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
|
||||
return nil
|
||||
|
@ -291,7 +291,7 @@ func TestHandler_Query_MergeEmptyResults(t *testing.T) {
|
|||
// Ensure the handler can parse chunked and chunk size query parameters.
|
||||
func TestHandler_Query_Chunked(t *testing.T) {
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
if ctx.ChunkSize != 2 {
|
||||
t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize)
|
||||
}
|
||||
|
@ -315,7 +315,7 @@ func TestHandler_Query_Chunked(t *testing.T) {
|
|||
func TestHandler_Query_Async(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
if stmt.String() != `SELECT * FROM bar` {
|
||||
t.Fatalf("unexpected query: %s", stmt.String())
|
||||
} else if ctx.Database != `foo` {
|
||||
|
@ -449,7 +449,7 @@ func TestHandler_Query_ErrAuthorize(t *testing.T) {
|
|||
// Ensure the handler returns a status 200 if an error is returned in the result.
|
||||
func TestHandler_Query_ErrResult(t *testing.T) {
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
return errors.New("measurement not found")
|
||||
}
|
||||
|
||||
|
@ -470,9 +470,9 @@ func TestHandler_Query_CloseNotify(t *testing.T) {
|
|||
|
||||
interrupted := make(chan struct{})
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
select {
|
||||
case <-ctx.InterruptCh:
|
||||
case <-ctx.Done():
|
||||
case <-done:
|
||||
}
|
||||
close(interrupted)
|
||||
|
@ -614,7 +614,7 @@ func TestHandler_PromRead(t *testing.T) {
|
|||
b := bytes.NewReader(compressed)
|
||||
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
if stmt.String() != `SELECT f64 FROM foo.._ WHERE eq = 'a' AND neq != 'b' AND regex =~ /c/ AND neqregex !~ /d/ AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.002Z' GROUP BY *` {
|
||||
t.Fatalf("unexpected query: %s", stmt.String())
|
||||
} else if ctx.Database != `foo` {
|
||||
|
@ -678,7 +678,7 @@ func TestHandler_Ping(t *testing.T) {
|
|||
// Ensure the handler returns the version correctly from the different endpoints.
|
||||
func TestHandler_Version(t *testing.T) {
|
||||
h := NewHandler(false)
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
return nil
|
||||
}
|
||||
tests := []struct {
|
||||
|
@ -933,10 +933,10 @@ func NewHandler(requireAuthentication bool) *Handler {
|
|||
|
||||
// HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor.
|
||||
type HandlerStatementExecutor struct {
|
||||
ExecuteStatementFn func(stmt influxql.Statement, ctx query.ExecutionContext) error
|
||||
ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
|
||||
}
|
||||
|
||||
func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error {
|
||||
func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
|
||||
return e.ExecuteStatementFn(stmt, ctx)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue