Merge pull request #9020 from influxdata/js-9018-panic-on-task-manager-close

Do not panic when the task manager closes if a killed query is still running
pull/8893/head
Jonathan A. Sternberg 2017-10-26 12:10:57 -05:00 committed by GitHub
commit 11c62eb9ab
3 changed files with 130 additions and 10 deletions

View File

@ -36,6 +36,9 @@ var (
// ErrQueryTimeoutLimitExceeded is an error when a query hits the max time allowed to run.
ErrQueryTimeoutLimitExceeded = errors.New("query-timeout limit exceeded")
// ErrAlreadyKilled is returned when attempting to kill a query that has already been killed.
ErrAlreadyKilled = errors.New("already killed")
)
// Statistics for the QueryExecutor
@ -500,3 +503,24 @@ func (q *QueryTask) monitor(fn QueryMonitorFunc) {
}
}
}
// close closes the query task closing channel if the query hasn't been previously killed.
func (q *QueryTask) close() {
q.mu.Lock()
if q.status != KilledTask {
close(q.closing)
}
q.mu.Unlock()
}
func (q *QueryTask) kill() error {
q.mu.Lock()
if q.status == KilledTask {
q.mu.Unlock()
return ErrAlreadyKilled
}
q.status = KilledTask
close(q.closing)
q.mu.Unlock()
return nil
}

View File

@ -148,6 +148,107 @@ func TestQueryExecutor_KillQuery_Zombie(t *testing.T) {
}
}
func TestQueryExecutor_KillQuery_CloseTaskManager(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qid := make(chan uint64)
// Open a channel to stall the statement executor forever. This keeps the statement executor
// running even after we kill the query which can happen with some queries. We only close it once
// the test has finished running.
done := make(chan struct{})
defer close(done)
e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
switch stmt.(type) {
case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx)
}
qid <- ctx.QueryID
<-done
return nil
},
}
// Kill the query. This should switch it into a zombie state.
go discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
q, err = influxql.ParseQuery(fmt.Sprintf("KILL QUERY %d", <-qid))
if err != nil {
t.Fatal(err)
}
discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
// Display the queries and ensure that the original is still in there.
q, err = influxql.ParseQuery("SHOW QUERIES")
if err != nil {
t.Fatal(err)
}
tasks := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
// The killed query should still be there.
task := <-tasks
if len(task.Series) != 1 {
t.Errorf("expected %d series, got %d", 1, len(task.Series))
} else if len(task.Series[0].Values) != 2 {
t.Errorf("expected %d rows, got %d", 2, len(task.Series[0].Values))
}
// Close the task manager to ensure it doesn't cause a panic.
if err := e.TaskManager.Close(); err != nil {
t.Errorf("unexpected error: %s", err)
}
}
func TestQueryExecutor_KillQuery_AlreadyKilled(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qid := make(chan uint64)
// Open a channel to stall the statement executor forever. This keeps the statement executor
// running even after we kill the query which can happen with some queries. We only close it once
// the test has finished running.
done := make(chan struct{})
defer close(done)
e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
switch stmt.(type) {
case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx)
}
qid <- ctx.QueryID
<-done
return nil
},
}
// Kill the query. This should switch it into a zombie state.
go discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
q, err = influxql.ParseQuery(fmt.Sprintf("KILL QUERY %d", <-qid))
if err != nil {
t.Fatal(err)
}
discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
// Now attempt to kill it again. We should get an error.
results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
result := <-results
if got, want := result.Err, query.ErrAlreadyKilled; got != want {
t.Errorf("unexpected error: got=%v want=%v", got, want)
}
}
func TestQueryExecutor_Interrupt(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {

View File

@ -197,16 +197,13 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt
// running query.
func (t *TaskManager) KillQuery(qid uint64) error {
t.mu.Lock()
defer t.mu.Unlock()
query := t.queries[qid]
t.mu.Unlock()
if query == nil {
return fmt.Errorf("no such query id: %d", qid)
}
close(query.closing)
query.status = KilledTask
return nil
return query.kill()
}
// DetachQuery removes a query from the query table. If the query is not in the
@ -220,9 +217,7 @@ func (t *TaskManager) DetachQuery(qid uint64) error {
return fmt.Errorf("no such query id: %d", qid)
}
if query.status != KilledTask {
close(query.closing)
}
query.close()
delete(t.queries, qid)
return nil
}
@ -287,7 +282,7 @@ func (t *TaskManager) Close() error {
t.shutdown = true
for _, query := range t.queries {
query.setError(ErrQueryEngineShutdown)
close(query.closing)
query.close()
}
t.queries = nil
return nil