Limit bucket count in selection

Fixes #6078.
pull/6167/head
Jonathan A. Sternberg 2016-03-30 21:00:29 -04:00
parent eb467d8d7f
commit ad7480e64b
6 changed files with 69 additions and 2 deletions

View File

@ -11,6 +11,7 @@
- [#6075](https://github.com/influxdata/influxdb/issues/6075): Limit the maximum running time of a query.
- [#6102](https://github.com/influxdata/influxdb/issues/6102): Limit series count in selection
- [#6077](https://github.com/influxdata/influxdb/issues/6077): Limit point count in selection.
- [#6078](https://github.com/influxdata/influxdb/issues/6078): Limit bucket count in selection.
- [#6060](https://github.com/influxdata/influxdb/pull/6060): Add configurable shard duration to retention policies
- [#6116](https://github.com/influxdata/influxdb/pull/6116): Allow `httpd` service to be extensible for routes
- [#6111](https://github.com/influxdata/influxdb/pull/6111): Add ability to build static assets. Improved handling of TAR and ZIP package outputs.

View File

@ -48,6 +48,7 @@ type Config struct {
QueryTimeout toml.Duration `toml:"query-timeout"`
MaxSelectPointN int `toml:"max-select-point"`
MaxSelectSeriesN int `toml:"max-select-series"`
MaxSelectBucketsN int `toml:"max-select-buckets"`
}
// NewConfig returns an instance of Config with defaults.

View File

@ -43,8 +43,9 @@ type QueryExecutor struct {
QueryTimeout time.Duration
// Select statement limits
MaxSelectPointN int
MaxSelectSeriesN int
MaxSelectPointN int
MaxSelectSeriesN int
MaxSelectBucketsN int
// Remote execution timeout
Timeout time.Duration
@ -469,6 +470,25 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
}
stmt = tmp
if e.MaxSelectBucketsN > 0 && !stmt.IsRawQuery {
interval, err := stmt.GroupByInterval()
if err != nil {
return err
}
if interval > 0 {
// Determine the start and end time matched to the interval (may not match the actual times).
min := opt.MinTime.Truncate(interval)
max := opt.MaxTime.Truncate(interval).Add(interval)
// Determine the number of buckets by finding the time span and dividing by the interval.
buckets := int64(max.Sub(min)) / int64(interval)
if int(buckets) > e.MaxSelectBucketsN {
return fmt.Errorf("max select bucket count exceeded: %d buckets", buckets)
}
}
}
// Create a set of iterators from a selection.
itrs, err := influxql.Select(stmt, ic, &opt)
if err != nil {

View File

@ -120,6 +120,45 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectSeriesN(t *testing.T) {
}
}
// Ensure query executor can enforce a maximum bucket selection count.
//
// The executor is capped at 3 buckets and the query below is expected to be
// rejected with a "4 buckets" error instead of producing any rows.
func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
	e := DefaultQueryExecutor()
	e.MaxSelectBucketsN = 3

	// The meta client should return a single shard on the local node.
	e.MetaClient.ShardsByTimeRangeFn = func(_ influxql.Sources, _, _ time.Time) ([]meta.ShardInfo, error) {
		shards := []meta.ShardInfo{
			{ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}},
		}
		return shards, nil
	}

	// Stub iterator creator handed out for every shard ID.
	var ic IteratorCreator
	ic.CreateIteratorFn = func(_ influxql.IteratorOptions) (influxql.Iterator, error) {
		pt := influxql.FloatPoint{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}}
		return &FloatIterator{Points: []influxql.FloatPoint{pt}}, nil
	}
	ic.FieldDimensionsFn = func(_ influxql.Sources) (map[string]struct{}, map[string]struct{}, error) {
		return map[string]struct{}{"value": {}}, nil, nil
	}
	ic.SeriesKeysFn = func(_ influxql.IteratorOptions) (influxql.SeriesList, error) {
		return influxql.SeriesList{
			{Name: "cpu", Aux: []influxql.DataType{influxql.Float}},
		}, nil
	}
	e.TSDBStore.ShardIteratorCreatorFn = func(_ uint64) influxql.IteratorCreator { return &ic }

	// Aggregate query grouped into 10s intervals over the range [5s, 35s).
	const query = `SELECT count(value) FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`

	// The only result should carry the bucket-limit error.
	want := []*influxql.Result{
		{
			StatementID: 0,
			Err:         errors.New("max select bucket count exceeded: 4 buckets"),
		},
	}

	// Verify all results from the query.
	got := ReadAllResults(e.ExecuteQuery(query, "db0", 0))
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("unexpected results: %s", spew.Sdump(got))
	}
}
// QueryExecutor is a test wrapper for cluster.QueryExecutor.
type QueryExecutor struct {
*cluster.QueryExecutor

View File

@ -168,6 +168,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.QueryExecutor.QueryManager = influxql.DefaultQueryManager(c.Cluster.MaxConcurrentQueries)
s.QueryExecutor.MaxSelectPointN = c.Cluster.MaxSelectPointN
s.QueryExecutor.MaxSelectSeriesN = c.Cluster.MaxSelectSeriesN
s.QueryExecutor.MaxSelectBucketsN = c.Cluster.MaxSelectBucketsN
if c.Data.QueryLogEnabled {
s.QueryExecutor.LogOutput = os.Stderr
}

View File

@ -145,6 +145,11 @@ reporting-disabled = false
[cluster]
shard-writer-timeout = "5s" # The time within which a remote shard must respond to a write request.
write-timeout = "10s" # The time within which a write request must complete on the cluster.
max-concurrent-queries = 0 # The maximum number of concurrent queries that can run. 0 to disable.
query-timeout = "0s" # The time within which a query must complete before being killed automatically. 0s to disable.
max-select-point = 0 # The maximum number of points to scan in a query. 0 to disable.
max-select-series = 0 # The maximum number of series to select in a query. 0 to disable.
max-select-buckets = 0 # The maximum number of buckets to select in an aggregate query. 0 to disable.
###
### [retention]