Avoid deadlock when max-row-limit is hit
When the `max-row-limit` was hit, the goroutine reading from the results channel would stop reading from the channel, but it didn't signal to the sender that it was no longer reading from the results. This caused the sender to continue trying to send results even though nobody would ever read it and this created a deadlock. Include an `AbortCh` on the `ExecutionContext` that will signal when results are no longer desired so the sender can abort instead of deadlocking.pull/7608/head
parent
6ffe164eac
commit
b87116449c
|
@ -88,6 +88,7 @@ All Changes:
|
|||
- [#7526](https://github.com/influxdata/influxdb/issues/7526): Truncate the version string when linking to the documentation.
|
||||
- [#7548](https://github.com/influxdata/influxdb/issues/7548): Fix output duration units for SHOW QUERIES.
|
||||
- [#7564](https://github.com/influxdata/influxdb/issues/7564): Fix incorrect grouping when multiple aggregates are used with sparse data.
|
||||
- [#7606](https://github.com/influxdata/influxdb/pull/7606): Avoid deadlock when `max-row-limit` is hit.
|
||||
|
||||
## v1.0.2 [2016-10-05]
|
||||
|
||||
|
|
|
@ -187,12 +187,11 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influx
|
|||
return err
|
||||
}
|
||||
|
||||
ctx.Results <- &influxql.Result{
|
||||
return ctx.Send(&influxql.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Series: rows,
|
||||
Messages: messages,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error {
|
||||
|
@ -455,10 +454,8 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
|
|||
}
|
||||
|
||||
// Send results or exit if closing.
|
||||
select {
|
||||
case <-ctx.InterruptCh:
|
||||
return influxql.ErrQueryInterrupted
|
||||
case ctx.Results <- result:
|
||||
if err := ctx.Send(result); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
emitted = true
|
||||
|
@ -475,7 +472,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
|
|||
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
||||
}
|
||||
|
||||
ctx.Results <- &influxql.Result{
|
||||
return ctx.Send(&influxql.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Messages: messages,
|
||||
Series: []*models.Row{{
|
||||
|
@ -483,16 +480,15 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
|
|||
Columns: []string{"time", "written"},
|
||||
Values: [][]interface{}{{time.Unix(0, 0).UTC(), writeN}},
|
||||
}},
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Always emit at least one result.
|
||||
if !emitted {
|
||||
ctx.Results <- &influxql.Result{
|
||||
return ctx.Send(&influxql.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Series: make([]*models.Row, 0),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -687,11 +683,10 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
|
|||
|
||||
measurements, err := e.TSDBStore.Measurements(q.Database, q.Condition)
|
||||
if err != nil || len(measurements) == 0 {
|
||||
ctx.Results <- &influxql.Result{
|
||||
return ctx.Send(&influxql.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Err: err,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if q.Offset > 0 {
|
||||
|
@ -714,21 +709,19 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
|
|||
}
|
||||
|
||||
if len(values) == 0 {
|
||||
ctx.Results <- &influxql.Result{
|
||||
return ctx.Send(&influxql.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
ctx.Results <- &influxql.Result{
|
||||
return ctx.Send(&influxql.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Series: []*models.Row{{
|
||||
Name: "measurements",
|
||||
Columns: []string{"name"},
|
||||
Values: values,
|
||||
}},
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
|
||||
|
@ -863,11 +856,10 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
|
|||
|
||||
tagValues, err := e.TSDBStore.TagValues(q.Database, q.Condition)
|
||||
if err != nil {
|
||||
ctx.Results <- &influxql.Result{
|
||||
return ctx.Send(&influxql.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Err: err,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
emitted := false
|
||||
|
@ -901,18 +893,20 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
|
|||
row.Values[i] = []interface{}{v.Key, v.Value}
|
||||
}
|
||||
|
||||
ctx.Results <- &influxql.Result{
|
||||
if err := ctx.Send(&influxql.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
Series: []*models.Row{row},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
emitted = true
|
||||
}
|
||||
|
||||
// Ensure at least one result is emitted.
|
||||
if !emitted {
|
||||
ctx.Results <- &influxql.Result{
|
||||
return ctx.Send(&influxql.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
}
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -25,6 +25,9 @@ var (
|
|||
// ErrQueryInterrupted is an error returned when the query is interrupted.
|
||||
ErrQueryInterrupted = errors.New("query interrupted")
|
||||
|
||||
// ErrQueryAborted is an error returned when the query is aborted.
|
||||
ErrQueryAborted = errors.New("query aborted")
|
||||
|
||||
// ErrQueryEngineShutdown is an error sent when the query cannot be
|
||||
// created because the query engine was shutdown.
|
||||
ErrQueryEngineShutdown = errors.New("query engine shutdown")
|
||||
|
@ -74,6 +77,9 @@ type ExecutionOptions struct {
|
|||
|
||||
// Quiet suppresses non-essential output from the query executor.
|
||||
Quiet bool
|
||||
|
||||
// AbortCh is a channel that signals when results are no longer desired by the caller.
|
||||
AbortCh <-chan struct{}
|
||||
}
|
||||
|
||||
// ExecutionContext contains state that the query is currently executing with.
|
||||
|
@ -100,6 +106,30 @@ type ExecutionContext struct {
|
|||
ExecutionOptions
|
||||
}
|
||||
|
||||
// send sends a Result to the Results channel and will exit if the query has
|
||||
// been aborted.
|
||||
func (ctx *ExecutionContext) send(result *Result) error {
|
||||
select {
|
||||
case <-ctx.AbortCh:
|
||||
return ErrQueryAborted
|
||||
case ctx.Results <- result:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send sends a Result to the Results channel and will exit if the query has
|
||||
// been interrupted or aborted.
|
||||
func (ctx *ExecutionContext) Send(result *Result) error {
|
||||
select {
|
||||
case <-ctx.InterruptCh:
|
||||
return ErrQueryInterrupted
|
||||
case <-ctx.AbortCh:
|
||||
return ErrQueryAborted
|
||||
case ctx.Results <- result:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StatementExecutor executes a statement within the QueryExecutor.
|
||||
type StatementExecutor interface {
|
||||
// ExecuteStatement executes a statement. Results should be sent to the
|
||||
|
@ -194,7 +224,10 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing
|
|||
|
||||
qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing)
|
||||
if err != nil {
|
||||
results <- &Result{Err: err}
|
||||
select {
|
||||
case results <- &Result{Err: err}:
|
||||
case <-opt.AbortCh:
|
||||
}
|
||||
return
|
||||
}
|
||||
defer e.TaskManager.KillQuery(qid)
|
||||
|
@ -265,7 +298,9 @@ LOOP:
|
|||
// Normalize each statement if possible.
|
||||
if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok {
|
||||
if err := normalizer.NormalizeStatement(stmt, defaultDB); err != nil {
|
||||
results <- &Result{Err: err}
|
||||
if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted {
|
||||
return
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -287,9 +322,11 @@ LOOP:
|
|||
|
||||
// Send an error for this result if it failed for some reason.
|
||||
if err != nil {
|
||||
results <- &Result{
|
||||
if err := ctx.send(&Result{
|
||||
StatementID: i,
|
||||
Err: err,
|
||||
}); err == ErrQueryAborted {
|
||||
return
|
||||
}
|
||||
// Stop after the first error.
|
||||
break
|
||||
|
@ -313,9 +350,11 @@ LOOP:
|
|||
|
||||
// Send error results for any statements which were not executed.
|
||||
for ; i < len(query.Statements)-1; i++ {
|
||||
results <- &Result{
|
||||
if err := ctx.send(&Result{
|
||||
StatementID: i,
|
||||
Err: ErrNotExecuted,
|
||||
}); err == ErrQueryAborted {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,6 +111,37 @@ func TestQueryExecutor_Interrupt(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestQueryExecutor_Abort(t *testing.T) {
|
||||
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ch1 := make(chan struct{})
|
||||
ch2 := make(chan struct{})
|
||||
|
||||
e := NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{
|
||||
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
|
||||
<-ch1
|
||||
if err := ctx.Send(&influxql.Result{Err: errUnexpected}); err != influxql.ErrQueryAborted {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
close(ch2)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
close(done)
|
||||
|
||||
results := e.ExecuteQuery(q, influxql.ExecutionOptions{AbortCh: done}, nil)
|
||||
close(ch1)
|
||||
|
||||
<-ch2
|
||||
discardOutput(results)
|
||||
}
|
||||
|
||||
func TestQueryExecutor_ShowQueries(t *testing.T) {
|
||||
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
|
||||
if err != nil {
|
||||
|
@ -225,7 +256,6 @@ func TestQueryExecutor_Close(t *testing.T) {
|
|||
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
|
||||
close(ch1)
|
||||
<-ctx.InterruptCh
|
||||
close(ch2)
|
||||
return influxql.ErrQueryInterrupted
|
||||
},
|
||||
}
|
||||
|
@ -236,6 +266,7 @@ func TestQueryExecutor_Close(t *testing.T) {
|
|||
if result.Err != influxql.ErrQueryEngineShutdown {
|
||||
t.Errorf("unexpected error: %s", result.Err)
|
||||
}
|
||||
close(ch2)
|
||||
}(results)
|
||||
|
||||
// Wait for the statement to start executing.
|
||||
|
@ -248,7 +279,7 @@ func TestQueryExecutor_Close(t *testing.T) {
|
|||
select {
|
||||
case <-ch2:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("closing the query manager did not kill the query after 100 milliseconds")
|
||||
t.Fatal("closing the query manager did not kill the query after 100 milliseconds")
|
||||
}
|
||||
|
||||
results = e.ExecuteQuery(q, influxql.ExecutionOptions{}, nil)
|
||||
|
|
|
@ -377,6 +377,13 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
// Parse whether this is an async command.
|
||||
async := r.FormValue("async") == "true"
|
||||
|
||||
opts := influxql.ExecutionOptions{
|
||||
Database: db,
|
||||
ChunkSize: chunkSize,
|
||||
ReadOnly: r.Method == "GET",
|
||||
NodeID: nodeID,
|
||||
}
|
||||
|
||||
// Make sure if the client disconnects we signal the query to abort
|
||||
var closing chan struct{}
|
||||
if !async {
|
||||
|
@ -398,6 +405,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
close(closing)
|
||||
}
|
||||
}()
|
||||
opts.AbortCh = done
|
||||
} else {
|
||||
defer close(closing)
|
||||
}
|
||||
|
@ -405,12 +413,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
|
||||
// Execute query.
|
||||
rw.Header().Add("Connection", "close")
|
||||
results := h.QueryExecutor.ExecuteQuery(query, influxql.ExecutionOptions{
|
||||
Database: db,
|
||||
ChunkSize: chunkSize,
|
||||
ReadOnly: r.Method == "GET",
|
||||
NodeID: nodeID,
|
||||
}, closing)
|
||||
results := h.QueryExecutor.ExecuteQuery(query, opts, closing)
|
||||
|
||||
// If we are running in async mode, open a goroutine to drain the results
|
||||
// and return with a StatusNoContent.
|
||||
|
|
Loading…
Reference in New Issue