Merge pull request #5950 from influxdata/js-5939-query-manager

Implement a query manager for running queries
pull/6085/head
Jonathan A. Sternberg 2016-03-21 16:02:26 -04:00
commit b12cf04a73
17 changed files with 627 additions and 12 deletions

View File

@ -5,6 +5,7 @@
- [#6012](https://github.com/influxdata/influxdb/pull/6012): Add DROP SHARD support.
- [#6025](https://github.com/influxdata/influxdb/pull/6025): Remove deprecated JSON write path.
- [#5744](https://github.com/influxdata/influxdb/issues/5744): Add integer literal support to the query language.
- [#5939](https://github.com/influxdata/influxdb/issues/5939): Support viewing and killing running queries.
### Bugfixes

View File

@ -36,6 +36,9 @@ type QueryExecutor struct {
// Used for rewriting points back into system for SELECT INTO statements.
PointsWriter *PointsWriter
// Used for managing and tracking running queries.
QueryManager influxql.QueryManager
// Remote execution timeout
Timeout time.Duration
@ -69,7 +72,7 @@ func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
return results
}
func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chunkSize int, closing chan struct{}, results chan *influxql.Result) {
func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chunkSize int, closing <-chan struct{}, results chan *influxql.Result) {
defer close(results)
e.statMap.Add(statQueriesActive, 1)
@ -78,6 +81,19 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
e.statMap.Add(statQueryExecutionDuration, time.Since(start).Nanoseconds())
}(time.Now())
if e.QueryManager != nil {
var err error
_, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{
Query: query,
Database: database,
InterruptCh: closing,
})
if err != nil {
results <- &influxql.Result{Err: err}
return
}
}
logger := e.logger()
var i int
@ -155,6 +171,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
err = e.executeGrantStatement(stmt)
case *influxql.GrantAdminStatement:
err = e.executeGrantAdminStatement(stmt)
case *influxql.KillQueryStatement:
err = e.executeKillQueryStatement(stmt)
case *influxql.RevokeStatement:
err = e.executeRevokeStatement(stmt)
case *influxql.RevokeAdminStatement:
@ -167,6 +185,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
rows, err = e.executeShowDiagnosticsStatement(stmt)
case *influxql.ShowGrantsForUserStatement:
rows, err = e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowQueriesStatement:
rows, err = e.executeShowQueriesStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement:
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
case *influxql.ShowServersStatement:
@ -197,7 +217,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
Err: err,
}
// Stop of the first error.
// Stop after the first error.
if err != nil {
break
}
@ -357,6 +377,13 @@ func (e *QueryExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStat
return e.MetaClient.SetAdminPrivilege(stmt.User, true)
}
func (e *QueryExecutor) executeKillQueryStatement(stmt *influxql.KillQueryStatement) error {
if e.QueryManager == nil {
return influxql.ErrNoQueryManager
}
return e.QueryManager.KillQuery(stmt.QueryID)
}
func (e *QueryExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) error {
priv := influxql.NoPrivileges
@ -384,7 +411,7 @@ func (e *QueryExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordU
func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, chunkSize, statementID int, results chan *influxql.Result, closing <-chan struct{}) error {
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
opt := influxql.SelectOptions{}
opt := influxql.SelectOptions{InterruptCh: closing}
// Replace instances of "now()" with the current time, and check the resultant times.
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: now})
@ -597,6 +624,10 @@ func (e *QueryExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrants
return []*models.Row{row}, nil
}
func (e *QueryExecutor) executeShowQueriesStatement(q *influxql.ShowQueriesStatement) (models.Rows, error) {
return influxql.ExecuteShowQueriesStatement(e.QueryManager, q)
}
func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
di, err := e.MetaClient.Database(q.Database)
if err != nil {

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/copier"
@ -166,6 +167,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.QueryExecutor.TSDBStore = s.TSDBStore
s.QueryExecutor.Monitor = s.Monitor
s.QueryExecutor.PointsWriter = s.PointsWriter
s.QueryExecutor.QueryManager = influxql.DefaultQueryManager()
if c.Data.QueryLogEnabled {
s.QueryExecutor.LogOutput = os.Stderr
}

View File

@ -108,6 +108,7 @@ func (*DropSubscriptionStatement) node() {}
func (*DropUserStatement) node() {}
func (*GrantStatement) node() {}
func (*GrantAdminStatement) node() {}
func (*KillQueryStatement) node() {}
func (*RevokeStatement) node() {}
func (*RevokeAdminStatement) node() {}
func (*SelectStatement) node() {}
@ -119,6 +120,7 @@ func (*ShowDatabasesStatement) node() {}
func (*ShowFieldKeysStatement) node() {}
func (*ShowRetentionPoliciesStatement) node() {}
func (*ShowMeasurementsStatement) node() {}
func (*ShowQueriesStatement) node() {}
func (*ShowSeriesStatement) node() {}
func (*ShowShardGroupsStatement) node() {}
func (*ShowShardsStatement) node() {}
@ -220,12 +222,14 @@ func (*DropSubscriptionStatement) stmt() {}
func (*DropUserStatement) stmt() {}
func (*GrantStatement) stmt() {}
func (*GrantAdminStatement) stmt() {}
func (*KillQueryStatement) stmt() {}
func (*ShowContinuousQueriesStatement) stmt() {}
func (*ShowGrantsForUserStatement) stmt() {}
func (*ShowServersStatement) stmt() {}
func (*ShowDatabasesStatement) stmt() {}
func (*ShowFieldKeysStatement) stmt() {}
func (*ShowMeasurementsStatement) stmt() {}
func (*ShowQueriesStatement) stmt() {}
func (*ShowRetentionPoliciesStatement) stmt() {}
func (*ShowSeriesStatement) stmt() {}
func (*ShowShardGroupsStatement) stmt() {}
@ -646,6 +650,19 @@ func (s *GrantAdminStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}
type KillQueryStatement struct {
// The query to kill.
QueryID uint64
}
func (s *KillQueryStatement) String() string {
return fmt.Sprintf("KILL QUERY %d", s.QueryID)
}
func (s *KillQueryStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}
// SetPasswordUserStatement represents a command for changing user password.
type SetPasswordUserStatement struct {
// Plain Password
@ -2401,6 +2418,19 @@ func (s *DropMeasurementStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}
// SowQueriesStatement represents a command for listing all running queries.
type ShowQueriesStatement struct{}
// String returns a string representation of the show queries statement.
func (s *ShowQueriesStatement) String() string {
return "SHOW QUERIES"
}
// RequiredPrivileges returns the privilege required to execute a ShowQueriesStatement.
func (s *ShowQueriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: false, Name: "", Privilege: ReadPrivilege}}
}
// ShowRetentionPoliciesStatement represents a command for listing retention policies.
type ShowRetentionPoliciesStatement struct {
// Name of the database to list policies for.

View File

@ -579,8 +579,9 @@ func BenchmarkCallIterator_Min_Float(b *testing.B) {
b.ReportAllocs()
itr, err := influxql.NewCallIterator(input, influxql.IteratorOptions{
Expr: MustParseExpr("min(value)"),
Interval: influxql.Interval{Duration: 1 * time.Hour},
Expr: MustParseExpr("min(value)"),
Interval: influxql.Interval{Duration: 1 * time.Hour},
InterruptCh: make(chan struct{}),
})
if err != nil {
b.Fatal(err)

View File

@ -570,6 +570,39 @@ func (itr *floatIntervalIterator) Next() *FloatPoint {
return p
}
// floatInterruptIterator represents a float implementation of InterruptIterator.
type floatInterruptIterator struct {
input FloatIterator
closing <-chan struct{}
count int
}
func newFloatInterruptIterator(input FloatIterator, closing <-chan struct{}) *floatInterruptIterator {
return &floatInterruptIterator{input: input, closing: closing}
}
func (itr *floatInterruptIterator) Close() error { return itr.input.Close() }
func (itr *floatInterruptIterator) Next() *FloatPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}
// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}
// floatAuxIterator represents a float implementation of AuxIterator.
type floatAuxIterator struct {
input *bufFloatIterator
@ -1894,6 +1927,39 @@ func (itr *integerIntervalIterator) Next() *IntegerPoint {
return p
}
// integerInterruptIterator represents a integer implementation of InterruptIterator.
type integerInterruptIterator struct {
input IntegerIterator
closing <-chan struct{}
count int
}
func newIntegerInterruptIterator(input IntegerIterator, closing <-chan struct{}) *integerInterruptIterator {
return &integerInterruptIterator{input: input, closing: closing}
}
func (itr *integerInterruptIterator) Close() error { return itr.input.Close() }
func (itr *integerInterruptIterator) Next() *IntegerPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}
// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}
// integerAuxIterator represents a integer implementation of AuxIterator.
type integerAuxIterator struct {
input *bufIntegerIterator
@ -3215,6 +3281,39 @@ func (itr *stringIntervalIterator) Next() *StringPoint {
return p
}
// stringInterruptIterator represents a string implementation of InterruptIterator.
type stringInterruptIterator struct {
input StringIterator
closing <-chan struct{}
count int
}
func newStringInterruptIterator(input StringIterator, closing <-chan struct{}) *stringInterruptIterator {
return &stringInterruptIterator{input: input, closing: closing}
}
func (itr *stringInterruptIterator) Close() error { return itr.input.Close() }
func (itr *stringInterruptIterator) Next() *StringPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}
// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}
// stringAuxIterator represents a string implementation of AuxIterator.
type stringAuxIterator struct {
input *bufStringIterator
@ -4536,6 +4635,39 @@ func (itr *booleanIntervalIterator) Next() *BooleanPoint {
return p
}
// booleanInterruptIterator represents a boolean implementation of InterruptIterator.
type booleanInterruptIterator struct {
input BooleanIterator
closing <-chan struct{}
count int
}
func newBooleanInterruptIterator(input BooleanIterator, closing <-chan struct{}) *booleanInterruptIterator {
return &booleanInterruptIterator{input: input, closing: closing}
}
func (itr *booleanInterruptIterator) Close() error { return itr.input.Close() }
func (itr *booleanInterruptIterator) Next() *BooleanPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}
// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}
// booleanAuxIterator represents a boolean implementation of AuxIterator.
type booleanAuxIterator struct {
input *bufBooleanIterator

View File

@ -569,6 +569,39 @@ func (itr *{{$k.name}}IntervalIterator) Next() *{{$k.Name}}Point {
return p
}
// {{$k.name}}InterruptIterator represents a {{$k.name}} implementation of InterruptIterator.
type {{$k.name}}InterruptIterator struct {
input {{$k.Name}}Iterator
closing <-chan struct{}
count int
}
func new{{$k.Name}}InterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}InterruptIterator {
return &{{$k.name}}InterruptIterator{input: input, closing: closing}
}
func (itr *{{$k.name}}InterruptIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}InterruptIterator) Next() *{{$k.Name}}Point {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}
// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}
// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator.
type {{$k.name}}AuxIterator struct {
input *buf{{$k.Name}}Iterator

View File

@ -222,6 +222,23 @@ func NewIntervalIterator(input Iterator, opt IteratorOptions) Iterator {
}
}
// NewInterruptIterator returns an iterator that will stop producing output when a channel
// has been closed on the passed in channel.
func NewInterruptIterator(input Iterator, closing <-chan struct{}) Iterator {
switch input := input.(type) {
case FloatIterator:
return newFloatInterruptIterator(input, closing)
case IntegerIterator:
return newIntegerInterruptIterator(input, closing)
case StringIterator:
return newStringInterruptIterator(input, closing)
case BooleanIterator:
return newBooleanInterruptIterator(input, closing)
default:
panic(fmt.Sprintf("unsupported fill iterator type: %T", input))
}
}
// AuxIterator represents an iterator that can split off separate auxilary iterators.
type AuxIterator interface {
Iterator
@ -466,7 +483,11 @@ func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error)
// Merge into a single iterator.
if opt.MergeSorted() {
return NewSortedMergeIterator(itrs, opt), nil
itr := NewSortedMergeIterator(itrs, opt)
if opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
return itr, nil
}
itr := NewMergeIterator(itrs, opt)
@ -478,6 +499,10 @@ func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error)
}
}
}
if opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
return NewCallIterator(itr, opt)
}
@ -604,6 +629,10 @@ type IteratorOptions struct {
// Removes duplicate rows from raw queries.
Dedupe bool
// If this channel is set and is closed, the iterator should try to exit
// and close as soon as possible.
InterruptCh <-chan struct{}
}
// newIteratorOptionsStmt creates the iterator options from stmt.
@ -655,6 +684,9 @@ func newIteratorOptionsStmt(stmt *SelectStatement, sopt *SelectOptions) (opt Ite
opt.Fill, opt.FillValue = stmt.Fill, stmt.FillValue
opt.Limit, opt.Offset = stmt.Limit, stmt.Offset
opt.SLimit, opt.SOffset = stmt.SLimit, stmt.SOffset
if sopt != nil {
opt.InterruptCh = sopt.InterruptCh
}
return opt, nil
}

View File

@ -105,8 +105,10 @@ func (p *Parser) ParseStatement() (Statement, error) {
return p.parseAlterStatement()
case SET:
return p.parseSetPasswordUserStatement()
case KILL:
return p.parseKillQueryStatement()
default:
return nil, newParseError(tokstr(tok, lit), []string{"SELECT", "DELETE", "SHOW", "CREATE", "DROP", "GRANT", "REVOKE", "ALTER", "SET"}, pos)
return nil, newParseError(tokstr(tok, lit), []string{"SELECT", "DELETE", "SHOW", "CREATE", "DROP", "GRANT", "REVOKE", "ALTER", "SET", "KILL"}, pos)
}
}
@ -131,6 +133,8 @@ func (p *Parser) parseShowStatement() (Statement, error) {
return nil, newParseError(tokstr(tok, lit), []string{"KEYS"}, pos)
case MEASUREMENTS:
return p.parseShowMeasurementsStatement()
case QUERIES:
return p.parseShowQueriesStatement()
case RETENTION:
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == POLICIES {
@ -171,6 +175,7 @@ func (p *Parser) parseShowStatement() (Statement, error) {
"FIELD",
"GRANTS",
"MEASUREMENTS",
"QUERIES",
"RETENTION",
"SERIES",
"SERVERS",
@ -287,6 +292,20 @@ func (p *Parser) parseSetPasswordUserStatement() (*SetPasswordUserStatement, err
return stmt, nil
}
// parseKillQueryStatement parses a string and returns a kill statement.
// This function assumes the KILL token has already been consumed.
func (p *Parser) parseKillQueryStatement() (*KillQueryStatement, error) {
if err := p.parseTokens([]Token{QUERY}); err != nil {
return nil, err
}
qid, err := p.parseUInt64()
if err != nil {
return nil, err
}
return &KillQueryStatement{QueryID: qid}, nil
}
// parseCreateSubscriptionStatement parses a string and returns a CreatesubScriptionStatement.
// This function assumes the "CREATE SUBSCRIPTION" tokens have already been consumed.
func (p *Parser) parseCreateSubscriptionStatement() (*CreateSubscriptionStatement, error) {
@ -1071,6 +1090,12 @@ func (p *Parser) parseShowMeasurementsStatement() (*ShowMeasurementsStatement, e
return stmt, nil
}
// parseShowQueriesStatement parses a string and returns a ShowQueriesStatement.
// This function assumes the "SHOW QUERIES" tokens have been consumed.
func (p *Parser) parseShowQueriesStatement() (*ShowQueriesStatement, error) {
return &ShowQueriesStatement{}, nil
}
// parseShowRetentionPoliciesStatement parses a string and returns a ShowRetentionPoliciesStatement.
// This function assumes the "SHOW RETENTION POLICIES" tokens have been consumed.
func (p *Parser) parseShowRetentionPoliciesStatement() (*ShowRetentionPoliciesStatement, error) {

View File

@ -749,6 +749,20 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// SHOW QUERIES
{
s: `SHOW QUERIES`,
stmt: &influxql.ShowQueriesStatement{},
},
// KILL QUERY 4
{
s: `KILL QUERY 4`,
stmt: &influxql.KillQueryStatement{
QueryID: 4,
},
},
// SHOW RETENTION POLICIES
{
s: `SHOW RETENTION POLICIES ON mydb`,
@ -1633,10 +1647,10 @@ func TestParser_ParseStatement(t *testing.T) {
},
// Errors
{s: ``, err: `found EOF, expected SELECT, DELETE, SHOW, CREATE, DROP, GRANT, REVOKE, ALTER, SET at line 1, char 1`},
{s: ``, err: `found EOF, expected SELECT, DELETE, SHOW, CREATE, DROP, GRANT, REVOKE, ALTER, SET, KILL at line 1, char 1`},
{s: `SELECT`, err: `found EOF, expected identifier, string, number, bool at line 1, char 8`},
{s: `SELECT time FROM myseries`, err: `at least 1 non-time field must be queried`},
{s: `blah blah`, err: `found blah, expected SELECT, DELETE, SHOW, CREATE, DROP, GRANT, REVOKE, ALTER, SET at line 1, char 1`},
{s: `blah blah`, err: `found blah, expected SELECT, DELETE, SHOW, CREATE, DROP, GRANT, REVOKE, ALTER, SET, KILL at line 1, char 1`},
{s: `SELECT field1 X`, err: `found X, expected FROM at line 1, char 15`},
{s: `SELECT field1 FROM "series" WHERE X +;`, err: `found ;, expected identifier, string, number, bool at line 1, char 38`},
{s: `SELECT field1 FROM myseries GROUP`, err: `found EOF, expected BY at line 1, char 35`},
@ -1742,7 +1756,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SHOW RETENTION POLICIES mydb`, err: `found mydb, expected ON at line 1, char 25`},
{s: `SHOW RETENTION POLICIES ON`, err: `found EOF, expected identifier at line 1, char 28`},
{s: `SHOW SHARD`, err: `found EOF, expected GROUPS at line 1, char 12`},
{s: `SHOW FOO`, err: `found FOO, expected CONTINUOUS, DATABASES, DIAGNOSTICS, FIELD, GRANTS, MEASUREMENTS, RETENTION, SERIES, SERVERS, SHARD, SHARDS, STATS, SUBSCRIPTIONS, TAG, USERS at line 1, char 6`},
{s: `SHOW FOO`, err: `found FOO, expected CONTINUOUS, DATABASES, DIAGNOSTICS, FIELD, GRANTS, MEASUREMENTS, QUERIES, RETENTION, SERIES, SERVERS, SHARD, SHARDS, STATS, SUBSCRIPTIONS, TAG, USERS at line 1, char 6`},
{s: `SHOW STATS FOR`, err: `found EOF, expected string at line 1, char 16`},
{s: `SHOW DIAGNOSTICS FOR`, err: `found EOF, expected string at line 1, char 22`},
{s: `SHOW GRANTS`, err: `found EOF, expected FOR at line 1, char 13`},
@ -1827,6 +1841,8 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `GRANT ALL PRIVILEGES ON testdb TO`, err: `found EOF, expected identifier at line 1, char 35`},
{s: `GRANT ALL TO`, err: `found EOF, expected identifier at line 1, char 14`},
{s: `GRANT ALL PRIVILEGES TO`, err: `found EOF, expected identifier at line 1, char 25`},
{s: `KILL`, err: `found EOF, expected QUERY at line 1, char 6`},
{s: `KILL QUERY 10s`, err: `found 10s, expected integer at line 1, char 12`},
{s: `REVOKE`, err: `found EOF, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 8`},
{s: `REVOKE BOGUS`, err: `found BOGUS, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 8`},
{s: `REVOKE READ`, err: `found EOF, expected ON at line 1, char 13`},

162
influxql/query_manager.go Normal file
View File

@ -0,0 +1,162 @@
package influxql
import (
"errors"
"fmt"
"sync"
"time"
"github.com/influxdata/influxdb/models"
)
var (
ErrNoQueryManager = errors.New("no query manager available")
)
// QueryTaskInfo holds information about a currently running query.
type QueryTaskInfo struct {
ID uint64
Query string
Database string
Duration time.Duration
}
// QueryParams holds the parameters used for creating a new query.
type QueryParams struct {
// The query to be tracked. Required.
Query *Query
// The database this query is being run in. Required.
Database string
// The channel to watch for when this query is interrupted or finished.
// Not required, but highly recommended. If this channel is not set, the
// query needs to be manually managed by the caller.
InterruptCh <-chan struct{}
}
type QueryManager interface {
// AttachQuery attaches a running query to be managed by the query manager.
// Returns the query id of the newly attached query or an error if it was
// unable to assign a query id or attach the query to the query manager.
// This function also returns a channel that will be closed when this
// query finishes running.
//
// After a query finishes running, the system is free to reuse a query id.
AttachQuery(params *QueryParams) (qid uint64, closing <-chan struct{}, err error)
// KillQuery stops and removes a query from the query manager.
// This method can be used to forcefully terminate a running query.
KillQuery(qid uint64) error
// Queries lists the currently running tasks.
Queries() []QueryTaskInfo
}
func DefaultQueryManager() QueryManager {
return &defaultQueryManager{
queries: make(map[uint64]*queryTask),
nextID: 1,
}
}
func ExecuteShowQueriesStatement(qm QueryManager, q *ShowQueriesStatement) (models.Rows, error) {
if qm == nil {
return nil, ErrNoQueryManager
}
queries := qm.Queries()
values := make([][]interface{}, len(queries))
for i, qi := range queries {
var d string
if qi.Duration == 0 {
d = "0s"
} else if qi.Duration < time.Second {
d = fmt.Sprintf("%du", qi.Duration)
} else {
d = (qi.Duration - (qi.Duration % time.Second)).String()
}
values[i] = []interface{}{
qi.ID,
qi.Query,
qi.Database,
d,
}
}
return []*models.Row{{
Columns: []string{"qid", "query", "database", "duration"},
Values: values,
}}, nil
}
type queryTask struct {
query string
database string
startTime time.Time
closing chan struct{}
once sync.Once
}
type defaultQueryManager struct {
queries map[uint64]*queryTask
nextID uint64
mu sync.Mutex
}
func (qm *defaultQueryManager) AttachQuery(params *QueryParams) (uint64, <-chan struct{}, error) {
qm.mu.Lock()
defer qm.mu.Unlock()
qid := qm.nextID
query := &queryTask{
query: params.Query.String(),
database: params.Database,
startTime: time.Now(),
closing: make(chan struct{}),
}
qm.queries[qid] = query
if params.InterruptCh != nil {
go qm.waitForQuery(qid, params.InterruptCh)
}
qm.nextID++
return qid, query.closing, nil
}
func (qm *defaultQueryManager) waitForQuery(qid uint64, closing <-chan struct{}) {
<-closing
qm.KillQuery(qid)
}
func (qm *defaultQueryManager) KillQuery(qid uint64) error {
qm.mu.Lock()
defer qm.mu.Unlock()
query, ok := qm.queries[qid]
if !ok {
return fmt.Errorf("no such query id: %d", qid)
}
close(query.closing)
delete(qm.queries, qid)
return nil
}
func (qm *defaultQueryManager) Queries() []QueryTaskInfo {
qm.mu.Lock()
defer qm.mu.Unlock()
now := time.Now()
queries := make([]QueryTaskInfo, 0, len(qm.queries))
for qid, task := range qm.queries {
queries = append(queries, QueryTaskInfo{
ID: qid,
Query: task.query,
Database: task.database,
Duration: now.Sub(task.startTime),
})
}
return queries
}

View File

@ -0,0 +1,128 @@
package influxql_test
import (
"fmt"
"testing"
"time"
"github.com/influxdata/influxdb/influxql"
)
func TestQueryManager_AttachQuery(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qm := influxql.DefaultQueryManager()
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
}
qid, _, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
defer qm.KillQuery(qid)
if qid != 1 {
t.Errorf("incorrect query id: exp=1 got=%d", qid)
}
}
func TestQueryManager_KillQuery(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qm := influxql.DefaultQueryManager()
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
}
qid, ch, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
qm.KillQuery(qid)
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Error("detaching the query did not close the channel after 100 milliseconds")
}
if err := qm.KillQuery(qid); err == nil || err.Error() != fmt.Sprintf("no such query id: %d", qid) {
t.Errorf("incorrect error detaching query, got %s", err)
}
}
func TestQueryManager_Interrupt(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
closing := make(chan struct{})
qm := influxql.DefaultQueryManager()
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
InterruptCh: closing,
}
_, ch, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
close(closing)
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Error("interrupting the query did not close the channel after 100 milliseconds")
}
}
func TestQueryManager_Queries(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qm := influxql.DefaultQueryManager()
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
}
qid, _, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
queries := qm.Queries()
if len(queries) != 1 {
t.Errorf("expected 1 query, got %d", len(queries))
} else {
qi := queries[0]
if qi.ID != qid {
t.Errorf("query id: exp=%d got=%d", qid, qi.ID)
}
if qi.Query != `SELECT count(value) FROM cpu` {
t.Errorf("query id: incorrect query string, got '%s'", qi.Query)
}
if qi.Database != "mydb" {
t.Errorf("query id: incorrect database, got %s", qi.Database)
}
}
qm.KillQuery(qid)
queries = qm.Queries()
if len(queries) != 0 {
t.Errorf("expected 0 queries, got %d", len(queries))
}
}

View File

@ -137,6 +137,7 @@ func TestScanner_Scan(t *testing.T) {
{s: `INTO`, tok: influxql.INTO},
{s: `KEY`, tok: influxql.KEY},
{s: `KEYS`, tok: influxql.KEYS},
{s: `KILL`, tok: influxql.KILL},
{s: `LIMIT`, tok: influxql.LIMIT},
{s: `SHOW`, tok: influxql.SHOW},
{s: `SHARD`, tok: influxql.SHARD},

View File

@ -14,6 +14,10 @@ type SelectOptions struct {
// The upper bound for a select call.
MaxTime time.Time
// An optional channel that, if closed, signals that the select should be
// interrupted.
InterruptCh <-chan struct{}
}
// Select executes stmt against ic and returns a list of iterators to stream from.

View File

@ -98,6 +98,7 @@ const (
INTO
KEY
KEYS
KILL
LIMIT
META
MEASUREMENT
@ -221,6 +222,7 @@ var tokens = [...]string{
INTO: "INTO",
KEY: "KEY",
KEYS: "KEYS",
KILL: "KILL",
LIMIT: "LIMIT",
MEASUREMENT: "MEASUREMENT",
MEASUREMENTS: "MEASUREMENTS",

View File

@ -295,6 +295,8 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
<-notify
close(closing)
}()
} else {
defer close(closing)
}
// Execute query.
@ -638,6 +640,10 @@ func (w gzipResponseWriter) Flush() {
w.Writer.(*gzip.Writer).Flush()
}
func (w gzipResponseWriter) CloseNotify() <-chan bool {
return w.ResponseWriter.(http.CloseNotifier).CloseNotify()
}
// determines if the client can accept compressed responses, and encodes accordingly
func gzipFilter(inner http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

View File

@ -686,14 +686,23 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator
if err != nil {
return nil, err
}
return influxql.NewCallIterator(influxql.NewMergeIterator(inputs, opt), opt)
input := influxql.NewMergeIterator(inputs, opt)
if opt.InterruptCh != nil {
input = influxql.NewInterruptIterator(input, opt.InterruptCh)
}
return influxql.NewCallIterator(input, opt)
}
itrs, err := e.createVarRefIterator(opt)
if err != nil {
return nil, err
}
return influxql.NewSortedMergeIterator(itrs, opt), nil
itr := influxql.NewSortedMergeIterator(itrs, opt)
if opt.InterruptCh != nil {
itr = influxql.NewInterruptIterator(itr, opt.InterruptCh)
}
return itr, nil
}
func (e *Engine) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {