diff --git a/CHANGELOG.md b/CHANGELOG.md
index a063afb994..630e7d637f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@
 - [#6075](https://github.com/influxdata/influxdb/issues/6075): Limit the maximum running time of a query.
 - [#6102](https://github.com/influxdata/influxdb/issues/6102): Limit series count in selection
 - [#6077](https://github.com/influxdata/influxdb/issues/6077): Limit point count in selection.
+- [#6078](https://github.com/influxdata/influxdb/issues/6078): Limit bucket count in selection.
 - [#6060](https://github.com/influxdata/influxdb/pull/6060): Add configurable shard duration to retention policies
 - [#6116](https://github.com/influxdata/influxdb/pull/6116): Allow `httpd` service to be extensible for routes
 - [#6111](https://github.com/influxdata/influxdb/pull/6111): Add ability to build static assest. Improved handling of TAR and ZIP package outputs.
diff --git a/cluster/config.go b/cluster/config.go
index 8d6cb9aa11..93fa66c0ef 100644
--- a/cluster/config.go
+++ b/cluster/config.go
@@ -48,6 +48,7 @@ type Config struct {
 	QueryTimeout      toml.Duration `toml:"query-timeout"`
 	MaxSelectPointN   int           `toml:"max-select-point"`
 	MaxSelectSeriesN  int           `toml:"max-select-series"`
+	MaxSelectBucketsN int           `toml:"max-select-buckets"`
 }
 
 // NewConfig returns an instance of Config with defaults.
diff --git a/cluster/query_executor.go b/cluster/query_executor.go
index 2476fa4a91..48966ab169 100644
--- a/cluster/query_executor.go
+++ b/cluster/query_executor.go
@@ -43,8 +43,9 @@ type QueryExecutor struct {
 	QueryTimeout time.Duration
 
 	// Select statement limits
-	MaxSelectPointN  int
-	MaxSelectSeriesN int
+	MaxSelectPointN   int
+	MaxSelectSeriesN  int
+	MaxSelectBucketsN int
 
 	// Remote execution timeout
 	Timeout time.Duration
@@ -469,6 +470,25 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
 	}
 	stmt = tmp
 
+	if e.MaxSelectBucketsN > 0 && !stmt.IsRawQuery {
+		interval, err := stmt.GroupByInterval()
+		if err != nil {
+			return err
+		}
+
+		if interval > 0 {
+			// Determine the start and end time matched to the interval (may not match the actual times).
+			min := opt.MinTime.Truncate(interval)
+			max := opt.MaxTime.Truncate(interval).Add(interval)
+
+			// Determine the number of buckets by finding the time span and dividing by the interval.
+			buckets := int64(max.Sub(min)) / int64(interval)
+			if int(buckets) > e.MaxSelectBucketsN {
+				return fmt.Errorf("max select bucket count exceeded: %d buckets", buckets)
+			}
+		}
+	}
+
 	// Create a set of iterators from a selection.
 	itrs, err := influxql.Select(stmt, ic, &opt)
 	if err != nil {
diff --git a/cluster/query_executor_test.go b/cluster/query_executor_test.go
index f8d72f13a7..38f29b31fe 100644
--- a/cluster/query_executor_test.go
+++ b/cluster/query_executor_test.go
@@ -120,6 +120,45 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectSeriesN(t *testing.T) {
 	}
 }
 
+// Ensure query executor can enforce a maximum bucket selection count.
+func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
+	e := DefaultQueryExecutor()
+	e.MaxSelectBucketsN = 3
+
+	// The meta client should return a single shard on the local node.
+	e.MetaClient.ShardsByTimeRangeFn = func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) {
+		return []meta.ShardInfo{
+			{ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}},
+		}, nil
+	}
+
+	var ic IteratorCreator
+	ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
+		return &FloatIterator{
+			Points: []influxql.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}}},
+		}, nil
+	}
+	ic.FieldDimensionsFn = func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
+		return map[string]struct{}{"value": struct{}{}}, nil, nil
+	}
+	ic.SeriesKeysFn = func(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
+		return influxql.SeriesList{
+			{Name: "cpu", Aux: []influxql.DataType{influxql.Float}},
+		}, nil
+	}
+	e.TSDBStore.ShardIteratorCreatorFn = func(id uint64) influxql.IteratorCreator { return &ic }
+
+	// Verify all results from the query.
+	if a := ReadAllResults(e.ExecuteQuery(`SELECT count(value) FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`, "db0", 0)); !reflect.DeepEqual(a, []*influxql.Result{
+		{
+			StatementID: 0,
+			Err:         errors.New("max select bucket count exceeded: 4 buckets"),
+		},
+	}) {
+		t.Fatalf("unexpected results: %s", spew.Sdump(a))
+	}
+}
+
 // QueryExecutor is a test wrapper for cluster.QueryExecutor.
 type QueryExecutor struct {
 	*cluster.QueryExecutor
diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go
index cce05e0b49..b602020422 100644
--- a/cmd/influxd/run/server.go
+++ b/cmd/influxd/run/server.go
@@ -168,6 +168,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
 	s.QueryExecutor.QueryManager = influxql.DefaultQueryManager(c.Cluster.MaxConcurrentQueries)
 	s.QueryExecutor.MaxSelectPointN = c.Cluster.MaxSelectPointN
 	s.QueryExecutor.MaxSelectSeriesN = c.Cluster.MaxSelectSeriesN
+	s.QueryExecutor.MaxSelectBucketsN = c.Cluster.MaxSelectBucketsN
 	if c.Data.QueryLogEnabled {
 		s.QueryExecutor.LogOutput = os.Stderr
 	}
diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index 3289ae3bf1..0da6cc22b9 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -145,6 +145,11 @@ reporting-disabled = false
 [cluster]
   shard-writer-timeout = "5s" # The time within which a remote shard must respond to a write request.
   write-timeout = "10s" # The time within which a write request must complete on the cluster.
+  max-concurrent-queries = 0 # The maximum number of concurrent queries that can run. 0 to disable.
+  query-timeout = "0s" # The time within which a query must complete before being killed automatically. 0s to disable.
+  max-select-point = 0 # The maximum number of points to scan in a query. 0 to disable.
+  max-select-series = 0 # The maximum number of series to select in a query. 0 to disable.
+  max-select-buckets = 0 # The maximum number of buckets to select in an aggregate query. 0 to disable.
 
 ###
 ### [retention]
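
The bucket limit added in cluster/query_executor.go is enforced before any iterators are created: the query's time range is aligned to the GROUP BY interval and the span is divided by that interval. As a rough illustration of that arithmetic (a standalone sketch, not part of the patch; the bucketCount helper and main function are invented for this example), the snippet below reproduces the calculation for the time range used in the new test, which is why the expected error reports 4 buckets against a limit of 3.

package main

import (
	"fmt"
	"time"
)

// bucketCount mirrors the check added to executeSelectStatement: align the
// start and end of the queried range to the GROUP BY interval, then divide
// the resulting span by that interval.
func bucketCount(min, max time.Time, interval time.Duration) int64 {
	start := min.Truncate(interval)
	end := max.Truncate(interval).Add(interval)
	return int64(end.Sub(start)) / int64(interval)
}

func main() {
	// Time range and interval from the test query:
	//   SELECT count(value) FROM cpu
	//   WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z'
	//   GROUP BY time(10s)
	min := time.Date(2000, 1, 1, 0, 0, 5, 0, time.UTC)
	max := time.Date(2000, 1, 1, 0, 0, 35, 0, time.UTC)

	// Prints "4 buckets", which exceeds a MaxSelectBucketsN of 3.
	fmt.Printf("%d buckets\n", bucketCount(min, max, 10*time.Second))
}

Because both ends of the range are truncated outward to interval boundaries, a query that only partially covers its first and last buckets still counts them in full toward the limit.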