Merge pull request #8854 from influxdata/js-report-task-killed-status

Report the task status for a query
pull/8853/head^2
Jonathan A. Sternberg 2017-09-19 15:22:47 -05:00 committed by GitHub
commit 6b1c804815
4 changed files with 115 additions and 6 deletions

View File

@ -27,6 +27,7 @@
- [#8791](https://github.com/influxdata/influxdb/pull/8791): Include the number of scanned cached values in the iterator cost.
- [#8784](https://github.com/influxdata/influxdb/pull/8784): Add support for the Prometheus remote read and write APIs.
- [#8851](https://github.com/influxdata/influxdb/pull/8851): Improve performance of `Include` and `Exclude` functions
- [#8854](https://github.com/influxdata/influxdb/pull/8854): Report the task status for a query.
### Bugfixes

View File

@ -273,7 +273,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions
}
return
}
defer e.TaskManager.KillQuery(qid)
defer e.TaskManager.DetachQuery(qid)
// Setup the execution context that will be used when executing statements.
ctx := ExecutionContext{
@ -441,6 +441,7 @@ type QueryMonitorFunc func(<-chan struct{}) error
type QueryTask struct {
query string
database string
status TaskStatus
startTime time.Time
closing chan struct{}
monitorCh chan error

View File

@ -84,6 +84,70 @@ func TestQueryExecutor_KillQuery(t *testing.T) {
}
}
func TestQueryExecutor_KillQuery_Zombie(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qid := make(chan uint64)
done := make(chan struct{})
e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
switch stmt.(type) {
case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx)
}
qid <- ctx.QueryID
select {
case <-ctx.InterruptCh:
select {
case <-done:
// Keep the query running until we run SHOW QUERIES.
case <-time.After(100 * time.Millisecond):
// Ensure that we don't have a lingering goroutine.
}
return query.ErrQueryInterrupted
case <-time.After(100 * time.Millisecond):
t.Error("killing the query did not close the channel after 100 milliseconds")
return errUnexpected
}
},
}
results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
q, err = influxql.ParseQuery(fmt.Sprintf("KILL QUERY %d", <-qid))
if err != nil {
t.Fatal(err)
}
discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
// Display the queries and ensure that the original is still in there.
q, err = influxql.ParseQuery("SHOW QUERIES")
if err != nil {
t.Fatal(err)
}
tasks := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
// The killed query should still be there.
task := <-tasks
if len(task.Series) != 1 {
t.Errorf("expected %d series, got %d", 1, len(task.Series))
} else if len(task.Series[0].Values) != 2 {
t.Errorf("expected %d rows, got %d", 2, len(task.Series[0].Values))
}
close(done)
// The original query should return.
result := <-results
if result.Err != query.ErrQueryInterrupted {
t.Errorf("unexpected error: %s", result.Err)
}
}
func TestQueryExecutor_Interrupt(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
@ -165,7 +229,9 @@ func TestQueryExecutor_ShowQueries(t *testing.T) {
results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
result := <-results
if len(result.Series) != 1 {
t.Errorf("expected %d rows, got %d", 1, len(result.Series))
t.Errorf("expected %d series, got %d", 1, len(result.Series))
} else if len(result.Series[0].Values) != 1 {
t.Errorf("expected %d row, got %d", 1, len(result.Series[0].Values))
}
if result.Err != nil {
t.Errorf("unexpected error: %s", result.Err)

View File

@ -16,6 +16,27 @@ const (
DefaultQueryTimeout = time.Duration(0)
)
type TaskStatus int
const (
// RunningTask is set when the task is running.
RunningTask TaskStatus = iota
// KilledTask is set when the task is killed, but resources are still
// being used.
KilledTask
)
func (t TaskStatus) String() string {
switch t {
case RunningTask:
return "running"
case KilledTask:
return "killed"
}
panic(fmt.Sprintf("unknown task status: %d", int(t)))
}
// TaskManager takes care of all aspects related to managing running queries.
type TaskManager struct {
// Query execution timeout.
@ -104,11 +125,11 @@ func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStateme
d = d - (d % time.Microsecond)
}
values = append(values, []interface{}{id, qi.query, qi.database, d.String()})
values = append(values, []interface{}{id, qi.query, qi.database, d.String(), qi.status.String()})
}
return []*models.Row{{
Columns: []string{"qid", "query", "database", "duration"},
Columns: []string{"qid", "query", "database", "duration", "status"},
Values: values,
}}, nil
}
@ -145,6 +166,7 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt
query := &QueryTask{
query: q.String(),
database: database,
status: RunningTask,
startTime: time.Now(),
closing: make(chan struct{}),
monitorCh: make(chan error),
@ -170,8 +192,9 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt
return qid, query, nil
}
// KillQuery stops and removes a query from the TaskManager.
// This method can be used to forcefully terminate a running query.
// KillQuery enters a query into the killed state and closes the channel
// from the TaskManager. This method can be used to forcefully terminate a
// running query.
func (t *TaskManager) KillQuery(qid uint64) error {
t.mu.Lock()
defer t.mu.Unlock()
@ -182,6 +205,24 @@ func (t *TaskManager) KillQuery(qid uint64) error {
}
close(query.closing)
query.status = KilledTask
return nil
}
// DetachQuery removes a query from the query table. If the query is not in the
// killed state, this will also close the related channel.
func (t *TaskManager) DetachQuery(qid uint64) error {
t.mu.Lock()
defer t.mu.Unlock()
query := t.queries[qid]
if query == nil {
return fmt.Errorf("no such query id: %d", qid)
}
if query.status != KilledTask {
close(query.closing)
}
delete(t.queries, qid)
return nil
}