From 10029caf2f83c077d4c3e8b3cffa7005a21dbded Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 25 Aug 2016 12:52:39 -0500 Subject: [PATCH] Support negative timestamps in the query engine Negative timestamps are now supported. We also now refuse two nanoseconds that are at the edge of the minimum time window. One of the nanoseconds we do not accept is because we need MinInt64 to be used for some internal comparisons in the TSM engine and it was causing an underflow when we subtracted one from the minimum time. The second is so we can have one minimum time that signifies the default minimum that nobody can write to (so we can implicitly rewrite the timestamp on aggregate queries) but still use the explicit timestamp if it is given to us by the user. We aren't able to tell the difference between if the user provided it or if it was implicit without those values being different. If the default minimum time is used with an aggregate query, we rewrite the time to be the epoch for backwards compatibility since we believe that's more important than supporting that extra nanosecond. --- CHANGELOG.md | 1 + coordinator/statement_executor.go | 2 +- influxql/ast.go | 8 +++++--- influxql/iterator.gen.go | 20 ++++++++++++++++++++ influxql/iterator.gen.go.tmpl | 5 +++++ influxql/iterator.go | 13 +++++++++++-- influxql/point.go | 6 ++++-- models/points_test.go | 2 +- models/time.go | 14 ++++++++++++-- tsdb/cursor.go | 4 +++- tsdb/engine/tsm1/file_store.go | 2 +- tsdb/engine/tsm1/reader.go | 2 +- 12 files changed, 65 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf44327eb6..82bdf1f234 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ With this release the systemd configuration files for InfluxDB will use the syst - [#7605](https://github.com/influxdata/influxdb/issues/7605): Remove IF EXISTS/IF NOT EXISTS from influxql language. - [#7095](https://github.com/influxdata/influxdb/pull/7095): Add MaxSeriesPerDatabase config setting. - [#7199](https://github.com/influxdata/influxdb/pull/7199): Add mode function. Thanks @agaurav. +- [#7194](https://github.com/influxdata/influxdb/issues/7194): Support negative timestamps for the query engine. ### Bugfixes diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 49f4bcbcd9..e54a8b9bb9 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -528,7 +528,7 @@ func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx } } if opt.MinTime.IsZero() { - opt.MinTime = time.Unix(0, 0) + opt.MinTime = time.Unix(0, influxql.MinTime).UTC() } // Convert DISTINCT into a call. diff --git a/influxql/ast.go b/influxql/ast.go index a0b0fb6a0e..97c8bfb3c8 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -3630,7 +3630,7 @@ func TimeRange(expr Expr) (min, max time.Time, err error) { } // TimeRangeAsEpochNano returns the minimum and maximum times, as epoch nano, specified by -// and expression. If there is no lower bound, the start of the epoch is returned +// an expression. If there is no lower bound, the minimum time is returned // for minimum. If there is no higher bound, now is returned for maximum. func TimeRangeAsEpochNano(expr Expr) (min, max int64, err error) { tmin, tmax, err := TimeRange(expr) @@ -3639,7 +3639,7 @@ func TimeRangeAsEpochNano(expr Expr) (min, max int64, err error) { } if tmin.IsZero() { - min = time.Unix(0, 0).UnixNano() + min = time.Unix(0, MinTime).UnixNano() } else { min = tmin.UnixNano() } @@ -3680,7 +3680,9 @@ func timeExprValue(ref Expr, lit Expr) (t time.Time, err error) { case *TimeLiteral: if lit.Val.After(time.Unix(0, MaxTime)) { return time.Time{}, fmt.Errorf("time %s overflows time literal", lit.Val.Format(time.RFC3339)) - } else if lit.Val.Before(time.Unix(0, MinTime)) { + } else if lit.Val.Before(time.Unix(0, MinTime+1)) { + // The minimum allowable time literal is one greater than the minimum time because the minimum time + // is a sentinel value only used internally. return time.Time{}, fmt.Errorf("time %s underflows time literal", lit.Val.Format(time.RFC3339)) } return lit.Val, nil diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index cad54c1d26..45a5ab0e4d 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -816,6 +816,11 @@ func (itr *floatIntervalIterator) Next() (*FloatPoint, error) { return nil, err } p.Time, _ = itr.opt.Window(p.Time) + // If we see the minimum allowable time, set the time to zero so we don't + // break the default returned time for aggregate queries without times. + if p.Time == MinTime { + p.Time = 0 + } return p, nil } @@ -2977,6 +2982,11 @@ func (itr *integerIntervalIterator) Next() (*IntegerPoint, error) { return nil, err } p.Time, _ = itr.opt.Window(p.Time) + // If we see the minimum allowable time, set the time to zero so we don't + // break the default returned time for aggregate queries without times. + if p.Time == MinTime { + p.Time = 0 + } return p, nil } @@ -5135,6 +5145,11 @@ func (itr *stringIntervalIterator) Next() (*StringPoint, error) { return nil, err } p.Time, _ = itr.opt.Window(p.Time) + // If we see the minimum allowable time, set the time to zero so we don't + // break the default returned time for aggregate queries without times. + if p.Time == MinTime { + p.Time = 0 + } return p, nil } @@ -7293,6 +7308,11 @@ func (itr *booleanIntervalIterator) Next() (*BooleanPoint, error) { return nil, err } p.Time, _ = itr.opt.Window(p.Time) + // If we see the minimum allowable time, set the time to zero so we don't + // break the default returned time for aggregate queries without times. + if p.Time == MinTime { + p.Time = 0 + } return p, nil } diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 2d10c82367..d6afe6c465 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -815,6 +815,11 @@ func (itr *{{$k.name}}IntervalIterator) Next() (*{{$k.Name}}Point, error) { return nil, err } p.Time, _ = itr.opt.Window(p.Time) + // If we see the minimum allowable time, set the time to zero so we don't + // break the default returned time for aggregate queries without times. + if p.Time == MinTime { + p.Time = 0 + } return p, nil } diff --git a/influxql/iterator.go b/influxql/iterator.go index 076f79cfa0..cad5440ecd 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -19,7 +19,10 @@ var ErrUnknownCall = errors.New("unknown call") const ( // MinTime is used as the minimum time value when computing an unbounded range. - MinTime = int64(0) + // This time is one less than the MinNanoTime so that the first minimum + // time can be used as a sentinel value to signify that it is the default + // value rather than explicitly set by the user. + MinTime = models.MinNanoTime - 1 // MaxTime is used as the maximum time value when computing an unbounded range. // This time is 2262-04-11 23:47:16.854775806 +0000 UTC @@ -925,7 +928,13 @@ func (opt IteratorOptions) Window(t int64) (start, end int64) { t -= int64(opt.Interval.Offset) // Truncate time by duration. - t -= t % int64(opt.Interval.Duration) + dt := t % int64(opt.Interval.Duration) + if dt < 0 { + // Negative modulo rounds up instead of down, so offset + // with the duration. + dt += int64(opt.Interval.Duration) + } + t -= dt // Apply the offset. start = t + int64(opt.Interval.Offset) diff --git a/influxql/point.go b/influxql/point.go index b80e328043..762f0d7ba8 100644 --- a/influxql/point.go +++ b/influxql/point.go @@ -5,14 +5,16 @@ import ( "encoding/binary" "fmt" "io" + "math" "sort" "github.com/gogo/protobuf/proto" internal "github.com/influxdata/influxdb/influxql/internal" ) -// ZeroTime is the Unix nanosecond timestamp for time.Time{}. -const ZeroTime = int64(-6795364578871345152) +// ZeroTime is the Unix nanosecond timestamp for no time. +// This time is not used by the query engine or the storage engine as a valid time. +const ZeroTime = int64(math.MinInt64) // Point represents a value in a series that occurred at a given time. type Point interface { diff --git a/models/points_test.go b/models/points_test.go index e7df0bfc09..27caac335d 100644 --- a/models/points_test.go +++ b/models/points_test.go @@ -1225,7 +1225,7 @@ func TestParsePointMaxTimestamp(t *testing.T) { } func TestParsePointMinTimestamp(t *testing.T) { - test(t, `cpu value=1 -9223372036854775808`, + test(t, `cpu value=1 -9223372036854775806`, NewTestPoint( "cpu", models.Tags{}, diff --git a/models/time.go b/models/time.go index c6f723d97b..e98f2cb336 100644 --- a/models/time.go +++ b/models/time.go @@ -12,14 +12,24 @@ import ( const ( // MinNanoTime is the minumum time that can be represented. // - // 1677-09-21 00:12:43.145224192 +0000 UTC + // 1677-09-21 00:12:43.145224194 +0000 UTC // - MinNanoTime = int64(math.MinInt64) + // The two lowest minimum integers are used as sentinel values. The + // minimum value needs to be used as a value lower than any other value for + // comparisons and another separate value is needed to act as a sentinel + // default value that is unusable by the user, but usable internally. + // Because these two values need to be used for a special purpose, we do + // not allow users to write points at these two times. + MinNanoTime = int64(math.MinInt64) + 2 // MaxNanoTime is the maximum time that can be represented. // // 2262-04-11 23:47:16.854775806 +0000 UTC // + // The highest time represented by a nanosecond needs to be used for an + // exclusive range in the shard group, so the maximum time needs to be one + // less than the possible maximum number of nanoseconds representable by an + // int64 so that we don't lose a point at that one time. MaxNanoTime = int64(math.MaxInt64) - 1 ) diff --git a/tsdb/cursor.go b/tsdb/cursor.go index cbdf8cfa86..bfd4c7b88f 100644 --- a/tsdb/cursor.go +++ b/tsdb/cursor.go @@ -1,7 +1,9 @@ package tsdb +import "github.com/influxdata/influxdb/influxql" + // EOF represents a "not found" key returned by a Cursor. -const EOF = int64(-1) +const EOF = influxql.ZeroTime // Cursor represents an iterator over a series. type Cursor interface { diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index ccc0866839..e0a8d40823 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -677,7 +677,7 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location { // skip it. if ascending && maxTime < t { continue - // If we are descending and the min time fo the file is after where we want to start, + // If we are descending and the min time of the file is after where we want to start, // then skip it. } else if !ascending && minTime > t { continue diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index ddeca7df12..1f403be063 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -612,7 +612,7 @@ func (d *indirectIndex) Entries(key string) []IndexEntry { } // The search may have returned an i == 0 which could indicated that the value - // searched should be inserted at postion 0. Make sure the key in the index + // searched should be inserted at position 0. Make sure the key in the index // matches the search value. if !bytes.Equal(kb, k) { return nil