Implement a simple task manager for queries

The currently running queries can be listed with the command
`SHOW QUERIES` and it will display the current commands that have been
run, the database they were run against, and how long they have been
running.
pull/5950/head
Jonathan A. Sternberg 2016-03-08 13:54:32 -05:00
parent d61d75f55d
commit 117f62c33e
7 changed files with 221 additions and 3 deletions

View File

@ -36,6 +36,9 @@ type QueryExecutor struct {
// Used for rewriting points back into system for SELECT INTO statements.
PointsWriter *PointsWriter
// Used for managing and tracking running queries.
QueryManager influxql.QueryManager
// Remote execution timeout
Timeout time.Duration
@ -69,7 +72,7 @@ func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
return results
}
func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chunkSize int, closing chan struct{}, results chan *influxql.Result) {
func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chunkSize int, closing <-chan struct{}, results chan *influxql.Result) {
defer close(results)
e.statMap.Add(statQueriesActive, 1)
@ -78,6 +81,19 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
e.statMap.Add(statQueryExecutionDuration, time.Since(start).Nanoseconds())
}(time.Now())
if e.QueryManager != nil {
var err error
_, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{
Query: query,
Database: database,
InterruptCh: closing,
})
if err != nil {
results <- &influxql.Result{Err: err}
return
}
}
logger := e.logger()
var i int
@ -167,6 +183,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
rows, err = e.executeShowDiagnosticsStatement(stmt)
case *influxql.ShowGrantsForUserStatement:
rows, err = e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowQueriesStatement:
rows, err = e.executeShowQueriesStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement:
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
case *influxql.ShowServersStatement:
@ -197,7 +215,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
Err: err,
}
// Stop of the first error.
// Stop after the first error.
if err != nil {
break
}
@ -597,6 +615,10 @@ func (e *QueryExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrants
return []*models.Row{row}, nil
}
func (e *QueryExecutor) executeShowQueriesStatement(q *influxql.ShowQueriesStatement) (models.Rows, error) {
return influxql.ExecuteShowQueriesStatement(e.QueryManager, q)
}
func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
di, err := e.MetaClient.Database(q.Database)
if err != nil {

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/copier"
@ -166,6 +167,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.QueryExecutor.TSDBStore = s.TSDBStore
s.QueryExecutor.Monitor = s.Monitor
s.QueryExecutor.PointsWriter = s.PointsWriter
s.QueryExecutor.QueryManager = influxql.DefaultQueryManager()
if c.Data.QueryLogEnabled {
s.QueryExecutor.LogOutput = os.Stderr
}

View File

@ -119,6 +119,7 @@ func (*ShowDatabasesStatement) node() {}
func (*ShowFieldKeysStatement) node() {}
func (*ShowRetentionPoliciesStatement) node() {}
func (*ShowMeasurementsStatement) node() {}
func (*ShowQueriesStatement) node() {}
func (*ShowSeriesStatement) node() {}
func (*ShowShardGroupsStatement) node() {}
func (*ShowShardsStatement) node() {}
@ -226,6 +227,7 @@ func (*ShowServersStatement) stmt() {}
func (*ShowDatabasesStatement) stmt() {}
func (*ShowFieldKeysStatement) stmt() {}
func (*ShowMeasurementsStatement) stmt() {}
func (*ShowQueriesStatement) stmt() {}
func (*ShowRetentionPoliciesStatement) stmt() {}
func (*ShowSeriesStatement) stmt() {}
func (*ShowShardGroupsStatement) stmt() {}
@ -2401,6 +2403,19 @@ func (s *DropMeasurementStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}
// SowQueriesStatement represents a command for listing all running queries.
type ShowQueriesStatement struct{}
// String returns a string representation of the show queries statement.
func (s *ShowQueriesStatement) String() string {
return "SHOW QUERIES"
}
// RequiredPrivileges returns the privilege required to execute a ShowQueriesStatement.
func (s *ShowQueriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: false, Name: "", Privilege: ReadPrivilege}}
}
// ShowRetentionPoliciesStatement represents a command for listing retention policies.
type ShowRetentionPoliciesStatement struct {
// Name of the database to list policies for.

View File

@ -131,6 +131,8 @@ func (p *Parser) parseShowStatement() (Statement, error) {
return nil, newParseError(tokstr(tok, lit), []string{"KEYS"}, pos)
case MEASUREMENTS:
return p.parseShowMeasurementsStatement()
case QUERIES:
return p.parseShowQueriesStatement()
case RETENTION:
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == POLICIES {
@ -171,6 +173,7 @@ func (p *Parser) parseShowStatement() (Statement, error) {
"FIELD",
"GRANTS",
"MEASUREMENTS",
"QUERIES",
"RETENTION",
"SERIES",
"SERVERS",
@ -1071,6 +1074,12 @@ func (p *Parser) parseShowMeasurementsStatement() (*ShowMeasurementsStatement, e
return stmt, nil
}
// parseShowQueriesStatement parses a string and returns a ShowQueriesStatement.
// This function assumes the "SHOW QUERIES" tokens have been consumed.
func (p *Parser) parseShowQueriesStatement() (*ShowQueriesStatement, error) {
return &ShowQueriesStatement{}, nil
}
// parseShowRetentionPoliciesStatement parses a string and returns a ShowRetentionPoliciesStatement.
// This function assumes the "SHOW RETENTION POLICIES" tokens have been consumed.
func (p *Parser) parseShowRetentionPoliciesStatement() (*ShowRetentionPoliciesStatement, error) {

View File

@ -749,6 +749,12 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// SHOW QUERIES
{
s: `SHOW QUERIES`,
stmt: &influxql.ShowQueriesStatement{},
},
// SHOW RETENTION POLICIES
{
s: `SHOW RETENTION POLICIES ON mydb`,
@ -1742,7 +1748,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SHOW RETENTION POLICIES mydb`, err: `found mydb, expected ON at line 1, char 25`},
{s: `SHOW RETENTION POLICIES ON`, err: `found EOF, expected identifier at line 1, char 28`},
{s: `SHOW SHARD`, err: `found EOF, expected GROUPS at line 1, char 12`},
{s: `SHOW FOO`, err: `found FOO, expected CONTINUOUS, DATABASES, DIAGNOSTICS, FIELD, GRANTS, MEASUREMENTS, RETENTION, SERIES, SERVERS, SHARD, SHARDS, STATS, SUBSCRIPTIONS, TAG, USERS at line 1, char 6`},
{s: `SHOW FOO`, err: `found FOO, expected CONTINUOUS, DATABASES, DIAGNOSTICS, FIELD, GRANTS, MEASUREMENTS, QUERIES, RETENTION, SERIES, SERVERS, SHARD, SHARDS, STATS, SUBSCRIPTIONS, TAG, USERS at line 1, char 6`},
{s: `SHOW STATS FOR`, err: `found EOF, expected string at line 1, char 16`},
{s: `SHOW DIAGNOSTICS FOR`, err: `found EOF, expected string at line 1, char 22`},
{s: `SHOW GRANTS`, err: `found EOF, expected FOR at line 1, char 13`},

162
influxql/query_manager.go Normal file
View File

@ -0,0 +1,162 @@
package influxql
import (
"errors"
"fmt"
"sync"
"time"
"github.com/influxdata/influxdb/models"
)
var (
ErrNoQueryManager = errors.New("no query manager available")
)
// QueryTaskInfo holds information about a currently running query.
type QueryTaskInfo struct {
ID uint64
Query string
Database string
Duration time.Duration
}
// QueryParams holds the parameters used for creating a new query.
type QueryParams struct {
// The query to be tracked. Required.
Query *Query
// The database this query is being run in. Required.
Database string
// The channel to watch for when this query is interrupted or finished.
// Not required, but highly recommended. If this channel is not set, the
// query needs to be manually managed by the caller.
InterruptCh <-chan struct{}
}
type QueryManager interface {
// AttachQuery attaches a running query to be managed by the query manager.
// Returns the query id of the newly attached query or an error if it was
// unable to assign a query id or attach the query to the query manager.
// This function also returns a channel that will be closed when this
// query finishes running.
//
// After a query finishes running, the system is free to reuse a query id.
AttachQuery(params *QueryParams) (qid uint64, closing <-chan struct{}, err error)
// KillQuery stops and removes a query from the query manager.
// This method can be used to forcefully terminate a running query.
KillQuery(qid uint64) error
// Queries lists the currently running tasks.
Queries() []QueryTaskInfo
}
func DefaultQueryManager() QueryManager {
return &defaultQueryManager{
queries: make(map[uint64]*queryTask),
nextID: 1,
}
}
func ExecuteShowQueriesStatement(qm QueryManager, q *ShowQueriesStatement) (models.Rows, error) {
if qm == nil {
return nil, ErrNoQueryManager
}
queries := qm.Queries()
values := make([][]interface{}, len(queries))
for i, qi := range queries {
var d string
if qi.Duration == 0 {
d = "0s"
} else if qi.Duration < time.Second {
d = fmt.Sprintf("%du", qi.Duration)
} else {
d = (qi.Duration - (qi.Duration % time.Second)).String()
}
values[i] = []interface{}{
qi.ID,
qi.Query,
qi.Database,
d,
}
}
return []*models.Row{{
Columns: []string{"qid", "query", "database", "duration"},
Values: values,
}}, nil
}
type queryTask struct {
query string
database string
startTime time.Time
closing chan struct{}
once sync.Once
}
type defaultQueryManager struct {
queries map[uint64]*queryTask
nextID uint64
mu sync.Mutex
}
func (qm *defaultQueryManager) AttachQuery(params *QueryParams) (uint64, <-chan struct{}, error) {
qm.mu.Lock()
defer qm.mu.Unlock()
qid := qm.nextID
query := &queryTask{
query: params.Query.String(),
database: params.Database,
startTime: time.Now(),
closing: make(chan struct{}),
}
qm.queries[qid] = query
if params.InterruptCh != nil {
go qm.waitForQuery(qid, params.InterruptCh)
}
qm.nextID++
return qid, query.closing, nil
}
func (qm *defaultQueryManager) waitForQuery(qid uint64, closing <-chan struct{}) {
<-closing
qm.KillQuery(qid)
}
func (qm *defaultQueryManager) KillQuery(qid uint64) error {
qm.mu.Lock()
defer qm.mu.Unlock()
query, ok := qm.queries[qid]
if !ok {
return fmt.Errorf("no such query id: %d", qid)
}
close(query.closing)
delete(qm.queries, qid)
return nil
}
func (qm *defaultQueryManager) Queries() []QueryTaskInfo {
qm.mu.Lock()
defer qm.mu.Unlock()
now := time.Now()
queries := make([]QueryTaskInfo, 0, len(qm.queries))
for qid, task := range qm.queries {
queries = append(queries, QueryTaskInfo{
ID: qid,
Query: task.query,
Database: task.database,
Duration: now.Sub(task.startTime),
})
}
return queries
}

View File

@ -295,6 +295,8 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
<-notify
close(closing)
}()
} else {
defer close(closing)
}
// Execute query.