From 0ef94e0cf06f0a776e713ec3cb462df62dcbe9b2 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Mon, 18 Sep 2017 11:28:37 -0500 Subject: [PATCH] Add unsigned iterators for all types This allows unsigned data to be queried from the storage engine. Binary math is not yet implemented for unsigned types. --- influxql/ast.go | 11 +- query/call_iterator.go | 292 +++++++++++++ query/call_iterator_test.go | 238 ++++++++++ query/functions.go | 410 ++++++++++++++++++ query/iterator.gen.go | 16 +- query/iterator.gen.go.tmpl | 2 +- query/iterator.go | 43 ++ query/iterator_test.go | 174 +++++++- query/linear.go | 10 + query/point.go | 2 + query/select_test.go | 834 ++++++++++++++++++++++++++++++++++++ 11 files changed, 2026 insertions(+), 6 deletions(-) diff --git a/influxql/ast.go b/influxql/ast.go index 996ecb349c..fbe7b9b409 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1239,10 +1239,11 @@ func (s *SelectStatement) RewriteFields(m FieldMapper) (*SelectStatement, error) continue } - // All types that can expand wildcards support float and integer. + // All types that can expand wildcards support float, integer, and unsigned. supportedTypes := map[DataType]struct{}{ - Float: struct{}{}, - Integer: struct{}{}, + Float: {}, + Integer: {}, + Unsigned: {}, } // Add additional types for certain functions. @@ -1252,6 +1253,8 @@ func (s *SelectStatement) RewriteFields(m FieldMapper) (*SelectStatement, error) fallthrough case "min", "max": supportedTypes[Boolean] = struct{}{} + case "holt_winters", "holt_winters_with_fit": + delete(supportedTypes, Unsigned) } for _, ref := range fields { @@ -4103,6 +4106,8 @@ func EvalType(expr Expr, sources Sources, typmap TypeMapper) DataType { return Float case "count": return Integer + case "elapsed": + return Integer default: return EvalType(expr.Args[0], sources, typmap) } diff --git a/query/call_iterator.go b/query/call_iterator.go index d160e22f8f..0346b8c30f 100644 --- a/query/call_iterator.go +++ b/query/call_iterator.go @@ -74,6 +74,12 @@ func newCountIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, IntegerPointEmitter) { + fn := NewUnsignedFuncIntegerReducer(UnsignedCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime}) + return fn, fn + } + return newUnsignedReduceIntegerIterator(input, opt, createFn), nil case StringIterator: createFn := func() (StringPointAggregator, IntegerPointEmitter) { fn := NewStringFuncIntegerReducer(StringCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime}) @@ -107,6 +113,14 @@ func IntegerCountReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) return ZeroTime, prev.Value + 1, nil } +// UnsignedCountReduce returns the count of points. +func UnsignedCountReduce(prev *IntegerPoint, curr *UnsignedPoint) (int64, int64, []interface{}) { + if prev == nil { + return ZeroTime, 1, nil + } + return ZeroTime, prev.Value + 1, nil +} + // StringCountReduce returns the count of points. func StringCountReduce(prev *IntegerPoint, curr *StringPoint) (int64, int64, []interface{}) { if prev == nil { @@ -138,6 +152,12 @@ func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedFuncReducer(UnsignedMinReduce, nil) + return fn, fn + } + return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil case BooleanIterator: createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { fn := NewBooleanFuncReducer(BooleanMinReduce, nil) @@ -165,6 +185,14 @@ func IntegerMinReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) { return prev.Time, prev.Value, prev.Aux } +// UnsignedMinReduce returns the minimum value between prev & curr. +func UnsignedMinReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) { + if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) { + return curr.Time, curr.Value, cloneAux(curr.Aux) + } + return prev.Time, prev.Value, prev.Aux +} + // BooleanMinReduce returns the minimum value between prev & curr. func BooleanMinReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) { if prev == nil || (curr.Value != prev.Value && !curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) { @@ -188,6 +216,12 @@ func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedFuncReducer(UnsignedMaxReduce, nil) + return fn, fn + } + return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil case BooleanIterator: createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { fn := NewBooleanFuncReducer(BooleanMaxReduce, nil) @@ -215,6 +249,14 @@ func IntegerMaxReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) { return prev.Time, prev.Value, prev.Aux } +// UnsignedMaxReduce returns the maximum value between prev & curr. +func UnsignedMaxReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) { + if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) { + return curr.Time, curr.Value, cloneAux(curr.Aux) + } + return prev.Time, prev.Value, prev.Aux +} + // BooleanMaxReduce returns the minimum value between prev & curr. func BooleanMaxReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) { if prev == nil || (curr.Value != prev.Value && curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) { @@ -238,6 +280,12 @@ func newSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedFuncReducer(UnsignedSumReduce, &UnsignedPoint{Value: 0, Time: ZeroTime}) + return fn, fn + } + return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil default: return nil, fmt.Errorf("unsupported sum iterator type: %T", input) } @@ -259,6 +307,14 @@ func IntegerSumReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) { return prev.Time, prev.Value + curr.Value, nil } +// UnsignedSumReduce returns the sum prev value & curr value. +func UnsignedSumReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) { + if prev == nil { + return ZeroTime, curr.Value, nil + } + return prev.Time, prev.Value + curr.Value, nil +} + // newFirstIterator returns an iterator for operating on a first() call. func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { @@ -274,6 +330,12 @@ func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedFuncReducer(UnsignedFirstReduce, nil) + return fn, fn + } + return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil case StringIterator: createFn := func() (StringPointAggregator, StringPointEmitter) { fn := NewStringFuncReducer(StringFirstReduce, nil) @@ -307,6 +369,14 @@ func IntegerFirstReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) return prev.Time, prev.Value, prev.Aux } +// UnsignedFirstReduce returns the first point sorted by time. +func UnsignedFirstReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) { + if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { + return curr.Time, curr.Value, cloneAux(curr.Aux) + } + return prev.Time, prev.Value, prev.Aux +} + // StringFirstReduce returns the first point sorted by time. func StringFirstReduce(prev, curr *StringPoint) (int64, string, []interface{}) { if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { @@ -338,6 +408,12 @@ func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedFuncReducer(UnsignedLastReduce, nil) + return fn, fn + } + return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil case StringIterator: createFn := func() (StringPointAggregator, StringPointEmitter) { fn := NewStringFuncReducer(StringLastReduce, nil) @@ -371,6 +447,14 @@ func IntegerLastReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) { return prev.Time, prev.Value, prev.Aux } +// UnsignedLastReduce returns the last point sorted by time. +func UnsignedLastReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) { + if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { + return curr.Time, curr.Value, cloneAux(curr.Aux) + } + return prev.Time, prev.Value, prev.Aux +} + // StringLastReduce returns the first point sorted by time. func StringLastReduce(prev, curr *StringPoint) (int64, string, []interface{}) { if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { @@ -402,6 +486,12 @@ func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedDistinctReducer() + return fn, fn + } + return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil case StringIterator: createFn := func() (StringPointAggregator, StringPointEmitter) { fn := NewStringDistinctReducer() @@ -434,6 +524,12 @@ func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceFloatIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { + fn := NewUnsignedMeanReducer() + return fn, fn + } + return newUnsignedReduceFloatIterator(input, opt, createFn), nil default: return nil, fmt.Errorf("unsupported mean iterator type: %T", input) } @@ -459,6 +555,12 @@ func newMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceFloatIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { + fn := NewUnsignedSliceFuncFloatReducer(UnsignedMedianReduceSlice) + return fn, fn + } + return newUnsignedReduceFloatIterator(input, opt, createFn), nil default: return nil, fmt.Errorf("unsupported median iterator type: %T", input) } @@ -500,6 +602,24 @@ func IntegerMedianReduceSlice(a []IntegerPoint) []FloatPoint { return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}} } +// UnsignedMedianReduceSlice returns the median value within a window. +func UnsignedMedianReduceSlice(a []UnsignedPoint) []FloatPoint { + if len(a) == 1 { + return []FloatPoint{{Time: ZeroTime, Value: float64(a[0].Value)}} + } + + // OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1. + + // Return the middle value from the points. + // If there are an even number of points then return the mean of the two middle points. + sort.Sort(unsignedPointsByValue(a)) + if len(a)%2 == 0 { + lo, hi := a[len(a)/2-1], a[(len(a)/2)] + return []FloatPoint{{Time: ZeroTime, Value: float64(lo.Value) + float64(hi.Value-lo.Value)/2}} + } + return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}} +} + // newModeIterator returns an iterator for operating on a mode() call. func NewModeIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { @@ -515,6 +635,12 @@ func NewModeIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedSliceFuncReducer(UnsignedModeReduceSlice) + return fn, fn + } + return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil case StringIterator: createFn := func() (StringPointAggregator, StringPointEmitter) { fn := NewStringSliceFuncReducer(StringModeReduceSlice) @@ -599,6 +725,39 @@ func IntegerModeReduceSlice(a []IntegerPoint) []IntegerPoint { return []IntegerPoint{{Time: ZeroTime, Value: mostMode}} } +// UnsignedModeReduceSlice returns the mode value within a window. +func UnsignedModeReduceSlice(a []UnsignedPoint) []UnsignedPoint { + if len(a) == 1 { + return a + } + sort.Sort(unsignedPointsByValue(a)) + + mostFreq := 0 + currFreq := 0 + currMode := a[0].Value + mostMode := a[0].Value + mostTime := a[0].Time + currTime := a[0].Time + + for _, p := range a { + if p.Value != currMode { + currFreq = 1 + currMode = p.Value + currTime = p.Time + continue + } + currFreq++ + if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) { + continue + } + mostFreq = currFreq + mostMode = p.Value + mostTime = p.Time + } + + return []UnsignedPoint{{Time: ZeroTime, Value: mostMode}} +} + // StringModeReduceSlice returns the mode value within a window. func StringModeReduceSlice(a []StringPoint) []StringPoint { if len(a) == 1 { @@ -674,6 +833,12 @@ func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceFloatIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { + fn := NewUnsignedSliceFuncFloatReducer(UnsignedStddevReduceSlice) + return fn, fn + } + return newUnsignedReduceFloatIterator(input, opt, createFn), nil default: return nil, fmt.Errorf("unsupported stddev iterator type: %T", input) } @@ -737,6 +902,32 @@ func IntegerStddevReduceSlice(a []IntegerPoint) []FloatPoint { }} } +// UnsignedStddevReduceSlice returns the stddev value within a window. +func UnsignedStddevReduceSlice(a []UnsignedPoint) []FloatPoint { + // If there is only one point then return 0. + if len(a) < 2 { + return []FloatPoint{{Time: ZeroTime, Nil: true}} + } + + // Calculate the mean. + var mean float64 + var count int + for _, p := range a { + count++ + mean += (float64(p.Value) - mean) / float64(count) + } + + // Calculate the variance. + var variance float64 + for _, p := range a { + variance += math.Pow(float64(p.Value)-mean, 2) + } + return []FloatPoint{{ + Time: ZeroTime, + Value: math.Sqrt(variance / float64(count-1)), + }} +} + // newSpreadIterator returns an iterator for operating on a spread() call. func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { @@ -752,6 +943,12 @@ func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) { return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedSliceFuncReducer(UnsignedSpreadReduceSlice) + return fn, fn + } + return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil default: return nil, fmt.Errorf("unsupported spread iterator type: %T", input) } @@ -783,6 +980,21 @@ func IntegerSpreadReduceSlice(a []IntegerPoint) []IntegerPoint { return []IntegerPoint{{Time: ZeroTime, Value: max - min}} } +// UnsignedSpreadReduceSlice returns the spread value within a window. +func UnsignedSpreadReduceSlice(a []UnsignedPoint) []UnsignedPoint { + // Find min & max values. + min, max := a[0].Value, a[0].Value + for _, p := range a[1:] { + if p.Value < min { + min = p.Value + } + if p.Value > max { + max = p.Value + } + } + return []UnsignedPoint{{Time: ZeroTime, Value: max - min}} +} + func newTopIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) (Iterator, error) { switch input := input.(type) { case FloatIterator: @@ -801,6 +1013,14 @@ func newTopIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) ( itr := newIntegerReduceIntegerIterator(input, opt, createFn) itr.keepTags = keepTags return itr, nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedTopReducer(n) + return fn, fn + } + itr := newUnsignedReduceUnsignedIterator(input, opt, createFn) + itr.keepTags = keepTags + return itr, nil default: return nil, fmt.Errorf("unsupported top iterator type: %T", input) } @@ -824,6 +1044,14 @@ func newBottomIterator(input Iterator, opt IteratorOptions, n int, keepTags bool itr := newIntegerReduceIntegerIterator(input, opt, createFn) itr.keepTags = keepTags return itr, nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedBottomReducer(n) + return fn, fn + } + itr := newUnsignedReduceUnsignedIterator(input, opt, createFn) + itr.keepTags = keepTags + return itr, nil default: return nil, fmt.Errorf("unsupported bottom iterator type: %T", input) } @@ -846,6 +1074,13 @@ func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + unsignedPercentileReduceSlice := NewUnsignedPercentileReduceSliceFunc(percentile) + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedSliceFuncReducer(unsignedPercentileReduceSlice) + return fn, fn + } + return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil default: return nil, fmt.Errorf("unsupported percentile iterator type: %T", input) } @@ -881,6 +1116,21 @@ func NewIntegerPercentileReduceSliceFunc(percentile float64) IntegerReduceSliceF } } +// NewUnsignedPercentileReduceSliceFunc returns the percentile value within a window. +func NewUnsignedPercentileReduceSliceFunc(percentile float64) UnsignedReduceSliceFunc { + return func(a []UnsignedPoint) []UnsignedPoint { + length := len(a) + i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1 + + if i < 0 || i >= length { + return nil + } + + sort.Sort(unsignedPointsByValue(a)) + return []UnsignedPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}} + } +} + // newDerivativeIterator returns an iterator for operating on a derivative() call. func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) { switch input := input.(type) { @@ -896,6 +1146,12 @@ func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interva return fn, fn } return newIntegerStreamFloatIterator(input, createFn, opt), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { + fn := NewUnsignedDerivativeReducer(interval, isNonNegative, opt.Ascending) + return fn, fn + } + return newUnsignedStreamFloatIterator(input, createFn, opt), nil default: return nil, fmt.Errorf("unsupported derivative iterator type: %T", input) } @@ -916,6 +1172,12 @@ func newDifferenceIterator(input Iterator, opt IteratorOptions, isNonNegative bo return fn, fn } return newIntegerStreamIntegerIterator(input, createFn, opt), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedDifferenceReducer(isNonNegative) + return fn, fn + } + return newUnsignedStreamUnsignedIterator(input, createFn, opt), nil default: return nil, fmt.Errorf("unsupported difference iterator type: %T", input) } @@ -936,6 +1198,12 @@ func newElapsedIterator(input Iterator, opt IteratorOptions, interval Interval) return fn, fn } return newIntegerStreamIntegerIterator(input, createFn, opt), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, IntegerPointEmitter) { + fn := NewUnsignedElapsedReducer(interval) + return fn, fn + } + return newUnsignedStreamIntegerIterator(input, createFn, opt), nil case BooleanIterator: createFn := func() (BooleanPointAggregator, IntegerPointEmitter) { fn := NewBooleanElapsedReducer(interval) @@ -968,6 +1236,12 @@ func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Itera return fn, fn } return newIntegerStreamFloatIterator(input, createFn, opt), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { + fn := NewUnsignedMovingAverageReducer(n) + return fn, fn + } + return newUnsignedStreamFloatIterator(input, createFn, opt), nil default: return nil, fmt.Errorf("unsupported moving average iterator type: %T", input) } @@ -988,6 +1262,12 @@ func newCumulativeSumIterator(input Iterator, opt IteratorOptions) (Iterator, er return fn, fn } return newIntegerStreamIntegerIterator(input, createFn, opt), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedCumulativeSumReducer() + return fn, fn + } + return newUnsignedStreamUnsignedIterator(input, createFn, opt), nil default: return nil, fmt.Errorf("unsupported cumulative sum iterator type: %T", input) } @@ -1033,6 +1313,12 @@ func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { + fn := NewUnsignedSampleReducer(size) + return fn, fn + } + return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil case StringIterator: createFn := func() (StringPointAggregator, StringPointEmitter) { fn := NewStringSampleReducer(size) @@ -1065,6 +1351,12 @@ func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval) return fn, fn } return newIntegerStreamFloatIterator(input, createFn, opt), nil + case UnsignedIterator: + createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { + fn := NewUnsignedIntegralReducer(interval, opt) + return fn, fn + } + return newUnsignedStreamFloatIterator(input, createFn, opt), nil default: return nil, fmt.Errorf("unsupported integral iterator type: %T", input) } diff --git a/query/call_iterator_test.go b/query/call_iterator_test.go index fc0e7dc4a1..da3b3ea2a8 100644 --- a/query/call_iterator_test.go +++ b/query/call_iterator_test.go @@ -79,6 +79,41 @@ func TestCallIterator_Count_Integer(t *testing.T) { } } +// Ensure that an unsigned iterator can be created for a count() call. +func TestCallIterator_Count_Unsigned(t *testing.T) { + itr, _ := query.NewCallIterator( + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, + {Name: "cpu", Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Name: "cpu", Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")}, + {Name: "cpu", Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")}, + + {Name: "cpu", Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, + {Name: "cpu", Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + {Name: "mem", Time: 23, Value: 10, Tags: ParseTags("region=us-west,host=hostB")}, + }}, + query.IteratorOptions{ + Expr: MustParseExpr(`count("value")`), + Dimensions: []string{"host"}, + Interval: query.Interval{Duration: 5 * time.Nanosecond}, + Ordered: true, + Ascending: true, + }, + ) + + if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if diff := cmp.Diff(a, [][]query.Point{ + {&query.IntegerPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA"), Aggregated: 3}}, + {&query.IntegerPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA"), Aggregated: 1}}, + {&query.IntegerPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + {&query.IntegerPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + {&query.IntegerPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + }); diff != "" { + t.Fatalf("unexpected points:\n%s", diff) + } +} + // Ensure that a string iterator can be created for a count() call. func TestCallIterator_Count_String(t *testing.T) { itr, _ := query.NewCallIterator( @@ -217,6 +252,40 @@ func TestCallIterator_Min_Integer(t *testing.T) { } } +// Ensure that a unsigned iterator can be created for a min() call. +func TestCallIterator_Min_Unsigned(t *testing.T) { + itr, _ := query.NewCallIterator( + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 4, Value: 12, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")}, + {Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")}, + + {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + }}, + query.IteratorOptions{ + Expr: MustParseExpr(`min("value")`), + Dimensions: []string{"host"}, + Interval: query.Interval{Duration: 5 * time.Nanosecond}, + Ordered: true, + Ascending: true, + }, + ) + + if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if diff := cmp.Diff(a, [][]query.Point{ + {&query.UnsignedPoint{Time: 1, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 4}}, + {&query.UnsignedPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}}, + {&query.UnsignedPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + {&query.UnsignedPoint{Time: 23, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + }); diff != "" { + t.Fatalf("unexpected points:\n%s", diff) + } +} + // Ensure that a boolean iterator can be created for a min() call. func TestCallIterator_Min_Boolean(t *testing.T) { itr, _ := query.NewCallIterator( @@ -316,6 +385,39 @@ func TestCallIterator_Max_Integer(t *testing.T) { } } +// Ensure that a unsigned iterator can be created for a max() call. +func TestCallIterator_Max_Unsigned(t *testing.T) { + itr, _ := query.NewCallIterator( + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")}, + {Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")}, + + {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + }}, + query.IteratorOptions{ + Expr: MustParseExpr(`max("value")`), + Dimensions: []string{"host"}, + Interval: query.Interval{Duration: 5 * time.Nanosecond}, + Ordered: true, + Ascending: true, + }, + ) + + if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if diff := cmp.Diff(a, [][]query.Point{ + {&query.UnsignedPoint{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3}}, + {&query.UnsignedPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}}, + {&query.UnsignedPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + {&query.UnsignedPoint{Time: 23, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + }); diff != "" { + t.Fatalf("unexpected points:\n%s", diff) + } +} + // Ensure that a boolean iterator can be created for a max() call. func TestCallIterator_Max_Boolean(t *testing.T) { itr, _ := query.NewCallIterator( @@ -415,6 +517,39 @@ func TestCallIterator_Sum_Integer(t *testing.T) { } } +// Ensure that an unsigned iterator can be created for a sum() call. +func TestCallIterator_Sum_Unsigned(t *testing.T) { + itr, _ := query.NewCallIterator( + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")}, + {Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")}, + + {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + }}, + query.IteratorOptions{ + Expr: MustParseExpr(`sum("value")`), + Dimensions: []string{"host"}, + Interval: query.Interval{Duration: 5 * time.Nanosecond}, + Ordered: true, + Ascending: true, + }, + ) + + if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if diff := cmp.Diff(a, [][]query.Point{ + {&query.UnsignedPoint{Time: 0, Value: 35, Tags: ParseTags("host=hostA"), Aggregated: 3}}, + {&query.UnsignedPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}}, + {&query.UnsignedPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + {&query.UnsignedPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + }); diff != "" { + t.Fatalf("unexpected points:\n%s", diff) + } +} + // Ensure that a float iterator can be created for a first() call. func TestCallIterator_First_Float(t *testing.T) { itr, _ := query.NewCallIterator( @@ -481,6 +616,39 @@ func TestCallIterator_First_Integer(t *testing.T) { } } +// Ensure that an unsigned iterator can be created for a first() call. +func TestCallIterator_First_Unsigned(t *testing.T) { + itr, _ := query.NewCallIterator( + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")}, + {Time: 6, Value: 20, Tags: ParseTags("region=us-east,host=hostA")}, + + {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + }}, + query.IteratorOptions{ + Expr: MustParseExpr(`first("value")`), + Dimensions: []string{"host"}, + Interval: query.Interval{Duration: 5 * time.Nanosecond}, + Ordered: true, + Ascending: true, + }, + ) + + if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if diff := cmp.Diff(a, [][]query.Point{ + {&query.UnsignedPoint{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3}}, + {&query.UnsignedPoint{Time: 6, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}}, + {&query.UnsignedPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + {&query.UnsignedPoint{Time: 23, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + }); diff != "" { + t.Fatalf("unexpected points:\n%s", diff) + } +} + // Ensure that a string iterator can be created for a first() call. func TestCallIterator_First_String(t *testing.T) { itr, _ := query.NewCallIterator( @@ -613,6 +781,39 @@ func TestCallIterator_Last_Integer(t *testing.T) { } } +// Ensure that an unsigned iterator can be created for a last() call. +func TestCallIterator_Last_Unsigned(t *testing.T) { + itr, _ := query.NewCallIterator( + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")}, + {Time: 6, Value: 20, Tags: ParseTags("region=us-east,host=hostA")}, + + {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + }}, + query.IteratorOptions{ + Expr: MustParseExpr(`last("value")`), + Dimensions: []string{"host"}, + Interval: query.Interval{Duration: 5 * time.Nanosecond}, + Ordered: true, + Ascending: true, + }, + ) + + if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if diff := cmp.Diff(a, [][]query.Point{ + {&query.UnsignedPoint{Time: 2, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 3}}, + {&query.UnsignedPoint{Time: 6, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}}, + {&query.UnsignedPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + {&query.UnsignedPoint{Time: 23, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}}, + }); diff != "" { + t.Fatalf("unexpected points:\n%s", diff) + } +} + // Ensure that a string iterator can be created for a last() call. func TestCallIterator_Last_String(t *testing.T) { itr, _ := query.NewCallIterator( @@ -754,6 +955,43 @@ func TestCallIterator_Mode_Integer(t *testing.T) { } } +// Ensure that a unsigned iterator can be created for a mode() call. +func TestCallIterator_Mode_Unsigned(t *testing.T) { + itr, _ := query.NewModeIterator(&UnsignedIterator{Points: []query.UnsignedPoint{ + {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")}, + {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 3, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 4, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 6, Value: 20, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 7, Value: 21, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 8, Value: 21, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 22, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 24, Value: 25, Tags: ParseTags("region=us-west,host=hostB")}, + }}, + query.IteratorOptions{ + Expr: MustParseExpr(`mode("value")`), + Dimensions: []string{"host"}, + Interval: query.Interval{Duration: 5 * time.Nanosecond}, + Ordered: true, + Ascending: true, + }, + ) + + if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if diff := cmp.Diff(a, [][]query.Point{ + {&query.UnsignedPoint{Time: 0, Value: 10, Tags: ParseTags("host=hostA")}}, + {&query.UnsignedPoint{Time: 5, Value: 21, Tags: ParseTags("host=hostA")}}, + {&query.UnsignedPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB")}}, + {&query.UnsignedPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB")}}, + }); diff != "" { + t.Fatalf("unexpected points:\n%s", diff) + } +} + // Ensure that a string iterator can be created for a mode() call. func TestCallIterator_Mode_String(t *testing.T) { itr, _ := query.NewModeIterator(&StringIterator{Points: []query.StringPoint{ diff --git a/query/functions.go b/query/functions.go index cf0186eca4..dfb412aa8d 100644 --- a/query/functions.go +++ b/query/functions.go @@ -72,6 +72,37 @@ func (r *IntegerMeanReducer) Emit() []FloatPoint { }} } +// UnsignedMeanReducer calculates the mean of the aggregated points. +type UnsignedMeanReducer struct { + sum uint64 + count uint32 +} + +// NewUnsignedMeanReducer creates a new UnsignedMeanReducer. +func NewUnsignedMeanReducer() *UnsignedMeanReducer { + return &UnsignedMeanReducer{} +} + +// AggregateUnsigned aggregates a point into the reducer. +func (r *UnsignedMeanReducer) AggregateUnsigned(p *UnsignedPoint) { + if p.Aggregated >= 2 { + r.sum += p.Value * uint64(p.Aggregated) + r.count += p.Aggregated + } else { + r.sum += p.Value + r.count++ + } +} + +// Emit emits the mean of the aggregated points as a single point. +func (r *UnsignedMeanReducer) Emit() []FloatPoint { + return []FloatPoint{{ + Time: ZeroTime, + Value: float64(r.sum) / float64(r.count), + Aggregated: r.count, + }} +} + // FloatDerivativeReducer calculates the derivative of the aggregated points. type FloatDerivativeReducer struct { interval Interval @@ -186,6 +217,68 @@ func (r *IntegerDerivativeReducer) Emit() []FloatPoint { return nil } +// UnsignedDerivativeReducer calculates the derivative of the aggregated points. +type UnsignedDerivativeReducer struct { + interval Interval + prev UnsignedPoint + curr UnsignedPoint + isNonNegative bool + ascending bool +} + +// NewUnsignedDerivativeReducer creates a new UnsignedDerivativeReducer. +func NewUnsignedDerivativeReducer(interval Interval, isNonNegative, ascending bool) *UnsignedDerivativeReducer { + return &UnsignedDerivativeReducer{ + interval: interval, + isNonNegative: isNonNegative, + ascending: ascending, + prev: UnsignedPoint{Nil: true}, + curr: UnsignedPoint{Nil: true}, + } +} + +// AggregateUnsigned aggregates a point into the reducer and updates the current window. +func (r *UnsignedDerivativeReducer) AggregateUnsigned(p *UnsignedPoint) { + // Skip past a point when it does not advance the stream. A joined series + // may have multiple points at the same time so we will discard anything + // except the first point we encounter. + if !r.curr.Nil && r.curr.Time == p.Time { + return + } + + r.prev = r.curr + r.curr = *p +} + +// Emit emits the derivative of the reducer at the current point. +func (r *UnsignedDerivativeReducer) Emit() []FloatPoint { + if !r.prev.Nil { + // Calculate the derivative of successive points by dividing the + // difference of each value by the elapsed time normalized to the interval. + var diff float64 + if r.curr.Value > r.prev.Value { + diff = float64(r.curr.Value - r.prev.Value) + } else { + diff = -float64(r.prev.Value - r.curr.Value) + } + elapsed := r.curr.Time - r.prev.Time + if !r.ascending { + elapsed = -elapsed + } + value := diff / (float64(elapsed) / float64(r.interval.Duration)) + + // Mark this point as read by changing the previous point to nil. + r.prev.Nil = true + + // Drop negative values for non-negative derivatives. + if r.isNonNegative && diff < 0 { + return nil + } + return []FloatPoint{{Time: r.curr.Time, Value: value}} + } + return nil +} + // FloatDifferenceReducer calculates the derivative of the aggregated points. type FloatDifferenceReducer struct { isNonNegative bool @@ -283,6 +376,55 @@ func (r *IntegerDifferenceReducer) Emit() []IntegerPoint { return nil } +// UnsignedDifferenceReducer calculates the derivative of the aggregated points. +type UnsignedDifferenceReducer struct { + isNonNegative bool + prev UnsignedPoint + curr UnsignedPoint +} + +// NewUnsignedDifferenceReducer creates a new UnsignedDifferenceReducer. +func NewUnsignedDifferenceReducer(isNonNegative bool) *UnsignedDifferenceReducer { + return &UnsignedDifferenceReducer{ + isNonNegative: isNonNegative, + prev: UnsignedPoint{Nil: true}, + curr: UnsignedPoint{Nil: true}, + } +} + +// AggregateUnsigned aggregates a point into the reducer and updates the current window. +func (r *UnsignedDifferenceReducer) AggregateUnsigned(p *UnsignedPoint) { + // Skip past a point when it does not advance the stream. A joined series + // may have multiple points at the same time so we will discard anything + // except the first point we encounter. + if !r.curr.Nil && r.curr.Time == p.Time { + return + } + + r.prev = r.curr + r.curr = *p +} + +// Emit emits the difference of the reducer at the current point. +func (r *UnsignedDifferenceReducer) Emit() []UnsignedPoint { + if !r.prev.Nil { + // If it is non_negative_difference discard any negative value. Since + // prev is still marked as unread. The correctness can be ensured. + if r.isNonNegative && r.curr.Value < r.prev.Value { + return nil + } + + // Calculate the difference of successive points. + value := r.curr.Value - r.prev.Value + + // Mark this point as read by changing the previous point to nil. + r.prev.Nil = true + + return []UnsignedPoint{{Time: r.curr.Time, Value: value}} + } + return nil +} + // FloatMovingAverageReducer calculates the moving average of the aggregated points. type FloatMovingAverageReducer struct { pos int @@ -377,6 +519,53 @@ func (r *IntegerMovingAverageReducer) Emit() []FloatPoint { } } +// UnsignedMovingAverageReducer calculates the moving average of the aggregated points. +type UnsignedMovingAverageReducer struct { + pos int + sum uint64 + time int64 + buf []uint64 +} + +// NewUnsignedMovingAverageReducer creates a new UnsignedMovingAverageReducer. +func NewUnsignedMovingAverageReducer(n int) *UnsignedMovingAverageReducer { + return &UnsignedMovingAverageReducer{ + buf: make([]uint64, 0, n), + } +} + +// AggregateUnsigned aggregates a point into the reducer and updates the current window. +func (r *UnsignedMovingAverageReducer) AggregateUnsigned(p *UnsignedPoint) { + if len(r.buf) != cap(r.buf) { + r.buf = append(r.buf, p.Value) + } else { + r.sum -= r.buf[r.pos] + r.buf[r.pos] = p.Value + } + r.sum += p.Value + r.time = p.Time + r.pos++ + if r.pos >= cap(r.buf) { + r.pos = 0 + } +} + +// Emit emits the moving average of the current window. Emit should be called +// after every call to AggregateUnsigned and it will produce one point if there +// is enough data to fill a window, otherwise it will produce zero points. +func (r *UnsignedMovingAverageReducer) Emit() []FloatPoint { + if len(r.buf) != cap(r.buf) { + return []FloatPoint{} + } + return []FloatPoint{ + { + Value: float64(r.sum) / float64(len(r.buf)), + Time: r.time, + Aggregated: uint32(len(r.buf)), + }, + } +} + // FloatCumulativeSumReducer cumulates the values from each point. type FloatCumulativeSumReducer struct { curr FloatPoint @@ -429,6 +618,32 @@ func (r *IntegerCumulativeSumReducer) Emit() []IntegerPoint { return pts } +// UnsignedCumulativeSumReducer cumulates the values from each point. +type UnsignedCumulativeSumReducer struct { + curr UnsignedPoint +} + +// NewUnsignedCumulativeSumReducer creates a new UnsignedCumulativeSumReducer. +func NewUnsignedCumulativeSumReducer() *UnsignedCumulativeSumReducer { + return &UnsignedCumulativeSumReducer{ + curr: UnsignedPoint{Nil: true}, + } +} + +func (r *UnsignedCumulativeSumReducer) AggregateUnsigned(p *UnsignedPoint) { + r.curr.Value += p.Value + r.curr.Time = p.Time + r.curr.Nil = false +} + +func (r *UnsignedCumulativeSumReducer) Emit() []UnsignedPoint { + var pts []UnsignedPoint + if !r.curr.Nil { + pts = []UnsignedPoint{r.curr} + } + return pts +} + // FloatHoltWintersReducer forecasts a series into the future. // This is done using the Holt-Winters damped method. // 1. Using the series the initial values are calculated using a SSE. @@ -991,6 +1206,115 @@ func (r *IntegerIntegralReducer) Close() error { return nil } +// IntegerIntegralReducer calculates the time-integral of the aggregated points. +type UnsignedIntegralReducer struct { + interval Interval + sum float64 + prev UnsignedPoint + window struct { + start int64 + end int64 + } + ch chan FloatPoint + opt IteratorOptions +} + +// NewUnsignedIntegralReducer creates a new UnsignedIntegralReducer. +func NewUnsignedIntegralReducer(interval Interval, opt IteratorOptions) *UnsignedIntegralReducer { + return &UnsignedIntegralReducer{ + interval: interval, + prev: UnsignedPoint{Nil: true}, + ch: make(chan FloatPoint, 1), + opt: opt, + } +} + +// AggregateUnsigned aggregates a point into the reducer. +func (r *UnsignedIntegralReducer) AggregateUnsigned(p *UnsignedPoint) { + // If this is the first point, just save it + if r.prev.Nil { + r.prev = *p + + // Record the end of the time interval. + // We do not care for whether the last number is inclusive or exclusive + // because we treat both the same for the involved math. + if r.opt.Ascending { + r.window.start, r.window.end = r.opt.Window(p.Time) + } else { + r.window.end, r.window.start = r.opt.Window(p.Time) + } + + // If we see the minimum allowable time, set the time to zero so we don't + // break the default returned time for aggregate queries without times. + if r.window.start == influxql.MinTime { + r.window.start = 0 + } + return + } + + // If this point has the same timestamp as the previous one, + // skip the point. Points sent into this reducer are expected + // to be fed in order. + value := float64(p.Value) + if r.prev.Time == p.Time { + r.prev = *p + return + } else if (r.opt.Ascending && p.Time >= r.window.end) || (!r.opt.Ascending && p.Time <= r.window.end) { + // If our previous time is not equal to the window, we need to + // interpolate the area at the end of this interval. + if r.prev.Time != r.window.end { + value = linearFloat(r.window.end, r.prev.Time, p.Time, float64(r.prev.Value), value) + elapsed := float64(r.window.end-r.prev.Time) / float64(r.interval.Duration) + r.sum += 0.5 * (value + float64(r.prev.Value)) * elapsed + + r.prev.Time = r.window.end + } + + // Emit the current point through the channel and then clear it. + r.ch <- FloatPoint{Time: r.window.start, Value: r.sum} + if r.opt.Ascending { + r.window.start, r.window.end = r.opt.Window(p.Time) + } else { + r.window.end, r.window.start = r.opt.Window(p.Time) + } + r.sum = 0.0 + } + + // Normal operation: update the sum using the trapezium rule + elapsed := float64(p.Time-r.prev.Time) / float64(r.interval.Duration) + r.sum += 0.5 * (value + float64(r.prev.Value)) * elapsed + r.prev = *p +} + +// Emit emits the time-integral of the aggregated points as a single FLOAT point +// InfluxQL convention dictates that outside a group-by-time clause we return +// a timestamp of zero. Within a group-by-time, we can set the time to ZeroTime +// and a higher level will change it to the start of the time group. +func (r *UnsignedIntegralReducer) Emit() []FloatPoint { + select { + case pt, ok := <-r.ch: + if !ok { + return nil + } + return []FloatPoint{pt} + default: + return nil + } +} + +// Close flushes any in progress points to ensure any remaining points are +// emitted. +func (r *UnsignedIntegralReducer) Close() error { + // If our last point is at the start time, then discard this point since + // there is no area within this bucket. Otherwise, send off what we + // currently have as the final point. + if !r.prev.Nil && r.prev.Time != r.window.start { + r.ch <- FloatPoint{Time: r.window.start, Value: r.sum} + } + close(r.ch) + return nil +} + type FloatTopReducer struct { h *floatPointsByFunc } @@ -1077,6 +1401,49 @@ func (r *IntegerTopReducer) Emit() []IntegerPoint { return points } +type UnsignedTopReducer struct { + h *unsignedPointsByFunc +} + +func NewUnsignedTopReducer(n int) *UnsignedTopReducer { + return &UnsignedTopReducer{ + h: unsignedPointsSortBy(make([]UnsignedPoint, 0, n), func(a, b *UnsignedPoint) bool { + if a.Value != b.Value { + return a.Value < b.Value + } + return a.Time > b.Time + }), + } +} + +func (r *UnsignedTopReducer) AggregateUnsigned(p *UnsignedPoint) { + if r.h.Len() == cap(r.h.points) { + // Compare the minimum point and the aggregated point. If our value is + // larger, replace the current min value. + if !r.h.cmp(&r.h.points[0], p) { + return + } + r.h.points[0] = *p + heap.Fix(r.h, 0) + return + } + heap.Push(r.h, *p) +} + +func (r *UnsignedTopReducer) Emit() []UnsignedPoint { + // Ensure the points are sorted with the maximum value last. While the + // first point may be the minimum value, the rest is not guaranteed to be + // in any particular order while it is a heap. + points := make([]UnsignedPoint, len(r.h.points)) + for i, p := range r.h.points { + p.Aggregated = 0 + points[i] = p + } + h := unsignedPointsByFunc{points: points, cmp: r.h.cmp} + sort.Sort(sort.Reverse(&h)) + return points +} + type FloatBottomReducer struct { h *floatPointsByFunc } @@ -1162,3 +1529,46 @@ func (r *IntegerBottomReducer) Emit() []IntegerPoint { sort.Sort(sort.Reverse(&h)) return points } + +type UnsignedBottomReducer struct { + h *unsignedPointsByFunc +} + +func NewUnsignedBottomReducer(n int) *UnsignedBottomReducer { + return &UnsignedBottomReducer{ + h: unsignedPointsSortBy(make([]UnsignedPoint, 0, n), func(a, b *UnsignedPoint) bool { + if a.Value != b.Value { + return a.Value > b.Value + } + return a.Time > b.Time + }), + } +} + +func (r *UnsignedBottomReducer) AggregateUnsigned(p *UnsignedPoint) { + if r.h.Len() == cap(r.h.points) { + // Compare the minimum point and the aggregated point. If our value is + // larger, replace the current min value. + if !r.h.cmp(&r.h.points[0], p) { + return + } + r.h.points[0] = *p + heap.Fix(r.h, 0) + return + } + heap.Push(r.h, *p) +} + +func (r *UnsignedBottomReducer) Emit() []UnsignedPoint { + // Ensure the points are sorted with the maximum value last. While the + // first point may be the minimum value, the rest is not guaranteed to be + // in any particular order while it is a heap. + points := make([]UnsignedPoint, len(r.h.points)) + for i, p := range r.h.points { + p.Aggregated = 0 + points[i] = p + } + h := unsignedPointsByFunc{points: points, cmp: r.h.cmp} + sort.Sort(sort.Reverse(&h)) + return points +} diff --git a/query/iterator.gen.go b/query/iterator.gen.go index b779fde594..c927d98cef 100644 --- a/query/iterator.gen.go +++ b/query/iterator.gen.go @@ -7501,7 +7501,21 @@ func (itr *unsignedFillIterator) Next() (*UnsignedPoint, error) { switch itr.opt.Fill { case influxql.LinearFill: - fallthrough + if !itr.prev.Nil { + next, err := itr.input.peek() + if err != nil { + return nil, err + } else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() { + interval := int64(itr.opt.Interval.Duration) + start := itr.window.time / interval + p.Value = linearUnsigned(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value) + } else { + p.Nil = true + } + } else { + p.Nil = true + } + case influxql.NullFill: p.Nil = true case influxql.NumberFill: diff --git a/query/iterator.gen.go.tmpl b/query/iterator.gen.go.tmpl index 7e9d7d47a2..a2bba68d14 100644 --- a/query/iterator.gen.go.tmpl +++ b/query/iterator.gen.go.tmpl @@ -722,7 +722,7 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) { switch itr.opt.Fill { case influxql.LinearFill: - {{- if or (eq $k.Name "Float") (eq $k.Name "Integer")}} + {{- if or (eq $k.Name "Float") (eq $k.Name "Integer") (eq $k.Name "Unsigned")}} if !itr.prev.Nil { next, err := itr.input.peek() if err != nil { diff --git a/query/iterator.go b/query/iterator.go index 494926c475..fd68e35954 100644 --- a/query/iterator.go +++ b/query/iterator.go @@ -74,6 +74,8 @@ func (a Iterators) dataType() influxql.DataType { return influxql.Float case IntegerIterator: return influxql.Integer + case UnsignedIterator: + return influxql.Unsigned case StringIterator: return influxql.String case BooleanIterator: @@ -93,6 +95,8 @@ func (a Iterators) coerce() interface{} { return newFloatIterators(a) case influxql.Integer: return newIntegerIterators(a) + case influxql.Unsigned: + return newUnsignedIterators(a) case influxql.String: return newStringIterators(a) case influxql.Boolean: @@ -161,6 +165,8 @@ func NewMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator { return newFloatMergeIterator(inputs, opt) case []IntegerIterator: return newIntegerMergeIterator(inputs, opt) + case []UnsignedIterator: + return newUnsignedMergeIterator(inputs, opt) case []StringIterator: return newStringMergeIterator(inputs, opt) case []BooleanIterator: @@ -223,6 +229,8 @@ func NewSortedMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator { return newFloatSortedMergeIterator(inputs, opt) case []IntegerIterator: return newIntegerSortedMergeIterator(inputs, opt) + case []UnsignedIterator: + return newUnsignedSortedMergeIterator(inputs, opt) case []StringIterator: return newStringSortedMergeIterator(inputs, opt) case []BooleanIterator: @@ -243,6 +251,8 @@ func newParallelIterator(input Iterator) Iterator { return newFloatParallelIterator(itr) case IntegerIterator: return newIntegerParallelIterator(itr) + case UnsignedIterator: + return newUnsignedParallelIterator(itr) case StringIterator: return newStringParallelIterator(itr) case BooleanIterator: @@ -259,6 +269,8 @@ func NewLimitIterator(input Iterator, opt IteratorOptions) Iterator { return newFloatLimitIterator(input, opt) case IntegerIterator: return newIntegerLimitIterator(input, opt) + case UnsignedIterator: + return newUnsignedLimitIterator(input, opt) case StringIterator: return newStringLimitIterator(input, opt) case BooleanIterator: @@ -281,6 +293,8 @@ func NewFilterIterator(input Iterator, cond influxql.Expr, opt IteratorOptions) return newFloatFilterIterator(input, cond, opt) case IntegerIterator: return newIntegerFilterIterator(input, cond, opt) + case UnsignedIterator: + return newUnsignedFilterIterator(input, cond, opt) case StringIterator: return newStringFilterIterator(input, cond, opt) case BooleanIterator: @@ -303,6 +317,8 @@ func NewDedupeIterator(input Iterator) Iterator { return newFloatDedupeIterator(input) case IntegerIterator: return newIntegerDedupeIterator(input) + case UnsignedIterator: + return newUnsignedDedupeIterator(input) case StringIterator: return newStringDedupeIterator(input) case BooleanIterator: @@ -319,6 +335,8 @@ func NewFillIterator(input Iterator, expr influxql.Expr, opt IteratorOptions) It return newFloatFillIterator(input, expr, opt) case IntegerIterator: return newIntegerFillIterator(input, expr, opt) + case UnsignedIterator: + return newUnsignedFillIterator(input, expr, opt) case StringIterator: return newStringFillIterator(input, expr, opt) case BooleanIterator: @@ -335,6 +353,8 @@ func NewIntervalIterator(input Iterator, opt IteratorOptions) Iterator { return newFloatIntervalIterator(input, opt) case IntegerIterator: return newIntegerIntervalIterator(input, opt) + case UnsignedIterator: + return newUnsignedIntervalIterator(input, opt) case StringIterator: return newStringIntervalIterator(input, opt) case BooleanIterator: @@ -352,6 +372,8 @@ func NewInterruptIterator(input Iterator, closing <-chan struct{}) Iterator { return newFloatInterruptIterator(input, closing) case IntegerIterator: return newIntegerInterruptIterator(input, closing) + case UnsignedIterator: + return newUnsignedInterruptIterator(input, closing) case StringIterator: return newStringInterruptIterator(input, closing) case BooleanIterator: @@ -369,6 +391,8 @@ func NewCloseInterruptIterator(input Iterator, closing <-chan struct{}) Iterator return newFloatCloseInterruptIterator(input, closing) case IntegerIterator: return newIntegerCloseInterruptIterator(input, closing) + case UnsignedIterator: + return newUnsignedCloseInterruptIterator(input, closing) case StringIterator: return newStringCloseInterruptIterator(input, closing) case BooleanIterator: @@ -400,6 +424,8 @@ func NewAuxIterator(input Iterator, opt IteratorOptions) AuxIterator { return newFloatAuxIterator(input, opt) case IntegerIterator: return newIntegerAuxIterator(input, opt) + case UnsignedIterator: + return newUnsignedAuxIterator(input, opt) case StringIterator: return newStringAuxIterator(input, opt) case BooleanIterator: @@ -474,6 +500,10 @@ func (a *auxIteratorFields) iterator(name string, typ influxql.DataType) Iterato itr := &integerChanIterator{cond: sync.NewCond(&sync.Mutex{})} f.append(itr) return itr + case influxql.Unsigned: + itr := &unsignedChanIterator{cond: sync.NewCond(&sync.Mutex{})} + f.append(itr) + return itr case influxql.String, influxql.Tag: itr := &stringChanIterator{cond: sync.NewCond(&sync.Mutex{})} f.append(itr) @@ -510,6 +540,8 @@ func (a *auxIteratorFields) send(p Point) (ok bool) { ok = itr.setBuf(p.name(), tags, p.time(), v) || ok case *integerChanIterator: ok = itr.setBuf(p.name(), tags, p.time(), v) || ok + case *unsignedChanIterator: + ok = itr.setBuf(p.name(), tags, p.time(), v) || ok case *stringChanIterator: ok = itr.setBuf(p.name(), tags, p.time(), v) || ok case *booleanChanIterator: @@ -530,6 +562,8 @@ func (a *auxIteratorFields) sendError(err error) { itr.setErr(err) case *integerChanIterator: itr.setErr(err) + case *unsignedChanIterator: + itr.setErr(err) case *stringChanIterator: itr.setErr(err) case *booleanChanIterator: @@ -551,6 +585,9 @@ func DrainIterator(itr Iterator) { case IntegerIterator: for p, _ := itr.Next(); p != nil; p, _ = itr.Next() { } + case UnsignedIterator: + for p, _ := itr.Next(); p != nil; p, _ = itr.Next() { + } case StringIterator: for p, _ := itr.Next(); p != nil; p, _ = itr.Next() { } @@ -578,6 +615,10 @@ func DrainIterators(itrs []Iterator) { if p, _ := itr.Next(); p != nil { hasData = true } + case UnsignedIterator: + if p, _ := itr.Next(); p != nil { + hasData = true + } case StringIterator: if p, _ := itr.Next(); p != nil { hasData = true @@ -605,6 +646,8 @@ func NewReaderIterator(r io.Reader, typ influxql.DataType, stats IteratorStats) return newFloatReaderIterator(r, stats) case influxql.Integer: return newIntegerReaderIterator(r, stats) + case influxql.Unsigned: + return newUnsignedReaderIterator(r, stats) case influxql.String: return newStringReaderIterator(r, stats) case influxql.Boolean: diff --git a/query/iterator_test.go b/query/iterator_test.go index 151c750bda..97bc76a79e 100644 --- a/query/iterator_test.go +++ b/query/iterator_test.go @@ -114,6 +114,55 @@ func TestMergeIterator_Integer(t *testing.T) { } } +// Ensure that a set of iterators can be merged together, sorted by window and name/tag. +func TestMergeIterator_Unsigned(t *testing.T) { + inputs := []*UnsignedIterator{ + {Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, + {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, + {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, + {Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}, + }}, + {Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, + {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}, + }}, + {Points: []query.UnsignedPoint{}}, + } + itr := query.NewMergeIterator(UnsignedIterators(inputs), query.IteratorOptions{ + Interval: query.Interval{ + Duration: 10 * time.Nanosecond, + }, + Dimensions: []string{"host"}, + Ascending: true, + }) + + if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}}, + {&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}}, + {&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}}, + }) { + t.Errorf("unexpected points: %s", spew.Sdump(a)) + } + + for i, input := range inputs { + if !input.Closed { + t.Errorf("iterator %d not closed", i) + } + } +} + // Ensure that a set of iterators can be merged together, sorted by window and name/tag. func TestMergeIterator_String(t *testing.T) { inputs := []*StringIterator{ @@ -219,6 +268,9 @@ func TestMergeIterator_Nil(t *testing.T) { } } +// Verifies that coercing will drop values that aren't the primary type. +// It's the responsibility of the engine to return the correct type. If they don't, +// we drop iterators that don't match. func TestMergeIterator_Coerce_Float(t *testing.T) { inputs := []query.Iterator{ &FloatIterator{Points: []query.FloatPoint{ @@ -264,6 +316,10 @@ func TestMergeIterator_Coerce_Float(t *testing.T) { if !input.Closed { t.Errorf("iterator %d not closed", i) } + case *UnsignedIterator: + if !input.Closed { + t.Errorf("iterator %d not closed", i) + } } } } @@ -364,6 +420,54 @@ func TestSortedMergeIterator_Integer(t *testing.T) { } } +// Ensure that a set of iterators can be merged together, sorted by name/tag. +func TestSortedMergeIterator_Unsigned(t *testing.T) { + inputs := []*UnsignedIterator{ + {Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}, + {Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}, + {Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}, + {Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}, + }}, + {Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}, + {Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}, + }}, + {Points: []query.UnsignedPoint{}}, + } + itr := query.NewSortedMergeIterator(UnsignedIterators(inputs), query.IteratorOptions{ + Interval: query.Interval{ + Duration: 10 * time.Nanosecond, + }, + Dimensions: []string{"host"}, + Ascending: true, + }) + if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}}, + {&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}}, + {&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}}, + }) { + t.Errorf("unexpected points: %s", spew.Sdump(a)) + } + + for i, input := range inputs { + if !input.Closed { + t.Errorf("iterator %d not closed", i) + } + } +} + // Ensure that a set of iterators can be merged together, sorted by name/tag. func TestSortedMergeIterator_String(t *testing.T) { inputs := []*StringIterator{ @@ -512,6 +616,10 @@ func TestSortedMergeIterator_Coerce_Float(t *testing.T) { if !input.Closed { t.Errorf("iterator %d not closed", i) } + case *UnsignedIterator: + if !input.Closed { + t.Errorf("iterator %d not closed", i) + } } } } @@ -574,6 +682,35 @@ func TestLimitIterator_Integer(t *testing.T) { } } +// Ensure limit iterators work with limit and offset. +func TestLimitIterator_Unsigned(t *testing.T) { + input := &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0, Value: 1}, + {Name: "cpu", Time: 5, Value: 3}, + {Name: "cpu", Time: 10, Value: 5}, + {Name: "mem", Time: 5, Value: 3}, + {Name: "mem", Time: 7, Value: 8}, + }} + + itr := query.NewLimitIterator(input, query.IteratorOptions{ + Limit: 1, + Offset: 1, + }) + + if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Time: 5, Value: 3}}, + {&query.UnsignedPoint{Name: "mem", Time: 7, Value: 8}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } + + if !input.Closed { + t.Error("iterator not closed") + } +} + // Ensure limit iterators work with limit and offset. func TestLimitIterator_String(t *testing.T) { input := &StringIterator{Points: []query.StringPoint{ @@ -632,7 +769,7 @@ func TestLimitIterator_Boolean(t *testing.T) { } } -// Ensure auxilary iterators can be created for auxilary fields. +// Ensure auxiliary iterators can be created for auxilary fields. func TestFloatAuxIterator(t *testing.T) { itr := query.NewAuxIterator( &FloatIterator{Points: []query.FloatPoint{ @@ -939,6 +1076,12 @@ func (itrs Iterators) Next() ([]query.Point, error) { return nil, err } a[i] = ip + case query.UnsignedIterator: + up, err := itr.Next() + if up == nil || err != nil { + return nil, err + } + a[i] = up case query.StringIterator: sp, err := itr.Next() if sp == nil || err != nil { @@ -1466,6 +1609,35 @@ func IntegerIterators(inputs []*IntegerIterator) []query.Iterator { return itrs } +// Test implementation of query.UnsignedIterator +type UnsignedIterator struct { + Points []query.UnsignedPoint + Closed bool + stats query.IteratorStats +} + +func (itr *UnsignedIterator) Stats() query.IteratorStats { return itr.stats } +func (itr *UnsignedIterator) Close() error { itr.Closed = true; return nil } + +// Next returns the next value and shifts it off the beginning of the points slice. +func (itr *UnsignedIterator) Next() (*query.UnsignedPoint, error) { + if len(itr.Points) == 0 || itr.Closed { + return nil, nil + } + + v := &itr.Points[0] + itr.Points = itr.Points[1:] + return v, nil +} + +func UnsignedIterators(inputs []*UnsignedIterator) []query.Iterator { + itrs := make([]query.Iterator, len(inputs)) + for i := range itrs { + itrs[i] = query.Iterator(inputs[i]) + } + return itrs +} + // Test implementation of query.StringIterator type StringIterator struct { Points []query.StringPoint diff --git a/query/linear.go b/query/linear.go index 22a2244e08..0da38f9815 100644 --- a/query/linear.go +++ b/query/linear.go @@ -19,3 +19,13 @@ func linearInteger(windowTime, previousTime, nextTime int64, previousValue, next b := float64(previousValue) return int64(m*x + b) } + +// linearInteger computes the the slope of the line between the points (previousTime, previousValue) and (nextTime, nextValue) +// and returns the value of the point on the line with time windowTime +// y = mx + b +func linearUnsigned(windowTime, previousTime, nextTime int64, previousValue, nextValue uint64) uint64 { + m := float64(nextValue-previousValue) / float64(nextTime-previousTime) // the slope of the line + x := float64(windowTime - previousTime) // how far into the interval we are + b := float64(previousValue) + return uint64(m*x + b) +} diff --git a/query/point.go b/query/point.go index 6325526cea..4592cfd22d 100644 --- a/query/point.go +++ b/query/point.go @@ -50,6 +50,8 @@ func (a Points) Clone() []Point { other[i] = p.Clone() case *IntegerPoint: other[i] = p.Clone() + case *UnsignedPoint: + other[i] = p.Clone() case *StringPoint: other[i] = p.Clone() case *BooleanPoint: diff --git a/query/select_test.go b/query/select_test.go index 733e43e4d7..a887b77657 100644 --- a/query/select_test.go +++ b/query/select_test.go @@ -104,6 +104,32 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 10}}, }, }, + { + name: "Distinct_Unsigned", + q: `SELECT distinct(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 1 * Second, Value: 19}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 11 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 12 * Second, Value: 2}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 20}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 19}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 10}}, + }, + }, { name: "Distinct_String", q: `SELECT distinct(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, @@ -221,6 +247,38 @@ func TestSelect(t *testing.T) { {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 3.2, Aggregated: 5}}, }, }, + { + name: "Mean_Unsigned", + q: `SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, + typ: influxql.Unsigned, + expr: `mean(value::Unsigned)`, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + }}, + }, + points: [][]query.Point{ + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 19.5, Aggregated: 2}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2.5, Aggregated: 2}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 100, Aggregated: 1}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 10, Aggregated: 1}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 3.2, Aggregated: 5}}, + }, + }, { name: "Mean_String", q: `SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, @@ -297,6 +355,37 @@ func TestSelect(t *testing.T) { {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 3}}, }, }, + { + name: "Median_Unsigned", + q: `SELECT median(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + }}, + }, + points: [][]query.Point{ + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 19.5}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2.5}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 100}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 10}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 3}}, + }, + }, { name: "Median_String", q: `SELECT median(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, @@ -373,6 +462,37 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 1}}, }, }, + { + name: "Mode_Unsigned", + q: `SELECT mode(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 54 * Second, Value: 5}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 10}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 100}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 10}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 1}}, + }, + }, { name: "Mode_String", q: `SELECT mode(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, @@ -492,6 +612,38 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 53 * Second, Value: 4}}, }, }, + { + name: "Top_NoTags_Unsigned", + q: `SELECT top(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s), host fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 20}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 9 * Second, Value: 19}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 31 * Second, Value: 100}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 5 * Second, Value: 10}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 53 * Second, Value: 5}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 53 * Second, Value: 4}}, + }, + }, { name: "Top_Tags_Float", q: `SELECT top(value::float, host::tag, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`, @@ -578,6 +730,49 @@ func TestSelect(t *testing.T) { }, }, }, + { + name: "Top_Tags_Unsigned", + q: `SELECT top(value::Unsigned, host::tag, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`, + typ: influxql.Unsigned, + expr: `max(value::Unsigned)`, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, + }}, + }, + points: [][]query.Point{ + { + &query.UnsignedPoint{Name: "cpu", Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, + &query.StringPoint{Name: "cpu", Time: 0 * Second, Value: "A"}, + }, + { + &query.UnsignedPoint{Name: "cpu", Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, + &query.StringPoint{Name: "cpu", Time: 5 * Second, Value: "B"}, + }, + { + &query.UnsignedPoint{Name: "cpu", Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, + &query.StringPoint{Name: "cpu", Time: 31 * Second, Value: "A"}, + }, + { + &query.UnsignedPoint{Name: "cpu", Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, + &query.StringPoint{Name: "cpu", Time: 53 * Second, Value: "B"}, + }, + }, + }, { name: "Top_GroupByTags_Float", q: `SELECT top(value::float, host::tag, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`, @@ -655,6 +850,44 @@ func TestSelect(t *testing.T) { }, }, }, + { + name: "Top_GroupByTags_Unsigned", + q: `SELECT top(value::Unsigned, host::tag, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, + }}, + }, + points: [][]query.Point{ + { + &query.UnsignedPoint{Name: "cpu", Tags: ParseTags("region=east"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, + &query.StringPoint{Name: "cpu", Tags: ParseTags("region=east"), Time: 9 * Second, Value: "A"}, + }, + { + &query.UnsignedPoint{Name: "cpu", Tags: ParseTags("region=west"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, + &query.StringPoint{Name: "cpu", Tags: ParseTags("region=west"), Time: 0 * Second, Value: "A"}, + }, + { + &query.UnsignedPoint{Name: "cpu", Tags: ParseTags("region=west"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, + &query.StringPoint{Name: "cpu", Tags: ParseTags("region=west"), Time: 31 * Second, Value: "A"}, + }, + }, + }, { name: "Bottom_NoTags_Float", q: `SELECT bottom(value::float, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s), host fill(none)`, @@ -719,6 +952,38 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 51 * Second, Value: 2}}, }, }, + { + name: "Bottom_NoTags_Unsigned", + q: `SELECT bottom(value::Unsigned, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s), host fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 11 * Second, Value: 3}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 31 * Second, Value: 100}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 5 * Second, Value: 10}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 1}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 51 * Second, Value: 2}}, + }, + }, { name: "Bottom_Tags_Float", q: `SELECT bottom(value::float, host::tag, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`, @@ -804,6 +1069,48 @@ func TestSelect(t *testing.T) { }, }, }, + { + name: "Bottom_Tags_Unsigned", + q: `SELECT bottom(value::Unsigned, host::tag, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, + }}, + }, + points: [][]query.Point{ + { + &query.UnsignedPoint{Name: "cpu", Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, + &query.StringPoint{Name: "cpu", Time: 5 * Second, Value: "B"}, + }, + { + &query.UnsignedPoint{Name: "cpu", Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, + &query.StringPoint{Name: "cpu", Time: 10 * Second, Value: "A"}, + }, + { + &query.UnsignedPoint{Name: "cpu", Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, + &query.StringPoint{Name: "cpu", Time: 31 * Second, Value: "A"}, + }, + { + &query.UnsignedPoint{Name: "cpu", Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, + &query.StringPoint{Name: "cpu", Time: 50 * Second, Value: "B"}, + }, + }, + }, { name: "Bottom_GroupByTags_Float", q: `SELECT bottom(value::float, host::tag, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`, @@ -882,6 +1189,45 @@ func TestSelect(t *testing.T) { }, }, }, + { + name: "Bottom_GroupByTags_Unsigned", + q: `SELECT bottom(value::float, host::tag, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`, + typ: influxql.Unsigned, + expr: `min(value::float)`, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, + }}, + }, + points: [][]query.Point{ + { + &query.UnsignedPoint{Name: "cpu", Tags: ParseTags("region=east"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, + &query.StringPoint{Name: "cpu", Tags: ParseTags("region=east"), Time: 10 * Second, Value: "A"}, + }, + { + &query.UnsignedPoint{Name: "cpu", Tags: ParseTags("region=west"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, + &query.StringPoint{Name: "cpu", Tags: ParseTags("region=west"), Time: 11 * Second, Value: "A"}, + }, + { + &query.UnsignedPoint{Name: "cpu", Tags: ParseTags("region=west"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, + &query.StringPoint{Name: "cpu", Tags: ParseTags("region=west"), Time: 50 * Second, Value: "B"}, + }, + }, + }, { name: "Fill_Null_Float", q: `SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(null)`, @@ -1074,6 +1420,74 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Nil: true}}, }, }, + { + name: "Fill_Linear_Unsigned_One", + q: `SELECT max(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`, + typ: influxql.Unsigned, + expr: `max(value::Unsigned)`, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("host=A"), Time: 32 * Second, Value: 4}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 1, Aggregated: 1}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Value: 2}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 4, Aggregated: 1}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}}, + }, + }, + { + name: "Fill_Linear_Unsigned_Many", + q: `SELECT max(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:20Z' GROUP BY host, time(10s) fill(linear)`, + typ: influxql.Unsigned, + expr: `max(value::Unsigned)`, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("host=A"), Time: 72 * Second, Value: 10}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 1, Aggregated: 1}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Value: 2}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 4}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Value: 5}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Value: 7}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 60 * Second, Value: 8}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 70 * Second, Value: 10, Aggregated: 1}}, + }, + }, + { + name: "Fill_Linear_Unsigned_MultipleSeries", + q: `SELECT max(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`, + typ: influxql.Unsigned, + expr: `max(value::Unsigned)`, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 32 * Second, Value: 4}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 20 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 30 * Second, Value: 4, Aggregated: 1}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 40 * Second, Nil: true}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Nil: true}}, + }, + }, { name: "Stddev_Float", q: `SELECT stddev(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, @@ -1136,6 +1550,37 @@ func TestSelect(t *testing.T) { {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 1.5811388300841898}}, }, }, + { + name: "Stddev_Unsigned", + q: `SELECT stddev(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + }}, + }, + points: [][]query.Point{ + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 0.7071067811865476}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 0.7071067811865476}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Nil: true}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Nil: true}}, + {&query.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 1.5811388300841898}}, + }, + }, { name: "Spread_Float", q: `SELECT spread(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, @@ -1198,6 +1643,37 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 4}}, }, }, + { + name: "Spread_Unsigned", + q: `SELECT spread(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 1}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 1}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 0}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 0}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 4}}, + }, + }, { name: "Percentile_Float", q: `SELECT percentile(value, 90) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, @@ -1270,6 +1746,42 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 9}}, }, }, + { + name: "Percentile_Unsigned", + q: `SELECT percentile(value, 90) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 9}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 8}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 7}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 54 * Second, Value: 6}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 55 * Second, Value: 5}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 56 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 57 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 58 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 59 * Second, Value: 1}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 20}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 3}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 100}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 10}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 9}}, + }, + }, { name: "Sample_Float", q: `SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, @@ -1312,6 +1824,27 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 15 * Second, Value: 2}}, }, }, + { + name: "Sample_Unsigned", + q: `SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 5 * Second, Value: 10}, + }}, + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 10 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=B"), Time: 15 * Second, Value: 2}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 20}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5 * Second, Value: 10}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Value: 19}}, + {&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 15 * Second, Value: 2}}, + }, + }, { name: "Sample_String", q: `SELECT sample(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, @@ -1467,6 +2000,24 @@ func TestSelect(t *testing.T) { {&query.FloatPoint{Name: "cpu", Time: 12 * Second, Value: -4}}, }, }, + { + name: "Derivative_Unsigned", + q: `SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 8 * Second, Value: 19}, + {Name: "cpu", Time: 12 * Second, Value: 3}, + }}, + }, + points: [][]query.Point{ + {&query.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}}, + {&query.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 2.25}}, + {&query.FloatPoint{Name: "cpu", Time: 12 * Second, Value: -4}}, + }, + }, { name: "Derivative_Desc_Float", q: `SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z' ORDER BY desc`, @@ -1503,6 +2054,24 @@ func TestSelect(t *testing.T) { {&query.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 2.5}}, }, }, + { + name: "Derivative_Desc_Unsigned", + q: `SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z' ORDER BY desc`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 12 * Second, Value: 3}, + {Name: "cpu", Time: 8 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 0 * Second, Value: 20}, + }}, + }, + points: [][]query.Point{ + {&query.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 4}}, + {&query.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.25}}, + {&query.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 2.5}}, + }, + }, { name: "Derivative_Duplicate_Float", q: `SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, @@ -1535,6 +2104,22 @@ func TestSelect(t *testing.T) { {&query.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}}, }, }, + { + name: "Derivative_Duplicate_Unsigned", + q: `SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, + }, + points: [][]query.Point{ + {&query.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}}, + }, + }, { name: "Difference_Float", q: `SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, @@ -1571,6 +2156,24 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Time: 12 * Second, Value: -16}}, }, }, + { + name: "Difference_Unsigned", + q: `SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 8 * Second, Value: 19}, + {Name: "cpu", Time: 12 * Second, Value: 3}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Time: 4 * Second, Value: 18446744073709551606}}, + {&query.UnsignedPoint{Name: "cpu", Time: 8 * Second, Value: 9}}, + {&query.UnsignedPoint{Name: "cpu", Time: 12 * Second, Value: 18446744073709551600}}, + }, + }, { name: "Difference_Duplicate_Float", q: `SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, @@ -1603,6 +2206,22 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: -10}}, }, }, + { + name: "Difference_Duplicate_Unsigned", + q: `SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Time: 4 * Second, Value: 18446744073709551606}}, + }, + }, { name: "Non_Negative_Difference_Float", q: `SELECT non_negative_difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, @@ -1637,6 +2256,22 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Time: 8 * Second, Value: 11}}, }, }, + { + name: "Non_Negative_Difference_Unsigned", + q: `SELECT non_negative_difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 8 * Second, Value: 21}, + {Name: "cpu", Time: 12 * Second, Value: 3}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Time: 8 * Second, Value: 11}}, + }, + }, { name: "Non_Negative_Difference_Duplicate_Float", q: `SELECT non_negative_difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, @@ -1683,6 +2318,29 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Time: 16 * Second, Value: 30}}, }, }, + { + name: "Non_Negative_Difference_Duplicate_Unsigned", + q: `SELECT non_negative_difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + {Name: "cpu", Time: 8 * Second, Value: 30}, + {Name: "cpu", Time: 8 * Second, Value: 19}, + {Name: "cpu", Time: 12 * Second, Value: 10}, + {Name: "cpu", Time: 12 * Second, Value: 3}, + {Name: "cpu", Time: 16 * Second, Value: 40}, + {Name: "cpu", Time: 16 * Second, Value: 3}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Time: 8 * Second, Value: 20}}, + {&query.UnsignedPoint{Name: "cpu", Time: 16 * Second, Value: 30}}, + }, + }, { name: "Elapsed_Float", q: `SELECT elapsed(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, @@ -1719,6 +2377,24 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Time: 11 * Second, Value: 3}}, }, }, + { + name: "Elapsed_Unsigned", + q: `SELECT elapsed(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 8 * Second, Value: 19}, + {Name: "cpu", Time: 11 * Second, Value: 3}, + }}, + }, + points: [][]query.Point{ + {&query.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: 4}}, + {&query.IntegerPoint{Name: "cpu", Time: 8 * Second, Value: 4}}, + {&query.IntegerPoint{Name: "cpu", Time: 11 * Second, Value: 3}}, + }, + }, { name: "Elapsed_String", q: `SELECT elapsed(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, @@ -1853,6 +2529,37 @@ func TestSelect(t *testing.T) { {&query.FloatPoint{Name: "cpu", Time: 0, Value: 125}}, }, }, + { + name: "Integral_Unsigned", + q: `SELECT integral(value) FROM cpu`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 5 * Second, Value: 10}, + {Name: "cpu", Time: 10 * Second, Value: 0}, + }}, + }, + points: [][]query.Point{ + {&query.FloatPoint{Name: "cpu", Time: 0, Value: 100}}, + }, + }, + { + name: "Integral_Duplicate_Unsigned", + q: `SELECT integral(value, 2s) FROM cpu`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 5 * Second, Value: 10}, + {Name: "cpu", Time: 5 * Second, Value: 30}, + {Name: "cpu", Time: 10 * Second, Value: 40}, + }}, + }, + points: [][]query.Point{ + {&query.FloatPoint{Name: "cpu", Time: 0, Value: 125}}, + }, + }, { name: "MovingAverage_Float", q: `SELECT moving_average(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, @@ -1889,6 +2596,24 @@ func TestSelect(t *testing.T) { {&query.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 11, Aggregated: 2}}, }, }, + { + name: "MovingAverage_Unsigned", + q: `SELECT moving_average(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 8 * Second, Value: 19}, + {Name: "cpu", Time: 12 * Second, Value: 3}, + }}, + }, + points: [][]query.Point{ + {&query.FloatPoint{Name: "cpu", Time: 4 * Second, Value: 15, Aggregated: 2}}, + {&query.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 14.5, Aggregated: 2}}, + {&query.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 11, Aggregated: 2}}, + }, + }, { name: "CumulativeSum_Float", q: `SELECT cumulative_sum(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, @@ -1927,6 +2652,25 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Time: 12 * Second, Value: 52}}, }, }, + { + name: "CumulativeSum_Unsigned", + q: `SELECT cumulative_sum(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 8 * Second, Value: 19}, + {Name: "cpu", Time: 12 * Second, Value: 3}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Time: 0 * Second, Value: 20}}, + {&query.UnsignedPoint{Name: "cpu", Time: 4 * Second, Value: 30}}, + {&query.UnsignedPoint{Name: "cpu", Time: 8 * Second, Value: 49}}, + {&query.UnsignedPoint{Name: "cpu", Time: 12 * Second, Value: 52}}, + }, + }, { name: "CumulativeSum_Duplicate_Float", q: `SELECT cumulative_sum(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, @@ -1965,6 +2709,25 @@ func TestSelect(t *testing.T) { {&query.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: 52}}, }, }, + { + name: "CumulativeSum_Duplicate_Unsigned", + q: `SELECT cumulative_sum(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`, + typ: influxql.Unsigned, + itrs: []query.Iterator{ + &UnsignedIterator{Points: []query.UnsignedPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, + }, + points: [][]query.Point{ + {&query.UnsignedPoint{Name: "cpu", Time: 0 * Second, Value: 20}}, + {&query.UnsignedPoint{Name: "cpu", Time: 0 * Second, Value: 39}}, + {&query.UnsignedPoint{Name: "cpu", Time: 4 * Second, Value: 49}}, + {&query.UnsignedPoint{Name: "cpu", Time: 4 * Second, Value: 52}}, + }, + }, { name: "HoltWinters_GroupBy_Agg", q: `SELECT holt_winters(mean(value), 2, 2) FROM cpu WHERE time >= '1970-01-01T00:00:10Z' AND time < '1970-01-01T00:00:20Z' GROUP BY time(2s)`, @@ -2044,6 +2807,77 @@ func TestSelect(t *testing.T) { } } +// Ensure a SELECT with raw fields works for all types. +func TestSelect_Raw(t *testing.T) { + shardMapper := ShardMapper{ + MapShardsFn: func(sources influxql.Sources, _ influxql.TimeRange) query.ShardGroup { + return &ShardGroup{ + Fields: map[string]influxql.DataType{ + "f": influxql.Float, + "i": influxql.Integer, + "u": influxql.Unsigned, + "s": influxql.String, + "b": influxql.Boolean, + }, + CreateIteratorFn: func(m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + if !reflect.DeepEqual(opt.Aux, []influxql.VarRef{ + {Val: "b", Type: influxql.Boolean}, + {Val: "f", Type: influxql.Float}, + {Val: "i", Type: influxql.Integer}, + {Val: "s", Type: influxql.String}, + {Val: "u", Type: influxql.Unsigned}, + }) { + t.Fatalf("unexpected auxiliary fields: %v", opt.Aux) + } + return &FloatIterator{Points: []query.FloatPoint{ + {Name: "cpu", Time: 0 * Second, Aux: []interface{}{ + true, float64(20), int64(20), "a", uint64(20)}}, + {Name: "cpu", Time: 5 * Second, Aux: []interface{}{ + false, float64(10), int64(10), "b", uint64(10)}}, + {Name: "cpu", Time: 9 * Second, Aux: []interface{}{ + true, float64(19), int64(19), "c", uint64(19)}}, + }}, nil + }, + } + }, + } + + stmt := MustParseSelectStatement(`SELECT f, i, u, s, b FROM cpu`) + itrs, _, err := query.Select(stmt, &shardMapper, query.SelectOptions{}) + if err != nil { + t.Errorf("parse error: %s", err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if diff := cmp.Diff(a, [][]query.Point{ + { + &query.FloatPoint{Name: "cpu", Value: 20, Time: 0 * Second}, + &query.IntegerPoint{Name: "cpu", Value: 20, Time: 0 * Second}, + &query.UnsignedPoint{Name: "cpu", Value: 20, Time: 0 * Second}, + &query.StringPoint{Name: "cpu", Value: "a", Time: 0 * Second}, + &query.BooleanPoint{Name: "cpu", Value: true, Time: 0 * Second}, + }, + { + &query.FloatPoint{Name: "cpu", Value: 10, Time: 5 * Second}, + &query.IntegerPoint{Name: "cpu", Value: 10, Time: 5 * Second}, + &query.UnsignedPoint{Name: "cpu", Value: 10, Time: 5 * Second}, + &query.StringPoint{Name: "cpu", Value: "b", Time: 5 * Second}, + &query.BooleanPoint{Name: "cpu", Value: false, Time: 5 * Second}, + }, + { + &query.FloatPoint{Name: "cpu", Value: 19, Time: 9 * Second}, + &query.IntegerPoint{Name: "cpu", Value: 19, Time: 9 * Second}, + &query.UnsignedPoint{Name: "cpu", Value: 19, Time: 9 * Second}, + &query.StringPoint{Name: "cpu", Value: "c", Time: 9 * Second}, + &query.BooleanPoint{Name: "cpu", Value: true, Time: 9 * Second}, + }, + }); diff != "" { + t.Errorf("unexpected points:\n%s", diff) + } +} + // Ensure a SELECT binary expr queries can be executed as floats. func TestSelect_BinaryExpr_Float(t *testing.T) { shardMapper := ShardMapper{