Support negative timestamps in the query engine

Negative timestamps are now supported. We also now refuse two
nanoseconds that are at the edge of the minimum time window. One of the
nanoseconds we do not accept is because we need MinInt64 to be used for
some internal comparisons in the TSM engine and it was causing an
underflow when we subtracted one from the minimum time. The second is so
we can have one minimum time that signifies the default minimum that
nobody can write to (so we can implicitly rewrite the timestamp on
aggregate queries) but still use the explicit timestamp if it is given
to us by the user. We aren't able to tell the difference between if the
user provided it or if it was implicit without those values being
different.

If the default minimum time is used with an aggregate query, we rewrite
the time to be the epoch for backwards compatibility since we believe
that's more important than supporting that extra nanosecond.
pull/7212/head
Jonathan A. Sternberg 2016-08-25 12:52:39 -05:00
parent 5130cd703b
commit 10029caf2f
12 changed files with 65 additions and 14 deletions

View File

@ -51,6 +51,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#7605](https://github.com/influxdata/influxdb/issues/7605): Remove IF EXISTS/IF NOT EXISTS from influxql language.
- [#7095](https://github.com/influxdata/influxdb/pull/7095): Add MaxSeriesPerDatabase config setting.
- [#7199](https://github.com/influxdata/influxdb/pull/7199): Add mode function. Thanks @agaurav.
- [#7194](https://github.com/influxdata/influxdb/issues/7194): Support negative timestamps for the query engine.
### Bugfixes

View File

@ -528,7 +528,7 @@ func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx
}
}
if opt.MinTime.IsZero() {
opt.MinTime = time.Unix(0, 0)
opt.MinTime = time.Unix(0, influxql.MinTime).UTC()
}
// Convert DISTINCT into a call.

View File

@ -3630,7 +3630,7 @@ func TimeRange(expr Expr) (min, max time.Time, err error) {
}
// TimeRangeAsEpochNano returns the minimum and maximum times, as epoch nano, specified by
// and expression. If there is no lower bound, the start of the epoch is returned
// an expression. If there is no lower bound, the minimum time is returned
// for minimum. If there is no higher bound, now is returned for maximum.
func TimeRangeAsEpochNano(expr Expr) (min, max int64, err error) {
tmin, tmax, err := TimeRange(expr)
@ -3639,7 +3639,7 @@ func TimeRangeAsEpochNano(expr Expr) (min, max int64, err error) {
}
if tmin.IsZero() {
min = time.Unix(0, 0).UnixNano()
min = time.Unix(0, MinTime).UnixNano()
} else {
min = tmin.UnixNano()
}
@ -3680,7 +3680,9 @@ func timeExprValue(ref Expr, lit Expr) (t time.Time, err error) {
case *TimeLiteral:
if lit.Val.After(time.Unix(0, MaxTime)) {
return time.Time{}, fmt.Errorf("time %s overflows time literal", lit.Val.Format(time.RFC3339))
} else if lit.Val.Before(time.Unix(0, MinTime)) {
} else if lit.Val.Before(time.Unix(0, MinTime+1)) {
// The minimum allowable time literal is one greater than the minimum time because the minimum time
// is a sentinel value only used internally.
return time.Time{}, fmt.Errorf("time %s underflows time literal", lit.Val.Format(time.RFC3339))
}
return lit.Val, nil

View File

@ -816,6 +816,11 @@ func (itr *floatIntervalIterator) Next() (*FloatPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}
@ -2977,6 +2982,11 @@ func (itr *integerIntervalIterator) Next() (*IntegerPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}
@ -5135,6 +5145,11 @@ func (itr *stringIntervalIterator) Next() (*StringPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}
@ -7293,6 +7308,11 @@ func (itr *booleanIntervalIterator) Next() (*BooleanPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

View File

@ -815,6 +815,11 @@ func (itr *{{$k.name}}IntervalIterator) Next() (*{{$k.Name}}Point, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

View File

@ -19,7 +19,10 @@ var ErrUnknownCall = errors.New("unknown call")
const (
// MinTime is used as the minimum time value when computing an unbounded range.
MinTime = int64(0)
// This time is one less than the MinNanoTime so that the first minimum
// time can be used as a sentinel value to signify that it is the default
// value rather than explicitly set by the user.
MinTime = models.MinNanoTime - 1
// MaxTime is used as the maximum time value when computing an unbounded range.
// This time is 2262-04-11 23:47:16.854775806 +0000 UTC
@ -925,7 +928,13 @@ func (opt IteratorOptions) Window(t int64) (start, end int64) {
t -= int64(opt.Interval.Offset)
// Truncate time by duration.
t -= t % int64(opt.Interval.Duration)
dt := t % int64(opt.Interval.Duration)
if dt < 0 {
// Negative modulo rounds up instead of down, so offset
// with the duration.
dt += int64(opt.Interval.Duration)
}
t -= dt
// Apply the offset.
start = t + int64(opt.Interval.Offset)

View File

@ -5,14 +5,16 @@ import (
"encoding/binary"
"fmt"
"io"
"math"
"sort"
"github.com/gogo/protobuf/proto"
internal "github.com/influxdata/influxdb/influxql/internal"
)
// ZeroTime is the Unix nanosecond timestamp for time.Time{}.
const ZeroTime = int64(-6795364578871345152)
// ZeroTime is the Unix nanosecond timestamp for no time.
// This time is not used by the query engine or the storage engine as a valid time.
const ZeroTime = int64(math.MinInt64)
// Point represents a value in a series that occurred at a given time.
type Point interface {

View File

@ -1225,7 +1225,7 @@ func TestParsePointMaxTimestamp(t *testing.T) {
}
func TestParsePointMinTimestamp(t *testing.T) {
test(t, `cpu value=1 -9223372036854775808`,
test(t, `cpu value=1 -9223372036854775806`,
NewTestPoint(
"cpu",
models.Tags{},

View File

@ -12,14 +12,24 @@ import (
const (
// MinNanoTime is the minumum time that can be represented.
//
// 1677-09-21 00:12:43.145224192 +0000 UTC
// 1677-09-21 00:12:43.145224194 +0000 UTC
//
MinNanoTime = int64(math.MinInt64)
// The two lowest minimum integers are used as sentinel values. The
// minimum value needs to be used as a value lower than any other value for
// comparisons and another separate value is needed to act as a sentinel
// default value that is unusable by the user, but usable internally.
// Because these two values need to be used for a special purpose, we do
// not allow users to write points at these two times.
MinNanoTime = int64(math.MinInt64) + 2
// MaxNanoTime is the maximum time that can be represented.
//
// 2262-04-11 23:47:16.854775806 +0000 UTC
//
// The highest time represented by a nanosecond needs to be used for an
// exclusive range in the shard group, so the maximum time needs to be one
// less than the possible maximum number of nanoseconds representable by an
// int64 so that we don't lose a point at that one time.
MaxNanoTime = int64(math.MaxInt64) - 1
)

View File

@ -1,7 +1,9 @@
package tsdb
import "github.com/influxdata/influxdb/influxql"
// EOF represents a "not found" key returned by a Cursor.
const EOF = int64(-1)
const EOF = influxql.ZeroTime
// Cursor represents an iterator over a series.
type Cursor interface {

View File

@ -677,7 +677,7 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
// skip it.
if ascending && maxTime < t {
continue
// If we are descending and the min time fo the file is after where we want to start,
// If we are descending and the min time of the file is after where we want to start,
// then skip it.
} else if !ascending && minTime > t {
continue

View File

@ -612,7 +612,7 @@ func (d *indirectIndex) Entries(key string) []IndexEntry {
}
// The search may have returned an i == 0 which could indicated that the value
// searched should be inserted at postion 0. Make sure the key in the index
// searched should be inserted at position 0. Make sure the key in the index
// matches the search value.
if !bytes.Equal(kb, k) {
return nil