limit series count in selection

This commit adds a configurable limit to the number of series that
can be returned from a `SELECT` statement. The limit is checked
immediately after planning and is determined using iterator stats.

Fixes #6076
pull/6102/head
Ben Johnson 2016-03-23 09:05:38 -06:00
parent 45f1c28adb
commit a6d9930b6f
5 changed files with 63 additions and 1 deletion
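As an orientation aid (not part of the commit), the sketch below is a minimal, self-contained illustration of the check described in the commit message: after planning, the combined iterator stats report a series count, and the query is rejected when a positive limit is exceeded. The iteratorStats type and checkSeriesLimit function are hypothetical stand-ins for the influxql types used in the actual diff further down.

package main

import "fmt"

// iteratorStats is a hypothetical stand-in for influxql.IteratorStats;
// only the field relevant to the limit check is modeled here.
type iteratorStats struct {
	SeriesN int // number of series the planned iterators will read
}

// checkSeriesLimit mirrors the shape of the post-planning check:
// a non-positive limit disables the check, otherwise the query fails
// when the planned series count exceeds the limit.
func checkSeriesLimit(stats iteratorStats, maxSelectSeriesN int) error {
	if maxSelectSeriesN > 0 && stats.SeriesN > maxSelectSeriesN {
		return fmt.Errorf("max select series count exceeded: %d series", stats.SeriesN)
	}
	return nil
}

func main() {
	fmt.Println(checkSeriesLimit(iteratorStats{SeriesN: 4}, 3)) // limit of 3 exceeded by 4 series
	fmt.Println(checkSeriesLimit(iteratorStats{SeriesN: 4}, 0)) // zero disables the check: <nil>
}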

View File

@@ -9,6 +9,7 @@
- [#6073](https://github.com/influxdata/influxdb/pulls/6073): Iterator stats
- [#6079](https://github.com/influxdata/influxdb/issues/6079): Limit the maximum number of concurrent queries.
- [#6075](https://github.com/influxdata/influxdb/issues/6075): Limit the maximum running time of a query.
- [#6102](https://github.com/influxdata/influxdb/issues/6102): Limit series count in selection
### Bugfixes

View File

@@ -27,6 +27,10 @@ const (
// DefaultMaxConcurrentQueries is the maximum number of running queries.
// A value of zero will make the maximum query limit unlimited.
DefaultMaxConcurrentQueries = 0
// DefaultMaxSelectSeriesN is the maximum number of series a SELECT statement can process.
// A value of zero will make the maximum series count unlimited.
DefaultMaxSelectSeriesN = 0
)
// Config represents the configuration for the clustering service.
@@ -38,6 +42,7 @@ type Config struct {
ShardMapperTimeout toml.Duration `toml:"shard-mapper-timeout"`
MaxConcurrentQueries int `toml:"max-concurrent-queries"`
QueryTimeout toml.Duration `toml:"query-timeout"`
MaxSelectSeriesN int `toml:"max-select-series"`
}
// NewConfig returns an instance of Config with defaults.
@@ -49,5 +54,6 @@ func NewConfig() Config {
QueryTimeout: toml.Duration(DefaultQueryTimeout),
MaxRemoteWriteConnections: DefaultMaxRemoteWriteConnections,
MaxConcurrentQueries: DefaultMaxConcurrentQueries,
MaxSelectSeriesN: DefaultMaxSelectSeriesN,
}
}
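For completeness, a small usage sketch under stated assumptions: the import path is assumed from the repository layout, NewConfig and the field name come from the diff above, and the chosen limit value is arbitrary. In the configuration file, the same field corresponds to the max-select-series key shown in the struct tag.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/cluster"
)

func main() {
	// Start from the defaults and opt into a series limit for SELECT statements.
	// The value 100000 is an arbitrary example; 0 (the default) leaves it unlimited.
	c := cluster.NewConfig()
	c.MaxSelectSeriesN = 100000

	fmt.Println("max-select-series =", c.MaxSelectSeriesN)
}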

View File

@@ -42,6 +42,9 @@ type QueryExecutor struct {
// Query execution timeout.
QueryTimeout time.Duration
// Select statement limits
MaxSelectSeriesN int
// Remote execution timeout
Timeout time.Duration
@@ -468,6 +471,12 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
em.OmitTime = stmt.OmitTime
defer em.Close()
// Calculate initial stats across all iterators.
stats := influxql.Iterators(itrs).Stats()
if e.MaxSelectSeriesN > 0 && stats.SeriesN > e.MaxSelectSeriesN {
return fmt.Errorf("max select series count exceeded: %d series", stats.SeriesN)
}
// Emit rows to the results channel.
var writeN int64
var emitted bool

View File

@@ -2,6 +2,7 @@ package cluster_test
import (
"bytes"
"errors"
"io"
"os"
"reflect"
@@ -76,6 +77,49 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
}
}
// Ensure query executor can enforce a maximum series selection count.
func TestQueryExecutor_ExecuteQuery_MaxSelectSeriesN(t *testing.T) {
e := DefaultQueryExecutor()
e.MaxSelectSeriesN = 3
// The meta client should return two shards on the local node.
e.MetaClient.ShardsByTimeRangeFn = func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) {
return []meta.ShardInfo{
{ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}},
{ID: 101, Owners: []meta.ShardOwner{{NodeID: 0}}},
}, nil
}
// This iterator creator returns an iterator that operates on 2 series.
// Reuse this iterator for both shards. This brings the total series count to 4.
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{
Points: []influxql.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}}},
stats: influxql.IteratorStats{SeriesN: 2},
}, nil
}
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
return map[string]struct{}{"value": struct{}{}}, nil, nil
}
ic.SeriesKeysFn = func(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
return influxql.SeriesList{
{Name: "cpu", Aux: []influxql.DataType{influxql.Float}},
}, nil
}
e.TSDBStore.ShardIteratorCreatorFn = func(id uint64) influxql.IteratorCreator { return &ic }
// Verify all results from the query.
if a := ReadAllResults(e.ExecuteQuery(`SELECT count(value) FROM cpu`, "db0", 0)); !reflect.DeepEqual(a, []*influxql.Result{
{
StatementID: 0,
Err: errors.New("max select series count exceeded: 4 series"),
},
}) {
t.Fatalf("unexpected results: %s", spew.Sdump(a))
}
}
// QueryExecutor is a test wrapper for cluster.QueryExecutor.
type QueryExecutor struct {
*cluster.QueryExecutor
@@ -233,9 +277,10 @@ func (ic *IteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sou
// FloatIterator represents an iterator that reads from a slice.
type FloatIterator struct {
Points []influxql.FloatPoint
stats influxql.IteratorStats
}
func (itr *FloatIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} }
func (itr *FloatIterator) Stats() influxql.IteratorStats { return itr.stats }
func (itr *FloatIterator) Close() error { return nil }
// Next returns the next value and shifts it off the beginning of the points slice.

View File

@@ -166,6 +166,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.QueryExecutor.PointsWriter = s.PointsWriter
s.QueryExecutor.QueryTimeout = time.Duration(c.Cluster.QueryTimeout)
s.QueryExecutor.QueryManager = influxql.DefaultQueryManager(c.Cluster.MaxConcurrentQueries)
s.QueryExecutor.MaxSelectSeriesN = c.Cluster.MaxSelectSeriesN
if c.Data.QueryLogEnabled {
s.QueryExecutor.LogOutput = os.Stderr
}