diff --git a/CHANGELOG.md b/CHANGELOG.md index ea0935b904..ef88c9fb66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#5939](https://github.com/influxdata/influxdb/issues/5939): Support viewing and killing running queries. - [#6073](https://github.com/influxdata/influxdb/pulls/6073): Iterator stats - [#6079](https://github.com/influxdata/influxdb/issues/6079): Limit the maximum number of concurrent queries. +- [#6075](https://github.com/influxdata/influxdb/issues/6075): Limit the maximum running time of a query. ### Bugfixes diff --git a/cluster/config.go b/cluster/config.go index 60e8c53814..9bd80b4441 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -16,6 +16,10 @@ const ( // DefaultShardMapperTimeout is the default timeout set on shard mappers. DefaultShardMapperTimeout = 5 * time.Second + // DefaultQueryTimeout is the default timeout for executing a query. + // A value of zero will have no query timeout. + DefaultQueryTimeout = time.Duration(0) + // DefaultMaxRemoteWriteConnections is the maximum number of open connections // that will be available for remote writes to another host. DefaultMaxRemoteWriteConnections = 3 @@ -33,6 +37,7 @@ type Config struct { MaxRemoteWriteConnections int `toml:"max-remote-write-connections"` ShardMapperTimeout toml.Duration `toml:"shard-mapper-timeout"` MaxConcurrentQueries int `toml:"max-concurrent-queries"` + QueryTimeout toml.Duration `toml:"query-timeout"` } // NewConfig returns an instance of Config with defaults. @@ -41,6 +46,7 @@ func NewConfig() Config { WriteTimeout: toml.Duration(DefaultWriteTimeout), ShardWriterTimeout: toml.Duration(DefaultShardWriterTimeout), ShardMapperTimeout: toml.Duration(DefaultShardMapperTimeout), + QueryTimeout: toml.Duration(DefaultQueryTimeout), MaxRemoteWriteConnections: DefaultMaxRemoteWriteConnections, MaxConcurrentQueries: DefaultMaxConcurrentQueries, } diff --git a/cluster/query_executor.go b/cluster/query_executor.go index 7739bb586e..5f0d4aefcc 100644 --- a/cluster/query_executor.go +++ b/cluster/query_executor.go @@ -39,6 +39,9 @@ type QueryExecutor struct { // Used for managing and tracking running queries. QueryManager influxql.QueryManager + // Query execution timeout. + QueryTimeout time.Duration + // Remote execution timeout Timeout time.Duration @@ -59,9 +62,10 @@ const ( // NewQueryExecutor returns a new instance of QueryExecutor. func NewQueryExecutor() *QueryExecutor { return &QueryExecutor{ - Timeout: DefaultShardMapperTimeout, - LogOutput: ioutil.Discard, - statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil), + Timeout: DefaultShardMapperTimeout, + QueryTimeout: DefaultQueryTimeout, + LogOutput: ioutil.Discard, + statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil), } } @@ -86,6 +90,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu _, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{ Query: query, Database: database, + Timeout: e.QueryTimeout, InterruptCh: closing, }) if err != nil { @@ -469,6 +474,12 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c for { row := em.Emit() if row == nil { + // Check if the query was interrupted while emitting. + select { + case <-closing: + return influxql.ErrQueryInterrupted + default: + } break } @@ -489,7 +500,7 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c // Send results or exit if closing. select { case <-closing: - return nil + return influxql.ErrQueryInterrupted case results <- result: } diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 9d5c13954d..5fce77db3b 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -164,6 +164,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { s.QueryExecutor.TSDBStore = s.TSDBStore s.QueryExecutor.Monitor = s.Monitor s.QueryExecutor.PointsWriter = s.PointsWriter + s.QueryExecutor.QueryTimeout = time.Duration(c.Cluster.QueryTimeout) s.QueryExecutor.QueryManager = influxql.DefaultQueryManager(c.Cluster.MaxConcurrentQueries) if c.Data.QueryLogEnabled { s.QueryExecutor.LogOutput = os.Stderr diff --git a/influxql/query_manager.go b/influxql/query_manager.go index b4ee118be1..5c7a99fc87 100644 --- a/influxql/query_manager.go +++ b/influxql/query_manager.go @@ -10,8 +10,13 @@ import ( ) var ( + // ErrNoQueryManager is an error sent when a SHOW QUERIES or KILL QUERY + // statement is issued with no query manager. ErrNoQueryManager = errors.New("no query manager available") + // ErrQueryInterrupted is an error returned when the query is interrupted. + ErrQueryInterrupted = errors.New("query interrupted") + // ErrMaxConcurrentQueriesReached is an error when a query cannot be run // because the maximum number of queries has been reached. ErrMaxConcurrentQueriesReached = errors.New("max concurrent queries reached") @@ -33,6 +38,9 @@ type QueryParams struct { // The database this query is being run in. Required. Database string + // The timeout for automatically killing a query that hasn't finished. Optional. + Timeout time.Duration + // The channel to watch for when this query is interrupted or finished. // Not required, but highly recommended. If this channel is not set, the // query needs to be manually managed by the caller. @@ -127,15 +135,23 @@ func (qm *defaultQueryManager) AttachQuery(params *QueryParams) (uint64, <-chan } qm.queries[qid] = query - if params.InterruptCh != nil { - go qm.waitForQuery(qid, params.InterruptCh) + if params.InterruptCh != nil || params.Timeout != 0 { + go qm.waitForQuery(qid, params.Timeout, params.InterruptCh) } qm.nextID++ return qid, query.closing, nil } -func (qm *defaultQueryManager) waitForQuery(qid uint64, closing <-chan struct{}) { - <-closing +func (qm *defaultQueryManager) waitForQuery(qid uint64, timeout time.Duration, closing <-chan struct{}) { + var timer <-chan time.Time + if timeout != 0 { + timer = time.After(timeout) + } + + select { + case <-closing: + case <-timer: + } qm.KillQuery(qid) } diff --git a/influxql/query_manager_test.go b/influxql/query_manager_test.go index b4bd6902ab..7a9abaf5da 100644 --- a/influxql/query_manager_test.go +++ b/influxql/query_manager_test.go @@ -127,6 +127,31 @@ func TestQueryManager_Queries(t *testing.T) { } } +func TestQueryManager_Limit_Timeout(t *testing.T) { + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) + if err != nil { + t.Fatal(err) + } + + qm := influxql.DefaultQueryManager(0) + params := influxql.QueryParams{ + Query: q, + Database: `mydb`, + Timeout: time.Nanosecond, + } + + _, ch, err := qm.AttachQuery(¶ms) + if err != nil { + t.Fatal(err) + } + + select { + case <-ch: + case <-time.After(time.Millisecond): + t.Errorf("timeout has not killed the query") + } +} + func TestQueryManager_Limit_ConcurrentQueries(t *testing.T) { q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { diff --git a/influxql/select.go b/influxql/select.go index f1415c13e1..59932e84d0 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -348,6 +348,9 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions) (Iter if !opt.Interval.IsZero() && opt.Fill != NoFill { itr = NewFillIterator(itr, expr, opt) } + if opt.InterruptCh != nil { + itr = NewInterruptIterator(itr, opt.InterruptCh) + } return itr, nil } case *BinaryExpr: