Merge pull request #7606 from influxdata/js-max-row-limit-block-fix

Avoid deadlock when max-row-limit is hit
pull/7616/head
Jonathan A. Sternberg 2016-11-08 14:39:46 -06:00 committed by GitHub
commit ee4d04de4e
5 changed files with 107 additions and 39 deletions

View File

@ -100,6 +100,7 @@ All Changes:
- [#7548](https://github.com/influxdata/influxdb/issues/7548): Fix output duration units for SHOW QUERIES.
- [#7564](https://github.com/influxdata/influxdb/issues/7564): Fix incorrect grouping when multiple aggregates are used with sparse data.
- [#7448](https://github.com/influxdata/influxdb/pull/7448): Fix Retention Policy Inconsistencies
- [#7606](https://github.com/influxdata/influxdb/pull/7606): Avoid deadlock when `max-row-limit` is hit.
## v1.0.2 [2016-10-05]

View File

@ -187,12 +187,11 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influx
return err
}
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: rows,
Messages: messages,
}
return nil
})
}
func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error {
@ -441,10 +440,8 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
}
// Send results or exit if closing.
select {
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
case ctx.Results <- result:
if err := ctx.Send(result); err != nil {
return err
}
emitted = true
@ -461,7 +458,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Messages: messages,
Series: []*models.Row{{
@ -469,16 +466,15 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
Columns: []string{"time", "written"},
Values: [][]interface{}{{time.Unix(0, 0).UTC(), writeN}},
}},
}
return nil
})
}
// Always emit at least one result.
if !emitted {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: make([]*models.Row, 0),
}
})
}
return nil
@ -673,11 +669,10 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
measurements, err := e.TSDBStore.Measurements(q.Database, q.Condition)
if err != nil || len(measurements) == 0 {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Err: err,
}
return nil
})
}
if q.Offset > 0 {
@ -700,21 +695,19 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
}
if len(values) == 0 {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
}
return nil
})
}
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{{
Name: "measurements",
Columns: []string{"name"},
Values: values,
}},
}
return nil
})
}
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
@ -849,11 +842,10 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
tagValues, err := e.TSDBStore.TagValues(q.Database, q.Condition)
if err != nil {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Err: err,
}
return nil
})
}
emitted := false
@ -887,18 +879,20 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
row.Values[i] = []interface{}{v.Key, v.Value}
}
ctx.Results <- &influxql.Result{
if err := ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{row},
}); err != nil {
return err
}
emitted = true
}
// Ensure at least one result is emitted.
if !emitted {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
}
})
}
return nil
}

View File

@ -25,6 +25,9 @@ var (
// ErrQueryInterrupted is an error returned when the query is interrupted.
ErrQueryInterrupted = errors.New("query interrupted")
// ErrQueryAborted is an error returned when the query is aborted.
ErrQueryAborted = errors.New("query aborted")
// ErrQueryEngineShutdown is an error sent when the query cannot be
// created because the query engine was shutdown.
ErrQueryEngineShutdown = errors.New("query engine shutdown")
@ -74,6 +77,9 @@ type ExecutionOptions struct {
// Quiet suppresses non-essential output from the query executor.
Quiet bool
// AbortCh is a channel that signals when results are no longer desired by the caller.
AbortCh <-chan struct{}
}
// ExecutionContext contains state that the query is currently executing with.
@ -100,6 +106,30 @@ type ExecutionContext struct {
ExecutionOptions
}
// send sends a Result to the Results channel and will exit if the query has
// been aborted.
func (ctx *ExecutionContext) send(result *Result) error {
select {
case <-ctx.AbortCh:
return ErrQueryAborted
case ctx.Results <- result:
}
return nil
}
// Send sends a Result to the Results channel and will exit if the query has
// been interrupted or aborted.
func (ctx *ExecutionContext) Send(result *Result) error {
select {
case <-ctx.InterruptCh:
return ErrQueryInterrupted
case <-ctx.AbortCh:
return ErrQueryAborted
case ctx.Results <- result:
}
return nil
}
// StatementExecutor executes a statement within the QueryExecutor.
type StatementExecutor interface {
// ExecuteStatement executes a statement. Results should be sent to the
@ -194,7 +224,10 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing
qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing)
if err != nil {
results <- &Result{Err: err}
select {
case results <- &Result{Err: err}:
case <-opt.AbortCh:
}
return
}
defer e.TaskManager.KillQuery(qid)
@ -265,7 +298,9 @@ LOOP:
// Normalize each statement if possible.
if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok {
if err := normalizer.NormalizeStatement(stmt, defaultDB); err != nil {
results <- &Result{Err: err}
if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted {
return
}
break
}
}
@ -287,9 +322,11 @@ LOOP:
// Send an error for this result if it failed for some reason.
if err != nil {
results <- &Result{
if err := ctx.send(&Result{
StatementID: i,
Err: err,
}); err == ErrQueryAborted {
return
}
// Stop after the first error.
break
@ -313,9 +350,11 @@ LOOP:
// Send error results for any statements which were not executed.
for ; i < len(query.Statements)-1; i++ {
results <- &Result{
if err := ctx.send(&Result{
StatementID: i,
Err: ErrNotExecuted,
}); err == ErrQueryAborted {
return
}
}
}

View File

@ -111,6 +111,37 @@ func TestQueryExecutor_Interrupt(t *testing.T) {
}
}
func TestQueryExecutor_Abort(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
ch1 := make(chan struct{})
ch2 := make(chan struct{})
e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
<-ch1
if err := ctx.Send(&influxql.Result{Err: errUnexpected}); err != influxql.ErrQueryAborted {
t.Errorf("unexpected error: %v", err)
}
close(ch2)
return nil
},
}
done := make(chan struct{})
close(done)
results := e.ExecuteQuery(q, influxql.ExecutionOptions{AbortCh: done}, nil)
close(ch1)
<-ch2
discardOutput(results)
}
func TestQueryExecutor_ShowQueries(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
@ -225,7 +256,6 @@ func TestQueryExecutor_Close(t *testing.T) {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
close(ch1)
<-ctx.InterruptCh
close(ch2)
return influxql.ErrQueryInterrupted
},
}
@ -236,6 +266,7 @@ func TestQueryExecutor_Close(t *testing.T) {
if result.Err != influxql.ErrQueryEngineShutdown {
t.Errorf("unexpected error: %s", result.Err)
}
close(ch2)
}(results)
// Wait for the statement to start executing.
@ -248,7 +279,7 @@ func TestQueryExecutor_Close(t *testing.T) {
select {
case <-ch2:
case <-time.After(100 * time.Millisecond):
t.Error("closing the query manager did not kill the query after 100 milliseconds")
t.Fatal("closing the query manager did not kill the query after 100 milliseconds")
}
results = e.ExecuteQuery(q, influxql.ExecutionOptions{}, nil)

View File

@ -377,6 +377,13 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
// Parse whether this is an async command.
async := r.FormValue("async") == "true"
opts := influxql.ExecutionOptions{
Database: db,
ChunkSize: chunkSize,
ReadOnly: r.Method == "GET",
NodeID: nodeID,
}
// Make sure if the client disconnects we signal the query to abort
var closing chan struct{}
if !async {
@ -398,6 +405,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
close(closing)
}
}()
opts.AbortCh = done
} else {
defer close(closing)
}
@ -405,12 +413,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
// Execute query.
rw.Header().Add("Connection", "close")
results := h.QueryExecutor.ExecuteQuery(query, influxql.ExecutionOptions{
Database: db,
ChunkSize: chunkSize,
ReadOnly: r.Method == "GET",
NodeID: nodeID,
}, closing)
results := h.QueryExecutor.ExecuteQuery(query, opts, closing)
// If we are running in async mode, open a goroutine to drain the results
// and return with a StatusNoContent.