From 0090c5b11141e99ee80dc8980cff615c3b63dfd6 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Tue, 24 Aug 2021 11:27:10 -0700 Subject: [PATCH] fix: return correct count of ErrNotExecuted (#22273) executeQuery() iterates over statements until each is processed or if an error is encountered that causes the loop to exit pre-maturely. It should return ErrNotExecuted for each remaining statement in the query closes https://github.com/influxdata/influxdb/issues/19136 --- CHANGELOG.md | 1 + query/execution_context.go | 1 - query/executor.go | 4 +-- query/executor_test.go | 67 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96bc95ff08..45a8a15610 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ - [#22090](https://github.com/influxdata/influxdb/pull/22090): fix: systemd service -- handle https, 40x, and block indefinitely - [#22195](https://github.com/influxdata/influxdb/pull/22195): fix: avoid compaction queue stats flutter - [#22283](https://github.com/influxdata/influxdb/pull/22283): fix: require database authorization to see continuous queries +- [#22273](https://github.com/influxdata/influxdb/pull/22273): fix: return correct count of ErrNotExecuted v1.9.2 [unreleased] - [#21631](https://github.com/influxdata/influxdb/pull/21631): fix: group by returns multiple results per group in some circumstances diff --git a/query/execution_context.go b/query/execution_context.go index 5479226f56..a17bbf7d99 100644 --- a/query/execution_context.go +++ b/query/execution_context.go @@ -91,7 +91,6 @@ func (ctx *ExecutionContext) Value(key interface{}) interface{} { // send sends a Result to the Results channel and will exit if the query has // been aborted. func (ctx *ExecutionContext) send(result *Result) error { - result.StatementID = ctx.statementID select { case <-ctx.AbortCh: return ErrQueryAborted diff --git a/query/executor.go b/query/executor.go index 75d4a7a320..7b0a28d27c 100644 --- a/query/executor.go +++ b/query/executor.go @@ -331,7 +331,7 @@ LOOP: // Normalize each statement if possible. if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok { if err := normalizer.NormalizeStatement(stmt, defaultDB, opt.RetentionPolicy); err != nil { - if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted { + if err := ctx.send(&Result{Err: err, StatementID: i}); err == ErrQueryAborted { return } break @@ -380,7 +380,7 @@ LOOP: } // Send error results for any statements which were not executed. - for ; i < len(query.Statements)-1; i++ { + for i++; i < len(query.Statements); i++ { if err := ctx.send(&Result{ StatementID: i, Err: ErrNotExecuted, diff --git a/query/executor_test.go b/query/executor_test.go index 8ed49fde87..b8feaa317f 100644 --- a/query/executor_test.go +++ b/query/executor_test.go @@ -21,6 +21,15 @@ func (e *StatementExecutor) ExecuteStatement(ctx *query.ExecutionContext, stmt i return e.ExecuteStatementFn(stmt, ctx) } +type StatementNormalizerExecutor struct { + StatementExecutor + NormalizeStatementFn func(stmt influxql.Statement, database, retentionPolicy string) error +} + +func (e *StatementNormalizerExecutor) NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error { + return e.NormalizeStatementFn(stmt, database, retentionPolicy) +} + func NewQueryExecutor() *query.Executor { return query.NewExecutor() } @@ -478,6 +487,64 @@ func TestQueryExecutor_Panic(t *testing.T) { } } +const goodStatement = `SELECT count(value) FROM cpu` + +func TestQueryExecutor_NotExecuted(t *testing.T) { + var executorFailIndex int + var executorCallCount int + queryStatements := []string{goodStatement, goodStatement, goodStatement, goodStatement, goodStatement} + queryStr := strings.Join(queryStatements, ";") + var closing chan struct{} + + q, err := influxql.ParseQuery(queryStr) + if err != nil { + t.Fatalf("parsing %s: %v", queryStr, err) + } + + e := NewQueryExecutor() + e.StatementExecutor = &StatementExecutor{ + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + defer func() { executorCallCount++ }() + if executorFailIndex == executorCallCount { + closing <- struct{}{} + close(closing) + select { + case <-ctx.Done(): + return nil + } + } else { + return ctx.Send(&query.Result{Err: nil}) + } + }, + } + testFn := func(testName string, i int) { + results := e.ExecuteQuery(q, query.ExecutionOptions{}, closing) + checkNotExecutedResults(t, results, testName, i, len(q.Statements)) + } + for i := 0; i < len(q.Statements); i++ { + closing = make(chan struct{}) + executorFailIndex = i + executorCallCount = 0 + testFn("executor", i) + } +} + +func checkNotExecutedResults(t *testing.T, results <-chan *query.Result, testName string, failIndex int, lenQuery int) { + notExecutedIndex := failIndex + 1 + for result := range results { + if result.Err == query.ErrNotExecuted { + if result.StatementID != notExecutedIndex { + t.Fatalf("StatementID for ErrNotExecuted in wrong order - expected: %d, got: %d", notExecutedIndex, result.StatementID) + } else { + notExecutedIndex++ + } + } + } + if notExecutedIndex != lenQuery { + t.Fatalf("wrong number of results from %s with fail index of %d - got: %d, expected: %d", testName, failIndex, notExecutedIndex - (1 + failIndex), lenQuery-(1+failIndex)) + } +} + func TestQueryExecutor_InvalidSource(t *testing.T) { e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{