Support a timeout for running queries in the query manager

Include an interrupt iterator at the top level to interrupt the fill
iterator if it is producing too many points.

Fixes #6075.
pull/6081/head
Jonathan A. Sternberg 2016-03-21 12:00:07 -04:00
parent 9620130308
commit 79fe4490c2
7 changed files with 71 additions and 8 deletions

View File

@ -8,6 +8,7 @@
- [#5939](https://github.com/influxdata/influxdb/issues/5939): Support viewing and killing running queries.
- [#6073](https://github.com/influxdata/influxdb/pulls/6073): Iterator stats
- [#6079](https://github.com/influxdata/influxdb/issues/6079): Limit the maximum number of concurrent queries.
- [#6075](https://github.com/influxdata/influxdb/issues/6075): Limit the maximum running time of a query.
### Bugfixes

View File

@ -16,6 +16,10 @@ const (
// DefaultShardMapperTimeout is the default timeout set on shard mappers.
DefaultShardMapperTimeout = 5 * time.Second
// DefaultQueryTimeout is the default timeout for executing a query.
// A value of zero will have no query timeout.
DefaultQueryTimeout = time.Duration(0)
// DefaultMaxRemoteWriteConnections is the maximum number of open connections
// that will be available for remote writes to another host.
DefaultMaxRemoteWriteConnections = 3
@ -33,6 +37,7 @@ type Config struct {
MaxRemoteWriteConnections int `toml:"max-remote-write-connections"`
ShardMapperTimeout toml.Duration `toml:"shard-mapper-timeout"`
MaxConcurrentQueries int `toml:"max-concurrent-queries"`
QueryTimeout toml.Duration `toml:"query-timeout"`
}
// NewConfig returns an instance of Config with defaults.
@ -41,6 +46,7 @@ func NewConfig() Config {
WriteTimeout: toml.Duration(DefaultWriteTimeout),
ShardWriterTimeout: toml.Duration(DefaultShardWriterTimeout),
ShardMapperTimeout: toml.Duration(DefaultShardMapperTimeout),
QueryTimeout: toml.Duration(DefaultQueryTimeout),
MaxRemoteWriteConnections: DefaultMaxRemoteWriteConnections,
MaxConcurrentQueries: DefaultMaxConcurrentQueries,
}

View File

@ -39,6 +39,9 @@ type QueryExecutor struct {
// Used for managing and tracking running queries.
QueryManager influxql.QueryManager
// Query execution timeout.
QueryTimeout time.Duration
// Remote execution timeout
Timeout time.Duration
@ -59,9 +62,10 @@ const (
// NewQueryExecutor returns a new instance of QueryExecutor.
func NewQueryExecutor() *QueryExecutor {
return &QueryExecutor{
Timeout: DefaultShardMapperTimeout,
LogOutput: ioutil.Discard,
statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil),
Timeout: DefaultShardMapperTimeout,
QueryTimeout: DefaultQueryTimeout,
LogOutput: ioutil.Discard,
statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil),
}
}
@ -86,6 +90,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
_, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{
Query: query,
Database: database,
Timeout: e.QueryTimeout,
InterruptCh: closing,
})
if err != nil {
@ -469,6 +474,12 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
for {
row := em.Emit()
if row == nil {
// Check if the query was interrupted while emitting.
select {
case <-closing:
return influxql.ErrQueryInterrupted
default:
}
break
}
@ -489,7 +500,7 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
// Send results or exit if closing.
select {
case <-closing:
return nil
return influxql.ErrQueryInterrupted
case results <- result:
}

View File

@ -164,6 +164,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.QueryExecutor.TSDBStore = s.TSDBStore
s.QueryExecutor.Monitor = s.Monitor
s.QueryExecutor.PointsWriter = s.PointsWriter
s.QueryExecutor.QueryTimeout = time.Duration(c.Cluster.QueryTimeout)
s.QueryExecutor.QueryManager = influxql.DefaultQueryManager(c.Cluster.MaxConcurrentQueries)
if c.Data.QueryLogEnabled {
s.QueryExecutor.LogOutput = os.Stderr

View File

@ -10,8 +10,13 @@ import (
)
var (
// ErrNoQueryManager is an error sent when a SHOW QUERIES or KILL QUERY
// statement is issued with no query manager.
ErrNoQueryManager = errors.New("no query manager available")
// ErrQueryInterrupted is an error returned when the query is interrupted.
ErrQueryInterrupted = errors.New("query interrupted")
// ErrMaxConcurrentQueriesReached is an error when a query cannot be run
// because the maximum number of queries has been reached.
ErrMaxConcurrentQueriesReached = errors.New("max concurrent queries reached")
@ -33,6 +38,9 @@ type QueryParams struct {
// The database this query is being run in. Required.
Database string
// The timeout for automatically killing a query that hasn't finished. Optional.
Timeout time.Duration
// The channel to watch for when this query is interrupted or finished.
// Not required, but highly recommended. If this channel is not set, the
// query needs to be manually managed by the caller.
@ -127,15 +135,23 @@ func (qm *defaultQueryManager) AttachQuery(params *QueryParams) (uint64, <-chan
}
qm.queries[qid] = query
if params.InterruptCh != nil {
go qm.waitForQuery(qid, params.InterruptCh)
if params.InterruptCh != nil || params.Timeout != 0 {
go qm.waitForQuery(qid, params.Timeout, params.InterruptCh)
}
qm.nextID++
return qid, query.closing, nil
}
func (qm *defaultQueryManager) waitForQuery(qid uint64, closing <-chan struct{}) {
<-closing
func (qm *defaultQueryManager) waitForQuery(qid uint64, timeout time.Duration, closing <-chan struct{}) {
var timer <-chan time.Time
if timeout != 0 {
timer = time.After(timeout)
}
select {
case <-closing:
case <-timer:
}
qm.KillQuery(qid)
}

View File

@ -127,6 +127,31 @@ func TestQueryManager_Queries(t *testing.T) {
}
}
func TestQueryManager_Limit_Timeout(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
Timeout: time.Nanosecond,
}
_, ch, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
select {
case <-ch:
case <-time.After(time.Millisecond):
t.Errorf("timeout has not killed the query")
}
}
func TestQueryManager_Limit_ConcurrentQueries(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {

View File

@ -348,6 +348,9 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions) (Iter
if !opt.Interval.IsZero() && opt.Fill != NoFill {
itr = NewFillIterator(itr, expr, opt)
}
if opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
return itr, nil
}
case *BinaryExpr: