Implicitly decide on the lower limit for fill queries when none is present

This allows the query:

    SELECT mean(value) FROM cpu GROUP BY time(1d)

To behave sensibly. The upper limit implicitly defaults to the `now()`
starting time, and the lower limit becomes whichever interval the
earliest point falls into.

When no lower bound is specified and `max-select-buckets` is specified,
the query will only consider points that would satisfy
`max-select-buckets`. So if you have one point written in 1970 and
another point within the last minute, and you run the above query with
`max-select-buckets` equal to 10, the older point from 1970 will not be
considered.
pull/8866/head
Jonathan A. Sternberg 2017-09-22 16:07:34 -05:00
parent 31d50ec15e
commit f20cab6e99
8 changed files with 200 additions and 27 deletions

View File

@ -37,6 +37,7 @@
- [#8857](https://github.com/influxdata/influxdb/pull/8857): Improve performance of Bloom Filter in TSI index.
- [#8897](https://github.com/influxdata/influxdb/pull/8897): Add message pack format for query responses.
- [#8886](https://github.com/influxdata/influxdb/pull/8886): Improved compaction scheduling
- [#8690](https://github.com/influxdata/influxdb/issues/8690): Implicitly decide on a lower limit for fill queries when none is present.
### Bugfixes

View File

@ -39,6 +39,7 @@ func (e *LocalShardMapper) MapShards(sources influxql.Sources, t influxql.TimeRa
if err := e.mapShards(a, sources, tmin, tmax); err != nil {
return nil, err
}
a.MinTime, a.MaxTime = tmin, tmax
return a, nil
}
@ -85,6 +86,16 @@ func (e *LocalShardMapper) mapShards(a *LocalShardMapping, sources influxql.Sour
// LocalShardMapping maps data sources to a list of shard information.
type LocalShardMapping struct {
ShardMap map[Source]tsdb.ShardGroup
// MinTime is the minimum time that this shard mapper will allow.
// Any attempt to use a time before this one will automatically result in using
// this time instead.
MinTime time.Time
// MaxTime is the maximum time that this shard mapper will allow.
// Any attempt to use a time after this one will automatically result in using
// this time instead.
MaxTime time.Time
}
func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
@ -160,6 +171,14 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It
return nil, nil
}
// Override the time constraints if they don't match each other.
if !a.MinTime.IsZero() && opt.StartTime < a.MinTime.UnixNano() {
opt.StartTime = a.MinTime.UnixNano()
}
if !a.MaxTime.IsZero() && opt.EndTime > a.MaxTime.UnixNano() {
opt.EndTime = a.MaxTime.UnixNano()
}
if m.Regex != nil {
measurements := sg.MeasurementsByRegex(m.Regex.Val)
inputs := make([]query.Iterator, 0, len(measurements))
@ -192,6 +211,14 @@ func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.Iter
return query.IteratorCost{}, nil
}
// Override the time constraints if they don't match each other.
if !a.MinTime.IsZero() && opt.StartTime < a.MinTime.UnixNano() {
opt.StartTime = a.MinTime.UnixNano()
}
if !a.MaxTime.IsZero() && opt.EndTime > a.MaxTime.UnixNano() {
opt.EndTime = a.MaxTime.UnixNano()
}
if m.Regex != nil {
var costs query.IteratorCost
measurements := sg.MeasurementsByRegex(m.Regex.Val)

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
)
// CompileOptions are the customization options for the compiler.
@ -162,9 +163,6 @@ func (c *compiledStatement) compile(stmt *influxql.SelectStatement) error {
if err := c.validateFields(); err != nil {
return err
}
if err := c.validateDimensions(); err != nil {
return err
}
// Look through the sources and compile each of the subqueries (if they exist).
// We do this after compiling the outside because subqueries may require
@ -749,17 +747,6 @@ func (c *compiledStatement) validateFields() error {
return nil
}
// validateDimensions checks that an explicitly written GROUP BY time
// interval is accompanied by an explicit lower time bound in the WHERE
// clause. Inherited intervals (from an outer query) are exempt.
func (c *compiledStatement) validateDimensions() error {
	// Nothing to validate when there is no interval of our own.
	if c.Interval.IsZero() || c.InheritedInterval {
		return nil
	}
	// A lower limit was set explicitly; the query is valid.
	if c.TimeRange.Min.UnixNano() != influxql.MinTime {
		return nil
	}
	return errors.New("aggregate functions with GROUP BY time require a WHERE time clause with a lower limit")
}
// subquery compiles and validates a compiled statement for the subquery using
// this compiledStatement as the parent.
func (c *compiledStatement) subquery(stmt *influxql.SelectStatement) error {
@ -801,8 +788,44 @@ func (c *compiledStatement) subquery(stmt *influxql.SelectStatement) error {
}
func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions) (PreparedStatement, error) {
// If this is a query with a grouping, there is a bucket limit, and the minimum time has not been specified,
// we need to limit the possible time range that can be used when mapping shards but not when actually executing
// the select statement. Determine the shard time range here.
timeRange := c.TimeRange
if sopt.MaxBucketsN > 0 && !c.stmt.IsRawQuery && timeRange.MinTime() == influxql.MinTime {
interval, err := c.stmt.GroupByInterval()
if err != nil {
return nil, err
}
offset, err := c.stmt.GroupByOffset()
if err != nil {
return nil, err
}
if interval > 0 {
// Determine the last bucket using the end time.
opt := IteratorOptions{
Interval: Interval{
Duration: interval,
Offset: offset,
},
}
last, _ := opt.Window(c.TimeRange.MaxTime() - 1)
// Determine the time difference using the number of buckets.
// Determine the maximum difference between the buckets based on the end time.
maxDiff := last - models.MinNanoTime
if maxDiff/int64(interval) > int64(sopt.MaxBucketsN) {
timeRange.Min = time.Unix(0, models.MinNanoTime)
} else {
timeRange.Min = time.Unix(0, last-int64(interval)*int64(sopt.MaxBucketsN-1))
}
}
}
// Create an iterator creator based on the shards in the cluster.
shards, err := shardMapper.MapShards(c.stmt.Sources, c.TimeRange, sopt)
shards, err := shardMapper.MapShards(c.stmt.Sources, timeRange, sopt)
if err != nil {
return nil, err
}
@ -823,7 +846,7 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions)
opt.StartTime, opt.EndTime = c.TimeRange.MinTime(), c.TimeRange.MaxTime()
opt.Ascending = c.Ascending
if sopt.MaxBucketsN > 0 && !stmt.IsRawQuery {
if sopt.MaxBucketsN > 0 && !stmt.IsRawQuery && c.TimeRange.MinTime() > influxql.MinTime {
interval, err := stmt.GroupByInterval()
if err != nil {
shards.Close()

View File

@ -198,10 +198,6 @@ func TestCompile_Failures(t *testing.T) {
{s: `SELECT field1 FROM foo fill(none)`, err: `fill(none) must be used with a function`},
{s: `SELECT field1 FROM foo fill(linear)`, err: `fill(linear) must be used with a function`},
{s: `SELECT count(value), value FROM foo`, err: `mixing aggregate and non-aggregate queries is not supported`},
{s: `SELECT count(value) FROM foo group by time(1s)`, err: `aggregate functions with GROUP BY time require a WHERE time clause with a lower limit`},
{s: `SELECT count(value) FROM foo group by time(500ms)`, err: `aggregate functions with GROUP BY time require a WHERE time clause with a lower limit`},
{s: `SELECT count(value) FROM foo group by time(1s) where host = 'hosta.influxdb.org'`, err: `aggregate functions with GROUP BY time require a WHERE time clause with a lower limit`},
{s: `SELECT count(value) FROM foo group by time(1s) where time < now()`, err: `aggregate functions with GROUP BY time require a WHERE time clause with a lower limit`},
{s: `SELECT count(value) FROM foo group by time`, err: `time() is a function and expects at least one argument`},
{s: `SELECT count(value) FROM foo group by 'time'`, err: `only time and tag dimensions allowed`},
{s: `SELECT count(value) FROM foo where time > now() and time < now() group by time()`, err: `time dimension expected 1 or 2 arguments`},
@ -314,7 +310,6 @@ func TestCompile_Failures(t *testing.T) {
{s: `SELECT count(foo + sum(bar)) FROM cpu`, err: `expected field argument in count()`},
{s: `SELECT (count(foo + sum(bar))) FROM cpu`, err: `expected field argument in count()`},
{s: `SELECT sum(value) + count(foo + sum(bar)) FROM cpu`, err: `expected field argument in count()`},
{s: `SELECT sum(mean) FROM (SELECT mean(value) FROM cpu GROUP BY time(1h))`, err: `aggregate functions with GROUP BY time require a WHERE time clause with a lower limit`},
{s: `SELECT top(value, 2), max(value) FROM cpu`, err: `selector function top() cannot be combined with other functions`},
{s: `SELECT bottom(value, 2), max(value) FROM cpu`, err: `selector function bottom() cannot be combined with other functions`},
{s: `SELECT min(derivative) FROM (SELECT derivative(mean(value), 1h) FROM myseries) where time < now() and time > now() - 1d`, err: `derivative aggregate requires a GROUP BY interval`},

View File

@ -664,6 +664,9 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) {
}
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.startTime == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
@ -686,7 +689,7 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) {
break
}
} else {
if itr.window.time >= itr.endTime {
if itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime {
itr.input.unread(p)
p = nil
break
@ -702,6 +705,9 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) {
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.window.time == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
@ -4054,6 +4060,9 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) {
}
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.startTime == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
@ -4076,7 +4085,7 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) {
break
}
} else {
if itr.window.time >= itr.endTime {
if itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime {
itr.input.unread(p)
p = nil
break
@ -4092,6 +4101,9 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) {
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.window.time == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
@ -7441,6 +7453,9 @@ func (itr *unsignedFillIterator) Next() (*UnsignedPoint, error) {
}
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.startTime == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
@ -7463,7 +7478,7 @@ func (itr *unsignedFillIterator) Next() (*UnsignedPoint, error) {
break
}
} else {
if itr.window.time >= itr.endTime {
if itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime {
itr.input.unread(p)
p = nil
break
@ -7479,6 +7494,9 @@ func (itr *unsignedFillIterator) Next() (*UnsignedPoint, error) {
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.window.time == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
@ -10828,6 +10846,9 @@ func (itr *stringFillIterator) Next() (*StringPoint, error) {
}
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.startTime == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
@ -10850,7 +10871,7 @@ func (itr *stringFillIterator) Next() (*StringPoint, error) {
break
}
} else {
if itr.window.time >= itr.endTime {
if itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime {
itr.input.unread(p)
p = nil
break
@ -10866,6 +10887,9 @@ func (itr *stringFillIterator) Next() (*StringPoint, error) {
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.window.time == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
@ -14201,6 +14225,9 @@ func (itr *booleanFillIterator) Next() (*BooleanPoint, error) {
}
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.startTime == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
@ -14223,7 +14250,7 @@ func (itr *booleanFillIterator) Next() (*BooleanPoint, error) {
break
}
} else {
if itr.window.time >= itr.endTime {
if itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime {
itr.input.unread(p)
p = nil
break
@ -14239,6 +14266,9 @@ func (itr *booleanFillIterator) Next() (*BooleanPoint, error) {
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.window.time == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}

View File

@ -662,6 +662,9 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
}
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.startTime == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
@ -684,7 +687,7 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
break
}
} else {
if itr.window.time >= itr.endTime {
if itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime {
itr.input.unread(p)
p = nil
break
@ -700,6 +703,9 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.window.time == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}

View File

@ -834,6 +834,35 @@ func TestLimitIterator(t *testing.T) {
}
}
func TestFillIterator_ImplicitStartTime(t *testing.T) {
	// With no explicit lower bound (StartTime == MinTime), the fill
	// iterator should begin filling at the window containing the earliest
	// point rather than at the minimum representable time.
	opt := query.IteratorOptions{
		StartTime: influxql.MinTime,
		EndTime:   mustParseTime("2000-01-01T01:00:00Z").UnixNano() - 1,
		Interval: query.Interval{
			Duration: 20 * time.Minute,
		},
		Ascending: true,
	}

	first := mustParseTime("2000-01-01T00:00:00Z").UnixNano()
	input := &FloatIterator{Points: []query.FloatPoint{
		{Time: first, Value: 0},
	}}
	itr := query.NewFillIterator(input, nil, opt)

	points, err := (Iterators{itr}).ReadAll()
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}

	// One real point followed by two null-filled windows up to EndTime.
	want := [][]query.Point{
		{&query.FloatPoint{Time: first, Value: 0}},
		{&query.FloatPoint{Time: first + int64(20*time.Minute), Nil: true}},
		{&query.FloatPoint{Time: first + int64(40*time.Minute), Nil: true}},
	}
	if !deep.Equal(points, want) {
		t.Fatalf("unexpected points: %s", spew.Sdump(points))
	}
}
func TestFillIterator_DST(t *testing.T) {
for _, tt := range []struct {
name string

View File

@ -6640,6 +6640,68 @@ func TestServer_Query_Fill(t *testing.T) {
exp: `{"results":[{"statement_id":0,"series":[{"name":"fills","columns":["time","count"],"values":[["2009-11-10T23:00:00Z",2],["2009-11-10T23:00:05Z",1],["2009-11-10T23:00:10Z",1],["2009-11-10T23:00:15Z",1]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "fill with implicit start time",
command: `select mean(val) from fills where time < '2009-11-10T23:00:20Z' group by time(5s)`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"fills","columns":["time","mean"],"values":[["2009-11-10T23:00:00Z",4],["2009-11-10T23:00:05Z",4],["2009-11-10T23:00:10Z",null],["2009-11-10T23:00:15Z",10]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...)
for i, query := range test.queries {
t.Run(query.name, func(t *testing.T) {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Skipf("SKIP:: %s", query.name)
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
})
}
}
func TestServer_Query_ImplicitFill(t *testing.T) {
t.Parallel()
config := NewConfig()
config.Coordinator.MaxSelectBucketsN = 5
s := OpenServer(config)
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}
writes := []string{
fmt.Sprintf(`fills val=1 %d`, mustParseTime(time.RFC3339Nano, "2010-01-01T11:30:00Z").UnixNano()),
fmt.Sprintf(`fills val=3 %d`, mustParseTime(time.RFC3339Nano, "2010-01-01T12:00:00Z").UnixNano()),
fmt.Sprintf(`fills val=5 %d`, mustParseTime(time.RFC3339Nano, "2010-01-01T16:30:00Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}
test.addQueries([]*Query{
&Query{
name: "fill with implicit start",
command: `select mean(val) from fills where time < '2010-01-01T18:00:00Z' group by time(1h)`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"fills","columns":["time","mean"],"values":[["2010-01-01T16:00:00Z",5],["2010-01-01T17:00:00Z",null]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "fill with implicit start - max select buckets",
command: `select mean(val) from fills where time < '2010-01-01T17:00:00Z' group by time(1h)`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"fills","columns":["time","mean"],"values":[["2010-01-01T12:00:00Z",3],["2010-01-01T13:00:00Z",null],["2010-01-01T14:00:00Z",null],["2010-01-01T15:00:00Z",null],["2010-01-01T16:00:00Z",5]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...)
for i, query := range test.queries {