From 64c2d704da7acbc302fb4ddc8c8bd9e941bc4b1e Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 8 Nov 2016 13:12:25 -0600 Subject: [PATCH] Avoid deadlock when max-row-limit is hit When the `max-row-limit` was hit, the goroutine reading from the results channel would stop reading from the channel, but it didn't signal to the sender that it was no longer reading from the results. This caused the sender to continue trying to send results even though nobody would ever read them, and this created a deadlock. Include an `AbortCh` on the `ExecutionContext` that will signal when results are no longer desired so the sender can abort instead of deadlocking. --- CHANGELOG.md | 1 + coordinator/statement_executor.go | 48 ++++++++++++++----------------- influxql/query_executor.go | 47 +++++++++++++++++++++++++++--- influxql/query_executor_test.go | 35 ++++++++++++++++++++-- services/httpd/handler.go | 15 ++++++---- 5 files changed, 107 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b037914845..2205c69716 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -100,6 +100,7 @@ All Changes: - [#7548](https://github.com/influxdata/influxdb/issues/7548): Fix output duration units for SHOW QUERIES. - [#7564](https://github.com/influxdata/influxdb/issues/7564): Fix incorrect grouping when multiple aggregates are used with sparse data. - [#7448](https://github.com/influxdata/influxdb/pull/7448): Fix Retention Policy Inconsistencies +- [#7606](https://github.com/influxdata/influxdb/pull/7606): Avoid deadlock when `max-row-limit` is hit. 
## v1.0.2 [2016-10-05] diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index c85e338a7e..aab3dc1b7a 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -187,12 +187,11 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influx return err } - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Series: rows, Messages: messages, - } - return nil + }) } func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error { @@ -441,10 +440,8 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen } // Send results or exit if closing. - select { - case <-ctx.InterruptCh: - return influxql.ErrQueryInterrupted - case ctx.Results <- result: + if err := ctx.Send(result); err != nil { + return err } emitted = true @@ -461,7 +458,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) } - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Messages: messages, Series: []*models.Row{{ @@ -469,16 +466,15 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen Columns: []string{"time", "written"}, Values: [][]interface{}{{time.Unix(0, 0).UTC(), writeN}}, }}, - } - return nil + }) } // Always emit at least one result. 
if !emitted { - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Series: make([]*models.Row, 0), - } + }) } return nil @@ -673,11 +669,10 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea measurements, err := e.TSDBStore.Measurements(q.Database, q.Condition) if err != nil || len(measurements) == 0 { - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Err: err, - } - return nil + }) } if q.Offset > 0 { @@ -700,21 +695,19 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea } if len(values) == 0 { - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, - } - return nil + }) } - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Series: []*models.Row{{ Name: "measurements", Columns: []string{"name"}, Values: values, }}, - } - return nil + }) } func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) { @@ -849,11 +842,10 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem tagValues, err := e.TSDBStore.TagValues(q.Database, q.Condition) if err != nil { - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Err: err, - } - return nil + }) } emitted := false @@ -887,18 +879,20 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem row.Values[i] = []interface{}{v.Key, v.Value} } - ctx.Results <- &influxql.Result{ + if err := ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, Series: []*models.Row{row}, + }); err != nil { + return err } emitted = true } // Ensure at least one result is emitted. 
if !emitted { - ctx.Results <- &influxql.Result{ + return ctx.Send(&influxql.Result{ StatementID: ctx.StatementID, - } + }) } return nil } diff --git a/influxql/query_executor.go b/influxql/query_executor.go index 54ee24dc61..c74278e797 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -25,6 +25,9 @@ var ( // ErrQueryInterrupted is an error returned when the query is interrupted. ErrQueryInterrupted = errors.New("query interrupted") + // ErrQueryAborted is an error returned when the query is aborted. + ErrQueryAborted = errors.New("query aborted") + // ErrQueryEngineShutdown is an error sent when the query cannot be // created because the query engine was shutdown. ErrQueryEngineShutdown = errors.New("query engine shutdown") @@ -74,6 +77,9 @@ type ExecutionOptions struct { // Quiet suppresses non-essential output from the query executor. Quiet bool + + // AbortCh is a channel that signals when results are no longer desired by the caller. + AbortCh <-chan struct{} } // ExecutionContext contains state that the query is currently executing with. @@ -100,6 +106,30 @@ type ExecutionContext struct { ExecutionOptions } +// send sends a Result to the Results channel and will exit if the query has +// been aborted. +func (ctx *ExecutionContext) send(result *Result) error { + select { + case <-ctx.AbortCh: + return ErrQueryAborted + case ctx.Results <- result: + } + return nil +} + +// Send sends a Result to the Results channel and will exit if the query has +// been interrupted or aborted. +func (ctx *ExecutionContext) Send(result *Result) error { + select { + case <-ctx.InterruptCh: + return ErrQueryInterrupted + case <-ctx.AbortCh: + return ErrQueryAborted + case ctx.Results <- result: + } + return nil +} + // StatementExecutor executes a statement within the QueryExecutor. type StatementExecutor interface { // ExecuteStatement executes a statement. 
Results should be sent to the @@ -194,7 +224,10 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing) if err != nil { - results <- &Result{Err: err} + select { + case results <- &Result{Err: err}: + case <-opt.AbortCh: + } return } defer e.TaskManager.KillQuery(qid) @@ -265,7 +298,9 @@ LOOP: // Normalize each statement if possible. if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok { if err := normalizer.NormalizeStatement(stmt, defaultDB); err != nil { - results <- &Result{Err: err} + if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted { + return + } break } } @@ -287,9 +322,11 @@ LOOP: // Send an error for this result if it failed for some reason. if err != nil { - results <- &Result{ + if err := ctx.send(&Result{ StatementID: i, Err: err, + }); err == ErrQueryAborted { + return } // Stop after the first error. break @@ -313,9 +350,11 @@ LOOP: // Send error results for any statements which were not executed. 
for ; i < len(query.Statements)-1; i++ { - results <- &Result{ + if err := ctx.send(&Result{ StatementID: i, Err: ErrNotExecuted, + }); err == ErrQueryAborted { + return } } } diff --git a/influxql/query_executor_test.go b/influxql/query_executor_test.go index 2445a2152e..3cc643f11d 100644 --- a/influxql/query_executor_test.go +++ b/influxql/query_executor_test.go @@ -111,6 +111,37 @@ func TestQueryExecutor_Interrupt(t *testing.T) { } } +func TestQueryExecutor_Abort(t *testing.T) { + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) + if err != nil { + t.Fatal(err) + } + + ch1 := make(chan struct{}) + ch2 := make(chan struct{}) + + e := NewQueryExecutor() + e.StatementExecutor = &StatementExecutor{ + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { + <-ch1 + if err := ctx.Send(&influxql.Result{Err: errUnexpected}); err != influxql.ErrQueryAborted { + t.Errorf("unexpected error: %v", err) + } + close(ch2) + return nil + }, + } + + done := make(chan struct{}) + close(done) + + results := e.ExecuteQuery(q, influxql.ExecutionOptions{AbortCh: done}, nil) + close(ch1) + + <-ch2 + discardOutput(results) +} + func TestQueryExecutor_ShowQueries(t *testing.T) { q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { @@ -225,7 +256,6 @@ func TestQueryExecutor_Close(t *testing.T) { ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { close(ch1) <-ctx.InterruptCh - close(ch2) return influxql.ErrQueryInterrupted }, } @@ -236,6 +266,7 @@ func TestQueryExecutor_Close(t *testing.T) { if result.Err != influxql.ErrQueryEngineShutdown { t.Errorf("unexpected error: %s", result.Err) } + close(ch2) }(results) // Wait for the statement to start executing. 
@@ -248,7 +279,7 @@ func TestQueryExecutor_Close(t *testing.T) { select { case <-ch2: case <-time.After(100 * time.Millisecond): - t.Error("closing the query manager did not kill the query after 100 milliseconds") + t.Fatal("closing the query manager did not kill the query after 100 milliseconds") } results = e.ExecuteQuery(q, influxql.ExecutionOptions{}, nil) diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 354b1a82d0..bf4c7a2f38 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -377,6 +377,13 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. // Parse whether this is an async command. async := r.FormValue("async") == "true" + opts := influxql.ExecutionOptions{ + Database: db, + ChunkSize: chunkSize, + ReadOnly: r.Method == "GET", + NodeID: nodeID, + } + // Make sure if the client disconnects we signal the query to abort var closing chan struct{} if !async { @@ -398,6 +405,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. close(closing) } }() + opts.AbortCh = done } else { defer close(closing) } @@ -405,12 +413,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. // Execute query. rw.Header().Add("Connection", "close") - results := h.QueryExecutor.ExecuteQuery(query, influxql.ExecutionOptions{ - Database: db, - ChunkSize: chunkSize, - ReadOnly: r.Method == "GET", - NodeID: nodeID, - }, closing) + results := h.QueryExecutor.ExecuteQuery(query, opts, closing) // If we are running in async mode, open a goroutine to drain the results // and return with a StatusNoContent.