diff --git a/CHANGELOG.md b/CHANGELOG.md index b037914845..2205c69716 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -100,6 +100,7 @@ All Changes: - [#7548](https://github.com/influxdata/influxdb/issues/7548): Fix output duration units for SHOW QUERIES. - [#7564](https://github.com/influxdata/influxdb/issues/7564): Fix incorrect grouping when multiple aggregates are used with sparse data. - [#7448](https://github.com/influxdata/influxdb/pull/7448): Fix Retention Policy Inconsistencies +- [#7606](https://github.com/influxdata/influxdb/pull/7606): Avoid deadlock when `max-row-limit` is hit. ## v1.0.2 [2016-10-05] diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index c85e338a7e..aab3dc1b7a 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -187,12 +187,11 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influx return err } - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Series: rows, Messages: messages, - } - return nil + }) } func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error { @@ -441,10 +440,8 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen } // Send results or exit if closing. - select { - case <-ctx.InterruptCh: - return influxql.ErrQueryInterrupted - case ctx.Results <- result: + if err := ctx.Send(result); err != nil { + return err } emitted = true @@ -461,7 +458,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) } - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Messages: messages, Series: []*models.Row{{ @@ -469,16 +466,15 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen Columns: []string{"time", "written"}, Values: [][]interface{}{{time.Unix(0, 0).UTC(), writeN}}, }}, - } - return nil + }) } // Always emit at least one result. if !emitted { - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Series: make([]*models.Row, 0), - } + }) } return nil @@ -673,11 +669,10 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea measurements, err := e.TSDBStore.Measurements(q.Database, q.Condition) if err != nil || len(measurements) == 0 { - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Err: err, - } - return nil + }) } if q.Offset > 0 { @@ -700,21 +695,19 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea } if len(values) == 0 { - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, - } - return nil + }) } - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Series: []*models.Row{{ Name: "measurements", Columns: []string{"name"}, Values: values, }}, - } - return nil + }) } func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) { @@ -849,11 +842,10 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem tagValues, err := e.TSDBStore.TagValues(q.Database, q.Condition) if err != nil { - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Err: err, - } - return nil + }) } emitted := false @@ -887,18 +879,20 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem row.Values[i] = []interface{}{v.Key, v.Value} } - ctx.Results <- &influxql.Result{ + if err := ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Series: []*models.Row{row}, + }); err != nil { + return err } emitted = true } // Ensure at least one result is emitted. if !emitted { - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, - } + }) } return nil } diff --git a/influxql/query_executor.go b/influxql/query_executor.go index 54ee24dc61..c74278e797 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -25,6 +25,9 @@ var ( // ErrQueryInterrupted is an error returned when the query is interrupted. ErrQueryInterrupted = errors.New("query interrupted") + // ErrQueryAborted is an error returned when the query is aborted. + ErrQueryAborted = errors.New("query aborted") + // ErrQueryEngineShutdown is an error sent when the query cannot be // created because the query engine was shutdown. ErrQueryEngineShutdown = errors.New("query engine shutdown") @@ -74,6 +77,9 @@ type ExecutionOptions struct { // Quiet suppresses non-essential output from the query executor. Quiet bool + + // AbortCh is a channel that signals when results are no longer desired by the caller. + AbortCh <-chan struct{} } // ExecutionContext contains state that the query is currently executing with. @@ -100,6 +106,30 @@ type ExecutionContext struct { ExecutionOptions } +// send sends a Result to the Results channel and will exit if the query has +// been aborted. +func (ctx *ExecutionContext) send(result *Result) error { + select { + case <-ctx.AbortCh: + return ErrQueryAborted + case ctx.Results <- result: + } + return nil +} + +// Send sends a Result to the Results channel and will exit if the query has +// been interrupted or aborted. +func (ctx *ExecutionContext) Send(result *Result) error { + select { + case <-ctx.InterruptCh: + return ErrQueryInterrupted + case <-ctx.AbortCh: + return ErrQueryAborted + case ctx.Results <- result: + } + return nil +} + // StatementExecutor executes a statement within the QueryExecutor. type StatementExecutor interface { // ExecuteStatement executes a statement. Results should be sent to the @@ -194,7 +224,10 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing) if err != nil { - results <- &Result{Err: err} + select { + case results <- &Result{Err: err}: + case <-opt.AbortCh: + } return } defer e.TaskManager.KillQuery(qid) @@ -265,7 +298,9 @@ LOOP: // Normalize each statement if possible. if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok { if err := normalizer.NormalizeStatement(stmt, defaultDB); err != nil { - results <- &Result{Err: err} + if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted { + return + } break } } @@ -287,9 +322,11 @@ LOOP: // Send an error for this result if it failed for some reason. if err != nil { - results <- &Result{ + if err := ctx.send(&Result{ StatementID: i, Err: err, + }); err == ErrQueryAborted { + return } // Stop after the first error. break @@ -313,9 +350,11 @@ LOOP: // Send error results for any statements which were not executed. for ; i < len(query.Statements)-1; i++ { - results <- &Result{ + if err := ctx.send(&Result{ StatementID: i, Err: ErrNotExecuted, + }); err == ErrQueryAborted { + return } } } diff --git a/influxql/query_executor_test.go b/influxql/query_executor_test.go index 2445a2152e..3cc643f11d 100644 --- a/influxql/query_executor_test.go +++ b/influxql/query_executor_test.go @@ -111,6 +111,37 @@ func TestQueryExecutor_Interrupt(t *testing.T) { } } +func TestQueryExecutor_Abort(t *testing.T) { + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) + if err != nil { + t.Fatal(err) + } + + ch1 := make(chan struct{}) + ch2 := make(chan struct{}) + + e := NewQueryExecutor() + e.StatementExecutor = &StatementExecutor{ + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { + <-ch1 + if err := ctx.Send(&influxql.Result{Err: errUnexpected}); err != influxql.ErrQueryAborted { + t.Errorf("unexpected error: %v", err) + } + close(ch2) + return nil + }, + } + + done := make(chan struct{}) + close(done) + + results := e.ExecuteQuery(q, influxql.ExecutionOptions{AbortCh: done}, nil) + close(ch1) + + <-ch2 + discardOutput(results) +} + func TestQueryExecutor_ShowQueries(t *testing.T) { q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { @@ -225,7 +256,6 @@ func TestQueryExecutor_Close(t *testing.T) { ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { close(ch1) <-ctx.InterruptCh - close(ch2) return influxql.ErrQueryInterrupted }, } @@ -236,6 +266,7 @@ func TestQueryExecutor_Close(t *testing.T) { if result.Err != influxql.ErrQueryEngineShutdown { t.Errorf("unexpected error: %s", result.Err) } + close(ch2) }(results) // Wait for the statement to start executing. @@ -248,7 +279,7 @@ func TestQueryExecutor_Close(t *testing.T) { select { case <-ch2: case <-time.After(100 * time.Millisecond): - t.Error("closing the query manager did not kill the query after 100 milliseconds") + t.Fatal("closing the query manager did not kill the query after 100 milliseconds") } results = e.ExecuteQuery(q, influxql.ExecutionOptions{}, nil) diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 354b1a82d0..bf4c7a2f38 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -377,6 +377,13 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. // Parse whether this is an async command. async := r.FormValue("async") == "true" + opts := influxql.ExecutionOptions{ + Database: db, + ChunkSize: chunkSize, + ReadOnly: r.Method == "GET", + NodeID: nodeID, + } + // Make sure if the client disconnects we signal the query to abort var closing chan struct{} if !async { @@ -398,6 +405,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. close(closing) } }() + opts.AbortCh = done } else { defer close(closing) } @@ -405,12 +413,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. // Execute query. rw.Header().Add("Connection", "close") - results := h.QueryExecutor.ExecuteQuery(query, influxql.ExecutionOptions{ - Database: db, - ChunkSize: chunkSize, - ReadOnly: r.Method == "GET", - NodeID: nodeID, - }, closing) + results := h.QueryExecutor.ExecuteQuery(query, opts, closing) // If we are running in async mode, open a goroutine to drain the results // and return with a StatusNoContent.