fix: return correct count of ErrNotExecuted (#22273)

executeQuery() iterates over statements until each is
processed or if an error is encountered that causes
the loop to exit pre-maturely. It should return
ErrNotExecuted for each remaining statement in the
query

closes https://github.com/influxdata/influxdb/issues/19136
pull/22333/head
davidby-influx 2021-08-24 11:27:10 -07:00 committed by GitHub
parent 27e5f9709e
commit 0090c5b111
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 3 deletions

View File

@ -33,6 +33,7 @@
- [#22090](https://github.com/influxdata/influxdb/pull/22090): fix: systemd service -- handle https, 40x, and block indefinitely
- [#22195](https://github.com/influxdata/influxdb/pull/22195): fix: avoid compaction queue stats flutter
- [#22283](https://github.com/influxdata/influxdb/pull/22283): fix: require database authorization to see continuous queries
- [#22273](https://github.com/influxdata/influxdb/pull/22273): fix: return correct count of ErrNotExecuted
v1.9.2 [unreleased]
- [#21631](https://github.com/influxdata/influxdb/pull/21631): fix: group by returns multiple results per group in some circumstances

View File

@ -91,7 +91,6 @@ func (ctx *ExecutionContext) Value(key interface{}) interface{} {
// send sends a Result to the Results channel and will exit if the query has
// been aborted.
func (ctx *ExecutionContext) send(result *Result) error {
result.StatementID = ctx.statementID
select {
case <-ctx.AbortCh:
return ErrQueryAborted

View File

@ -331,7 +331,7 @@ LOOP:
// Normalize each statement if possible.
if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok {
if err := normalizer.NormalizeStatement(stmt, defaultDB, opt.RetentionPolicy); err != nil {
if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted {
if err := ctx.send(&Result{Err: err, StatementID: i}); err == ErrQueryAborted {
return
}
break
@ -380,7 +380,7 @@ LOOP:
}
// Send error results for any statements which were not executed.
for ; i < len(query.Statements)-1; i++ {
for i++; i < len(query.Statements); i++ {
if err := ctx.send(&Result{
StatementID: i,
Err: ErrNotExecuted,

View File

@ -21,6 +21,15 @@ func (e *StatementExecutor) ExecuteStatement(ctx *query.ExecutionContext, stmt i
return e.ExecuteStatementFn(stmt, ctx)
}
type StatementNormalizerExecutor struct {
StatementExecutor
NormalizeStatementFn func(stmt influxql.Statement, database, retentionPolicy string) error
}
func (e *StatementNormalizerExecutor) NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error {
return e.NormalizeStatementFn(stmt, database, retentionPolicy)
}
func NewQueryExecutor() *query.Executor {
return query.NewExecutor()
}
@ -478,6 +487,64 @@ func TestQueryExecutor_Panic(t *testing.T) {
}
}
const goodStatement = `SELECT count(value) FROM cpu`
func TestQueryExecutor_NotExecuted(t *testing.T) {
var executorFailIndex int
var executorCallCount int
queryStatements := []string{goodStatement, goodStatement, goodStatement, goodStatement, goodStatement}
queryStr := strings.Join(queryStatements, ";")
var closing chan struct{}
q, err := influxql.ParseQuery(queryStr)
if err != nil {
t.Fatalf("parsing %s: %v", queryStr, err)
}
e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
defer func() { executorCallCount++ }()
if executorFailIndex == executorCallCount {
closing <- struct{}{}
close(closing)
select {
case <-ctx.Done():
return nil
}
} else {
return ctx.Send(&query.Result{Err: nil})
}
},
}
testFn := func(testName string, i int) {
results := e.ExecuteQuery(q, query.ExecutionOptions{}, closing)
checkNotExecutedResults(t, results, testName, i, len(q.Statements))
}
for i := 0; i < len(q.Statements); i++ {
closing = make(chan struct{})
executorFailIndex = i
executorCallCount = 0
testFn("executor", i)
}
}
func checkNotExecutedResults(t *testing.T, results <-chan *query.Result, testName string, failIndex int, lenQuery int) {
notExecutedIndex := failIndex + 1
for result := range results {
if result.Err == query.ErrNotExecuted {
if result.StatementID != notExecutedIndex {
t.Fatalf("StatementID for ErrNotExecuted in wrong order - expected: %d, got: %d", notExecutedIndex, result.StatementID)
} else {
notExecutedIndex++
}
}
}
if notExecutedIndex != lenQuery {
t.Fatalf("wrong number of results from %s with fail index of %d - got: %d, expected: %d", testName, failIndex, notExecutedIndex - (1 + failIndex), lenQuery-(1+failIndex))
}
}
func TestQueryExecutor_InvalidSource(t *testing.T) {
e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{