Add unsigned iterators for all types
This allows unsigned data to be queried from the storage engine. Binary math is not yet implemented for unsigned types.pull/8844/head
parent
940da04a34
commit
0ef94e0cf0
|
|
@ -1239,10 +1239,11 @@ func (s *SelectStatement) RewriteFields(m FieldMapper) (*SelectStatement, error)
|
|||
continue
|
||||
}
|
||||
|
||||
// All types that can expand wildcards support float and integer.
|
||||
// All types that can expand wildcards support float, integer, and unsigned.
|
||||
supportedTypes := map[DataType]struct{}{
|
||||
Float: struct{}{},
|
||||
Integer: struct{}{},
|
||||
Float: {},
|
||||
Integer: {},
|
||||
Unsigned: {},
|
||||
}
|
||||
|
||||
// Add additional types for certain functions.
|
||||
|
|
@ -1252,6 +1253,8 @@ func (s *SelectStatement) RewriteFields(m FieldMapper) (*SelectStatement, error)
|
|||
fallthrough
|
||||
case "min", "max":
|
||||
supportedTypes[Boolean] = struct{}{}
|
||||
case "holt_winters", "holt_winters_with_fit":
|
||||
delete(supportedTypes, Unsigned)
|
||||
}
|
||||
|
||||
for _, ref := range fields {
|
||||
|
|
@ -4103,6 +4106,8 @@ func EvalType(expr Expr, sources Sources, typmap TypeMapper) DataType {
|
|||
return Float
|
||||
case "count":
|
||||
return Integer
|
||||
case "elapsed":
|
||||
return Integer
|
||||
default:
|
||||
return EvalType(expr.Args[0], sources, typmap)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,6 +74,12 @@ func newCountIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, IntegerPointEmitter) {
|
||||
fn := NewUnsignedFuncIntegerReducer(UnsignedCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceIntegerIterator(input, opt, createFn), nil
|
||||
case StringIterator:
|
||||
createFn := func() (StringPointAggregator, IntegerPointEmitter) {
|
||||
fn := NewStringFuncIntegerReducer(StringCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
|
||||
|
|
@ -107,6 +113,14 @@ func IntegerCountReduce(prev, curr *IntegerPoint) (int64, int64, []interface{})
|
|||
return ZeroTime, prev.Value + 1, nil
|
||||
}
|
||||
|
||||
// UnsignedCountReduce returns the count of points.
|
||||
func UnsignedCountReduce(prev *IntegerPoint, curr *UnsignedPoint) (int64, int64, []interface{}) {
|
||||
if prev == nil {
|
||||
return ZeroTime, 1, nil
|
||||
}
|
||||
return ZeroTime, prev.Value + 1, nil
|
||||
}
|
||||
|
||||
// StringCountReduce returns the count of points.
|
||||
func StringCountReduce(prev *IntegerPoint, curr *StringPoint) (int64, int64, []interface{}) {
|
||||
if prev == nil {
|
||||
|
|
@ -138,6 +152,12 @@ func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedFuncReducer(UnsignedMinReduce, nil)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
||||
case BooleanIterator:
|
||||
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
||||
fn := NewBooleanFuncReducer(BooleanMinReduce, nil)
|
||||
|
|
@ -165,6 +185,14 @@ func IntegerMinReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
|||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// UnsignedMinReduce returns the minimum value between prev & curr.
|
||||
func UnsignedMinReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
|
||||
if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
||||
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
||||
}
|
||||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// BooleanMinReduce returns the minimum value between prev & curr.
|
||||
func BooleanMinReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
|
||||
if prev == nil || (curr.Value != prev.Value && !curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
||||
|
|
@ -188,6 +216,12 @@ func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedFuncReducer(UnsignedMaxReduce, nil)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
||||
case BooleanIterator:
|
||||
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
||||
fn := NewBooleanFuncReducer(BooleanMaxReduce, nil)
|
||||
|
|
@ -215,6 +249,14 @@ func IntegerMaxReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
|||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// UnsignedMaxReduce returns the maximum value between prev & curr.
|
||||
func UnsignedMaxReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
|
||||
if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
||||
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
||||
}
|
||||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// BooleanMaxReduce returns the minimum value between prev & curr.
|
||||
func BooleanMaxReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
|
||||
if prev == nil || (curr.Value != prev.Value && curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
||||
|
|
@ -238,6 +280,12 @@ func newSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedFuncReducer(UnsignedSumReduce, &UnsignedPoint{Value: 0, Time: ZeroTime})
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported sum iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -259,6 +307,14 @@ func IntegerSumReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
|||
return prev.Time, prev.Value + curr.Value, nil
|
||||
}
|
||||
|
||||
// UnsignedSumReduce returns the sum prev value & curr value.
|
||||
func UnsignedSumReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
|
||||
if prev == nil {
|
||||
return ZeroTime, curr.Value, nil
|
||||
}
|
||||
return prev.Time, prev.Value + curr.Value, nil
|
||||
}
|
||||
|
||||
// newFirstIterator returns an iterator for operating on a first() call.
|
||||
func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
||||
switch input := input.(type) {
|
||||
|
|
@ -274,6 +330,12 @@ func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedFuncReducer(UnsignedFirstReduce, nil)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
||||
case StringIterator:
|
||||
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
||||
fn := NewStringFuncReducer(StringFirstReduce, nil)
|
||||
|
|
@ -307,6 +369,14 @@ func IntegerFirstReduce(prev, curr *IntegerPoint) (int64, int64, []interface{})
|
|||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// UnsignedFirstReduce returns the first point sorted by time.
|
||||
func UnsignedFirstReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
|
||||
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
||||
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
||||
}
|
||||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// StringFirstReduce returns the first point sorted by time.
|
||||
func StringFirstReduce(prev, curr *StringPoint) (int64, string, []interface{}) {
|
||||
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
||||
|
|
@ -338,6 +408,12 @@ func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedFuncReducer(UnsignedLastReduce, nil)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
||||
case StringIterator:
|
||||
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
||||
fn := NewStringFuncReducer(StringLastReduce, nil)
|
||||
|
|
@ -371,6 +447,14 @@ func IntegerLastReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
|||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// UnsignedLastReduce returns the last point sorted by time.
|
||||
func UnsignedLastReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
|
||||
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
||||
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
||||
}
|
||||
return prev.Time, prev.Value, prev.Aux
|
||||
}
|
||||
|
||||
// StringLastReduce returns the first point sorted by time.
|
||||
func StringLastReduce(prev, curr *StringPoint) (int64, string, []interface{}) {
|
||||
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
||||
|
|
@ -402,6 +486,12 @@ func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error)
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedDistinctReducer()
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
||||
case StringIterator:
|
||||
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
||||
fn := NewStringDistinctReducer()
|
||||
|
|
@ -434,6 +524,12 @@ func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceFloatIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
||||
fn := NewUnsignedMeanReducer()
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceFloatIterator(input, opt, createFn), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported mean iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -459,6 +555,12 @@ func newMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceFloatIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
||||
fn := NewUnsignedSliceFuncFloatReducer(UnsignedMedianReduceSlice)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceFloatIterator(input, opt, createFn), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported median iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -500,6 +602,24 @@ func IntegerMedianReduceSlice(a []IntegerPoint) []FloatPoint {
|
|||
return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}}
|
||||
}
|
||||
|
||||
// UnsignedMedianReduceSlice returns the median value within a window.
|
||||
func UnsignedMedianReduceSlice(a []UnsignedPoint) []FloatPoint {
|
||||
if len(a) == 1 {
|
||||
return []FloatPoint{{Time: ZeroTime, Value: float64(a[0].Value)}}
|
||||
}
|
||||
|
||||
// OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1.
|
||||
|
||||
// Return the middle value from the points.
|
||||
// If there are an even number of points then return the mean of the two middle points.
|
||||
sort.Sort(unsignedPointsByValue(a))
|
||||
if len(a)%2 == 0 {
|
||||
lo, hi := a[len(a)/2-1], a[(len(a)/2)]
|
||||
return []FloatPoint{{Time: ZeroTime, Value: float64(lo.Value) + float64(hi.Value-lo.Value)/2}}
|
||||
}
|
||||
return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}}
|
||||
}
|
||||
|
||||
// newModeIterator returns an iterator for operating on a mode() call.
|
||||
func NewModeIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
||||
switch input := input.(type) {
|
||||
|
|
@ -515,6 +635,12 @@ func NewModeIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedSliceFuncReducer(UnsignedModeReduceSlice)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
||||
case StringIterator:
|
||||
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
||||
fn := NewStringSliceFuncReducer(StringModeReduceSlice)
|
||||
|
|
@ -599,6 +725,39 @@ func IntegerModeReduceSlice(a []IntegerPoint) []IntegerPoint {
|
|||
return []IntegerPoint{{Time: ZeroTime, Value: mostMode}}
|
||||
}
|
||||
|
||||
// UnsignedModeReduceSlice returns the mode value within a window.
|
||||
func UnsignedModeReduceSlice(a []UnsignedPoint) []UnsignedPoint {
|
||||
if len(a) == 1 {
|
||||
return a
|
||||
}
|
||||
sort.Sort(unsignedPointsByValue(a))
|
||||
|
||||
mostFreq := 0
|
||||
currFreq := 0
|
||||
currMode := a[0].Value
|
||||
mostMode := a[0].Value
|
||||
mostTime := a[0].Time
|
||||
currTime := a[0].Time
|
||||
|
||||
for _, p := range a {
|
||||
if p.Value != currMode {
|
||||
currFreq = 1
|
||||
currMode = p.Value
|
||||
currTime = p.Time
|
||||
continue
|
||||
}
|
||||
currFreq++
|
||||
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
|
||||
continue
|
||||
}
|
||||
mostFreq = currFreq
|
||||
mostMode = p.Value
|
||||
mostTime = p.Time
|
||||
}
|
||||
|
||||
return []UnsignedPoint{{Time: ZeroTime, Value: mostMode}}
|
||||
}
|
||||
|
||||
// StringModeReduceSlice returns the mode value within a window.
|
||||
func StringModeReduceSlice(a []StringPoint) []StringPoint {
|
||||
if len(a) == 1 {
|
||||
|
|
@ -674,6 +833,12 @@ func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceFloatIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
||||
fn := NewUnsignedSliceFuncFloatReducer(UnsignedStddevReduceSlice)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceFloatIterator(input, opt, createFn), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported stddev iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -737,6 +902,32 @@ func IntegerStddevReduceSlice(a []IntegerPoint) []FloatPoint {
|
|||
}}
|
||||
}
|
||||
|
||||
// UnsignedStddevReduceSlice returns the stddev value within a window.
|
||||
func UnsignedStddevReduceSlice(a []UnsignedPoint) []FloatPoint {
|
||||
// If there is only one point then return 0.
|
||||
if len(a) < 2 {
|
||||
return []FloatPoint{{Time: ZeroTime, Nil: true}}
|
||||
}
|
||||
|
||||
// Calculate the mean.
|
||||
var mean float64
|
||||
var count int
|
||||
for _, p := range a {
|
||||
count++
|
||||
mean += (float64(p.Value) - mean) / float64(count)
|
||||
}
|
||||
|
||||
// Calculate the variance.
|
||||
var variance float64
|
||||
for _, p := range a {
|
||||
variance += math.Pow(float64(p.Value)-mean, 2)
|
||||
}
|
||||
return []FloatPoint{{
|
||||
Time: ZeroTime,
|
||||
Value: math.Sqrt(variance / float64(count-1)),
|
||||
}}
|
||||
}
|
||||
|
||||
// newSpreadIterator returns an iterator for operating on a spread() call.
|
||||
func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
||||
switch input := input.(type) {
|
||||
|
|
@ -752,6 +943,12 @@ func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedSliceFuncReducer(UnsignedSpreadReduceSlice)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported spread iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -783,6 +980,21 @@ func IntegerSpreadReduceSlice(a []IntegerPoint) []IntegerPoint {
|
|||
return []IntegerPoint{{Time: ZeroTime, Value: max - min}}
|
||||
}
|
||||
|
||||
// UnsignedSpreadReduceSlice returns the spread value within a window.
|
||||
func UnsignedSpreadReduceSlice(a []UnsignedPoint) []UnsignedPoint {
|
||||
// Find min & max values.
|
||||
min, max := a[0].Value, a[0].Value
|
||||
for _, p := range a[1:] {
|
||||
if p.Value < min {
|
||||
min = p.Value
|
||||
}
|
||||
if p.Value > max {
|
||||
max = p.Value
|
||||
}
|
||||
}
|
||||
return []UnsignedPoint{{Time: ZeroTime, Value: max - min}}
|
||||
}
|
||||
|
||||
func newTopIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) (Iterator, error) {
|
||||
switch input := input.(type) {
|
||||
case FloatIterator:
|
||||
|
|
@ -801,6 +1013,14 @@ func newTopIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) (
|
|||
itr := newIntegerReduceIntegerIterator(input, opt, createFn)
|
||||
itr.keepTags = keepTags
|
||||
return itr, nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedTopReducer(n)
|
||||
return fn, fn
|
||||
}
|
||||
itr := newUnsignedReduceUnsignedIterator(input, opt, createFn)
|
||||
itr.keepTags = keepTags
|
||||
return itr, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported top iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -824,6 +1044,14 @@ func newBottomIterator(input Iterator, opt IteratorOptions, n int, keepTags bool
|
|||
itr := newIntegerReduceIntegerIterator(input, opt, createFn)
|
||||
itr.keepTags = keepTags
|
||||
return itr, nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedBottomReducer(n)
|
||||
return fn, fn
|
||||
}
|
||||
itr := newUnsignedReduceUnsignedIterator(input, opt, createFn)
|
||||
itr.keepTags = keepTags
|
||||
return itr, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported bottom iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -846,6 +1074,13 @@ func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
unsignedPercentileReduceSlice := NewUnsignedPercentileReduceSliceFunc(percentile)
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedSliceFuncReducer(unsignedPercentileReduceSlice)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported percentile iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -881,6 +1116,21 @@ func NewIntegerPercentileReduceSliceFunc(percentile float64) IntegerReduceSliceF
|
|||
}
|
||||
}
|
||||
|
||||
// NewUnsignedPercentileReduceSliceFunc returns the percentile value within a window.
|
||||
func NewUnsignedPercentileReduceSliceFunc(percentile float64) UnsignedReduceSliceFunc {
|
||||
return func(a []UnsignedPoint) []UnsignedPoint {
|
||||
length := len(a)
|
||||
i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
|
||||
|
||||
if i < 0 || i >= length {
|
||||
return nil
|
||||
}
|
||||
|
||||
sort.Sort(unsignedPointsByValue(a))
|
||||
return []UnsignedPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}}
|
||||
}
|
||||
}
|
||||
|
||||
// newDerivativeIterator returns an iterator for operating on a derivative() call.
|
||||
func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) {
|
||||
switch input := input.(type) {
|
||||
|
|
@ -896,6 +1146,12 @@ func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interva
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerStreamFloatIterator(input, createFn, opt), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
||||
fn := NewUnsignedDerivativeReducer(interval, isNonNegative, opt.Ascending)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedStreamFloatIterator(input, createFn, opt), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported derivative iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -916,6 +1172,12 @@ func newDifferenceIterator(input Iterator, opt IteratorOptions, isNonNegative bo
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerStreamIntegerIterator(input, createFn, opt), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedDifferenceReducer(isNonNegative)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedStreamUnsignedIterator(input, createFn, opt), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported difference iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -936,6 +1198,12 @@ func newElapsedIterator(input Iterator, opt IteratorOptions, interval Interval)
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerStreamIntegerIterator(input, createFn, opt), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, IntegerPointEmitter) {
|
||||
fn := NewUnsignedElapsedReducer(interval)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedStreamIntegerIterator(input, createFn, opt), nil
|
||||
case BooleanIterator:
|
||||
createFn := func() (BooleanPointAggregator, IntegerPointEmitter) {
|
||||
fn := NewBooleanElapsedReducer(interval)
|
||||
|
|
@ -968,6 +1236,12 @@ func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Itera
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerStreamFloatIterator(input, createFn, opt), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
||||
fn := NewUnsignedMovingAverageReducer(n)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedStreamFloatIterator(input, createFn, opt), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported moving average iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -988,6 +1262,12 @@ func newCumulativeSumIterator(input Iterator, opt IteratorOptions) (Iterator, er
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerStreamIntegerIterator(input, createFn, opt), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedCumulativeSumReducer()
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedStreamUnsignedIterator(input, createFn, opt), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported cumulative sum iterator type: %T", input)
|
||||
}
|
||||
|
|
@ -1033,6 +1313,12 @@ func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator,
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
||||
fn := NewUnsignedSampleReducer(size)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
||||
case StringIterator:
|
||||
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
||||
fn := NewStringSampleReducer(size)
|
||||
|
|
@ -1065,6 +1351,12 @@ func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval)
|
|||
return fn, fn
|
||||
}
|
||||
return newIntegerStreamFloatIterator(input, createFn, opt), nil
|
||||
case UnsignedIterator:
|
||||
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
||||
fn := NewUnsignedIntegralReducer(interval, opt)
|
||||
return fn, fn
|
||||
}
|
||||
return newUnsignedStreamFloatIterator(input, createFn, opt), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported integral iterator type: %T", input)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,6 +79,41 @@ func TestCallIterator_Count_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that an unsigned iterator can be created for a count() call.
|
||||
func TestCallIterator_Count_Unsigned(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
&UnsignedIterator{Points: []query.UnsignedPoint{
|
||||
{Name: "cpu", Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Name: "cpu", Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Name: "cpu", Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
|
||||
{Name: "cpu", Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
|
||||
{Name: "cpu", Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Name: "cpu", Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Name: "mem", Time: 23, Value: 10, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
}},
|
||||
query.IteratorOptions{
|
||||
Expr: MustParseExpr(`count("value")`),
|
||||
Dimensions: []string{"host"},
|
||||
Interval: query.Interval{Duration: 5 * time.Nanosecond},
|
||||
Ordered: true,
|
||||
Ascending: true,
|
||||
},
|
||||
)
|
||||
|
||||
if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if diff := cmp.Diff(a, [][]query.Point{
|
||||
{&query.IntegerPoint{Name: "cpu", Time: 0, Value: 3, Tags: ParseTags("host=hostA"), Aggregated: 3}},
|
||||
{&query.IntegerPoint{Name: "cpu", Time: 5, Value: 1, Tags: ParseTags("host=hostA"), Aggregated: 1}},
|
||||
{&query.IntegerPoint{Name: "cpu", Time: 0, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
{&query.IntegerPoint{Name: "cpu", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
{&query.IntegerPoint{Name: "mem", Time: 20, Value: 1, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
}); diff != "" {
|
||||
t.Fatalf("unexpected points:\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a string iterator can be created for a count() call.
|
||||
func TestCallIterator_Count_String(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
|
|
@ -217,6 +252,40 @@ func TestCallIterator_Min_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that a unsigned iterator can be created for a min() call.
|
||||
func TestCallIterator_Min_Unsigned(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
&UnsignedIterator{Points: []query.UnsignedPoint{
|
||||
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 4, Value: 12, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
|
||||
{Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
|
||||
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
}},
|
||||
query.IteratorOptions{
|
||||
Expr: MustParseExpr(`min("value")`),
|
||||
Dimensions: []string{"host"},
|
||||
Interval: query.Interval{Duration: 5 * time.Nanosecond},
|
||||
Ordered: true,
|
||||
Ascending: true,
|
||||
},
|
||||
)
|
||||
|
||||
if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if diff := cmp.Diff(a, [][]query.Point{
|
||||
{&query.UnsignedPoint{Time: 1, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 4}},
|
||||
{&query.UnsignedPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
|
||||
{&query.UnsignedPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
{&query.UnsignedPoint{Time: 23, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
}); diff != "" {
|
||||
t.Fatalf("unexpected points:\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a boolean iterator can be created for a min() call.
|
||||
func TestCallIterator_Min_Boolean(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
|
|
@ -316,6 +385,39 @@ func TestCallIterator_Max_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that a unsigned iterator can be created for a max() call.
|
||||
func TestCallIterator_Max_Unsigned(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
&UnsignedIterator{Points: []query.UnsignedPoint{
|
||||
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
|
||||
{Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
|
||||
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
}},
|
||||
query.IteratorOptions{
|
||||
Expr: MustParseExpr(`max("value")`),
|
||||
Dimensions: []string{"host"},
|
||||
Interval: query.Interval{Duration: 5 * time.Nanosecond},
|
||||
Ordered: true,
|
||||
Ascending: true,
|
||||
},
|
||||
)
|
||||
|
||||
if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if diff := cmp.Diff(a, [][]query.Point{
|
||||
{&query.UnsignedPoint{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3}},
|
||||
{&query.UnsignedPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
|
||||
{&query.UnsignedPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
{&query.UnsignedPoint{Time: 23, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
}); diff != "" {
|
||||
t.Fatalf("unexpected points:\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a boolean iterator can be created for a max() call.
|
||||
func TestCallIterator_Max_Boolean(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
|
|
@ -415,6 +517,39 @@ func TestCallIterator_Sum_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that an unsigned iterator can be created for a sum() call.
|
||||
func TestCallIterator_Sum_Unsigned(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
&UnsignedIterator{Points: []query.UnsignedPoint{
|
||||
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
|
||||
{Time: 5, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
|
||||
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
}},
|
||||
query.IteratorOptions{
|
||||
Expr: MustParseExpr(`sum("value")`),
|
||||
Dimensions: []string{"host"},
|
||||
Interval: query.Interval{Duration: 5 * time.Nanosecond},
|
||||
Ordered: true,
|
||||
Ascending: true,
|
||||
},
|
||||
)
|
||||
|
||||
if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if diff := cmp.Diff(a, [][]query.Point{
|
||||
{&query.UnsignedPoint{Time: 0, Value: 35, Tags: ParseTags("host=hostA"), Aggregated: 3}},
|
||||
{&query.UnsignedPoint{Time: 5, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
|
||||
{&query.UnsignedPoint{Time: 0, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
{&query.UnsignedPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
}); diff != "" {
|
||||
t.Fatalf("unexpected points:\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a float iterator can be created for a first() call.
|
||||
func TestCallIterator_First_Float(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
|
|
@ -481,6 +616,39 @@ func TestCallIterator_First_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that an unsigned iterator can be created for a first() call.
|
||||
func TestCallIterator_First_Unsigned(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
&UnsignedIterator{Points: []query.UnsignedPoint{
|
||||
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
|
||||
{Time: 6, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
|
||||
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
}},
|
||||
query.IteratorOptions{
|
||||
Expr: MustParseExpr(`first("value")`),
|
||||
Dimensions: []string{"host"},
|
||||
Interval: query.Interval{Duration: 5 * time.Nanosecond},
|
||||
Ordered: true,
|
||||
Ascending: true,
|
||||
},
|
||||
)
|
||||
|
||||
if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if diff := cmp.Diff(a, [][]query.Point{
|
||||
{&query.UnsignedPoint{Time: 0, Value: 15, Tags: ParseTags("host=hostA"), Aggregated: 3}},
|
||||
{&query.UnsignedPoint{Time: 6, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
|
||||
{&query.UnsignedPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
{&query.UnsignedPoint{Time: 23, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
}); diff != "" {
|
||||
t.Fatalf("unexpected points:\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a string iterator can be created for a first() call.
|
||||
func TestCallIterator_First_String(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
|
|
@ -613,6 +781,39 @@ func TestCallIterator_Last_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that an unsigned iterator can be created for a last() call.
|
||||
func TestCallIterator_Last_Unsigned(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
&UnsignedIterator{Points: []query.UnsignedPoint{
|
||||
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
|
||||
{Time: 6, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
|
||||
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
}},
|
||||
query.IteratorOptions{
|
||||
Expr: MustParseExpr(`last("value")`),
|
||||
Dimensions: []string{"host"},
|
||||
Interval: query.Interval{Duration: 5 * time.Nanosecond},
|
||||
Ordered: true,
|
||||
Ascending: true,
|
||||
},
|
||||
)
|
||||
|
||||
if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if diff := cmp.Diff(a, [][]query.Point{
|
||||
{&query.UnsignedPoint{Time: 2, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 3}},
|
||||
{&query.UnsignedPoint{Time: 6, Value: 20, Tags: ParseTags("host=hostA"), Aggregated: 1}},
|
||||
{&query.UnsignedPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
{&query.UnsignedPoint{Time: 23, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 1}},
|
||||
}); diff != "" {
|
||||
t.Fatalf("unexpected points:\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a string iterator can be created for a last() call.
|
||||
func TestCallIterator_Last_String(t *testing.T) {
|
||||
itr, _ := query.NewCallIterator(
|
||||
|
|
@ -754,6 +955,43 @@ func TestCallIterator_Mode_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that a unsigned iterator can be created for a mode() call.
|
||||
func TestCallIterator_Mode_Unsigned(t *testing.T) {
|
||||
itr, _ := query.NewModeIterator(&UnsignedIterator{Points: []query.UnsignedPoint{
|
||||
{Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")},
|
||||
{Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 3, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 4, Value: 10, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 6, Value: 20, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 7, Value: 21, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 8, Value: 21, Tags: ParseTags("region=us-east,host=hostA")},
|
||||
{Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Time: 22, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
{Time: 24, Value: 25, Tags: ParseTags("region=us-west,host=hostB")},
|
||||
}},
|
||||
query.IteratorOptions{
|
||||
Expr: MustParseExpr(`mode("value")`),
|
||||
Dimensions: []string{"host"},
|
||||
Interval: query.Interval{Duration: 5 * time.Nanosecond},
|
||||
Ordered: true,
|
||||
Ascending: true,
|
||||
},
|
||||
)
|
||||
|
||||
if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if diff := cmp.Diff(a, [][]query.Point{
|
||||
{&query.UnsignedPoint{Time: 0, Value: 10, Tags: ParseTags("host=hostA")}},
|
||||
{&query.UnsignedPoint{Time: 5, Value: 21, Tags: ParseTags("host=hostA")}},
|
||||
{&query.UnsignedPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB")}},
|
||||
{&query.UnsignedPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB")}},
|
||||
}); diff != "" {
|
||||
t.Fatalf("unexpected points:\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a string iterator can be created for a mode() call.
|
||||
func TestCallIterator_Mode_String(t *testing.T) {
|
||||
itr, _ := query.NewModeIterator(&StringIterator{Points: []query.StringPoint{
|
||||
|
|
|
|||
|
|
@ -72,6 +72,37 @@ func (r *IntegerMeanReducer) Emit() []FloatPoint {
|
|||
}}
|
||||
}
|
||||
|
||||
// UnsignedMeanReducer calculates the mean of the aggregated points.
|
||||
type UnsignedMeanReducer struct {
|
||||
sum uint64
|
||||
count uint32
|
||||
}
|
||||
|
||||
// NewUnsignedMeanReducer creates a new UnsignedMeanReducer.
|
||||
func NewUnsignedMeanReducer() *UnsignedMeanReducer {
|
||||
return &UnsignedMeanReducer{}
|
||||
}
|
||||
|
||||
// AggregateUnsigned aggregates a point into the reducer.
|
||||
func (r *UnsignedMeanReducer) AggregateUnsigned(p *UnsignedPoint) {
|
||||
if p.Aggregated >= 2 {
|
||||
r.sum += p.Value * uint64(p.Aggregated)
|
||||
r.count += p.Aggregated
|
||||
} else {
|
||||
r.sum += p.Value
|
||||
r.count++
|
||||
}
|
||||
}
|
||||
|
||||
// Emit emits the mean of the aggregated points as a single point.
|
||||
func (r *UnsignedMeanReducer) Emit() []FloatPoint {
|
||||
return []FloatPoint{{
|
||||
Time: ZeroTime,
|
||||
Value: float64(r.sum) / float64(r.count),
|
||||
Aggregated: r.count,
|
||||
}}
|
||||
}
|
||||
|
||||
// FloatDerivativeReducer calculates the derivative of the aggregated points.
|
||||
type FloatDerivativeReducer struct {
|
||||
interval Interval
|
||||
|
|
@ -186,6 +217,68 @@ func (r *IntegerDerivativeReducer) Emit() []FloatPoint {
|
|||
return nil
|
||||
}
|
||||
|
||||
// UnsignedDerivativeReducer calculates the derivative of the aggregated points.
|
||||
type UnsignedDerivativeReducer struct {
|
||||
interval Interval
|
||||
prev UnsignedPoint
|
||||
curr UnsignedPoint
|
||||
isNonNegative bool
|
||||
ascending bool
|
||||
}
|
||||
|
||||
// NewUnsignedDerivativeReducer creates a new UnsignedDerivativeReducer.
|
||||
func NewUnsignedDerivativeReducer(interval Interval, isNonNegative, ascending bool) *UnsignedDerivativeReducer {
|
||||
return &UnsignedDerivativeReducer{
|
||||
interval: interval,
|
||||
isNonNegative: isNonNegative,
|
||||
ascending: ascending,
|
||||
prev: UnsignedPoint{Nil: true},
|
||||
curr: UnsignedPoint{Nil: true},
|
||||
}
|
||||
}
|
||||
|
||||
// AggregateUnsigned aggregates a point into the reducer and updates the current window.
|
||||
func (r *UnsignedDerivativeReducer) AggregateUnsigned(p *UnsignedPoint) {
|
||||
// Skip past a point when it does not advance the stream. A joined series
|
||||
// may have multiple points at the same time so we will discard anything
|
||||
// except the first point we encounter.
|
||||
if !r.curr.Nil && r.curr.Time == p.Time {
|
||||
return
|
||||
}
|
||||
|
||||
r.prev = r.curr
|
||||
r.curr = *p
|
||||
}
|
||||
|
||||
// Emit emits the derivative of the reducer at the current point.
|
||||
func (r *UnsignedDerivativeReducer) Emit() []FloatPoint {
|
||||
if !r.prev.Nil {
|
||||
// Calculate the derivative of successive points by dividing the
|
||||
// difference of each value by the elapsed time normalized to the interval.
|
||||
var diff float64
|
||||
if r.curr.Value > r.prev.Value {
|
||||
diff = float64(r.curr.Value - r.prev.Value)
|
||||
} else {
|
||||
diff = -float64(r.prev.Value - r.curr.Value)
|
||||
}
|
||||
elapsed := r.curr.Time - r.prev.Time
|
||||
if !r.ascending {
|
||||
elapsed = -elapsed
|
||||
}
|
||||
value := diff / (float64(elapsed) / float64(r.interval.Duration))
|
||||
|
||||
// Mark this point as read by changing the previous point to nil.
|
||||
r.prev.Nil = true
|
||||
|
||||
// Drop negative values for non-negative derivatives.
|
||||
if r.isNonNegative && diff < 0 {
|
||||
return nil
|
||||
}
|
||||
return []FloatPoint{{Time: r.curr.Time, Value: value}}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FloatDifferenceReducer calculates the derivative of the aggregated points.
|
||||
type FloatDifferenceReducer struct {
|
||||
isNonNegative bool
|
||||
|
|
@ -283,6 +376,55 @@ func (r *IntegerDifferenceReducer) Emit() []IntegerPoint {
|
|||
return nil
|
||||
}
|
||||
|
||||
// UnsignedDifferenceReducer calculates the derivative of the aggregated points.
|
||||
type UnsignedDifferenceReducer struct {
|
||||
isNonNegative bool
|
||||
prev UnsignedPoint
|
||||
curr UnsignedPoint
|
||||
}
|
||||
|
||||
// NewUnsignedDifferenceReducer creates a new UnsignedDifferenceReducer.
|
||||
func NewUnsignedDifferenceReducer(isNonNegative bool) *UnsignedDifferenceReducer {
|
||||
return &UnsignedDifferenceReducer{
|
||||
isNonNegative: isNonNegative,
|
||||
prev: UnsignedPoint{Nil: true},
|
||||
curr: UnsignedPoint{Nil: true},
|
||||
}
|
||||
}
|
||||
|
||||
// AggregateUnsigned aggregates a point into the reducer and updates the current window.
|
||||
func (r *UnsignedDifferenceReducer) AggregateUnsigned(p *UnsignedPoint) {
|
||||
// Skip past a point when it does not advance the stream. A joined series
|
||||
// may have multiple points at the same time so we will discard anything
|
||||
// except the first point we encounter.
|
||||
if !r.curr.Nil && r.curr.Time == p.Time {
|
||||
return
|
||||
}
|
||||
|
||||
r.prev = r.curr
|
||||
r.curr = *p
|
||||
}
|
||||
|
||||
// Emit emits the difference of the reducer at the current point.
|
||||
func (r *UnsignedDifferenceReducer) Emit() []UnsignedPoint {
|
||||
if !r.prev.Nil {
|
||||
// If it is non_negative_difference discard any negative value. Since
|
||||
// prev is still marked as unread. The correctness can be ensured.
|
||||
if r.isNonNegative && r.curr.Value < r.prev.Value {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Calculate the difference of successive points.
|
||||
value := r.curr.Value - r.prev.Value
|
||||
|
||||
// Mark this point as read by changing the previous point to nil.
|
||||
r.prev.Nil = true
|
||||
|
||||
return []UnsignedPoint{{Time: r.curr.Time, Value: value}}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FloatMovingAverageReducer calculates the moving average of the aggregated points.
|
||||
type FloatMovingAverageReducer struct {
|
||||
pos int
|
||||
|
|
@ -377,6 +519,53 @@ func (r *IntegerMovingAverageReducer) Emit() []FloatPoint {
|
|||
}
|
||||
}
|
||||
|
||||
// UnsignedMovingAverageReducer calculates the moving average of the aggregated points.
|
||||
type UnsignedMovingAverageReducer struct {
|
||||
pos int
|
||||
sum uint64
|
||||
time int64
|
||||
buf []uint64
|
||||
}
|
||||
|
||||
// NewUnsignedMovingAverageReducer creates a new UnsignedMovingAverageReducer.
|
||||
func NewUnsignedMovingAverageReducer(n int) *UnsignedMovingAverageReducer {
|
||||
return &UnsignedMovingAverageReducer{
|
||||
buf: make([]uint64, 0, n),
|
||||
}
|
||||
}
|
||||
|
||||
// AggregateUnsigned aggregates a point into the reducer and updates the current window.
|
||||
func (r *UnsignedMovingAverageReducer) AggregateUnsigned(p *UnsignedPoint) {
|
||||
if len(r.buf) != cap(r.buf) {
|
||||
r.buf = append(r.buf, p.Value)
|
||||
} else {
|
||||
r.sum -= r.buf[r.pos]
|
||||
r.buf[r.pos] = p.Value
|
||||
}
|
||||
r.sum += p.Value
|
||||
r.time = p.Time
|
||||
r.pos++
|
||||
if r.pos >= cap(r.buf) {
|
||||
r.pos = 0
|
||||
}
|
||||
}
|
||||
|
||||
// Emit emits the moving average of the current window. Emit should be called
|
||||
// after every call to AggregateUnsigned and it will produce one point if there
|
||||
// is enough data to fill a window, otherwise it will produce zero points.
|
||||
func (r *UnsignedMovingAverageReducer) Emit() []FloatPoint {
|
||||
if len(r.buf) != cap(r.buf) {
|
||||
return []FloatPoint{}
|
||||
}
|
||||
return []FloatPoint{
|
||||
{
|
||||
Value: float64(r.sum) / float64(len(r.buf)),
|
||||
Time: r.time,
|
||||
Aggregated: uint32(len(r.buf)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// FloatCumulativeSumReducer cumulates the values from each point.
|
||||
type FloatCumulativeSumReducer struct {
|
||||
curr FloatPoint
|
||||
|
|
@ -429,6 +618,32 @@ func (r *IntegerCumulativeSumReducer) Emit() []IntegerPoint {
|
|||
return pts
|
||||
}
|
||||
|
||||
// UnsignedCumulativeSumReducer cumulates the values from each point.
|
||||
type UnsignedCumulativeSumReducer struct {
|
||||
curr UnsignedPoint
|
||||
}
|
||||
|
||||
// NewUnsignedCumulativeSumReducer creates a new UnsignedCumulativeSumReducer.
|
||||
func NewUnsignedCumulativeSumReducer() *UnsignedCumulativeSumReducer {
|
||||
return &UnsignedCumulativeSumReducer{
|
||||
curr: UnsignedPoint{Nil: true},
|
||||
}
|
||||
}
|
||||
|
||||
func (r *UnsignedCumulativeSumReducer) AggregateUnsigned(p *UnsignedPoint) {
|
||||
r.curr.Value += p.Value
|
||||
r.curr.Time = p.Time
|
||||
r.curr.Nil = false
|
||||
}
|
||||
|
||||
func (r *UnsignedCumulativeSumReducer) Emit() []UnsignedPoint {
|
||||
var pts []UnsignedPoint
|
||||
if !r.curr.Nil {
|
||||
pts = []UnsignedPoint{r.curr}
|
||||
}
|
||||
return pts
|
||||
}
|
||||
|
||||
// FloatHoltWintersReducer forecasts a series into the future.
|
||||
// This is done using the Holt-Winters damped method.
|
||||
// 1. Using the series the initial values are calculated using a SSE.
|
||||
|
|
@ -991,6 +1206,115 @@ func (r *IntegerIntegralReducer) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// IntegerIntegralReducer calculates the time-integral of the aggregated points.
|
||||
type UnsignedIntegralReducer struct {
|
||||
interval Interval
|
||||
sum float64
|
||||
prev UnsignedPoint
|
||||
window struct {
|
||||
start int64
|
||||
end int64
|
||||
}
|
||||
ch chan FloatPoint
|
||||
opt IteratorOptions
|
||||
}
|
||||
|
||||
// NewUnsignedIntegralReducer creates a new UnsignedIntegralReducer.
|
||||
func NewUnsignedIntegralReducer(interval Interval, opt IteratorOptions) *UnsignedIntegralReducer {
|
||||
return &UnsignedIntegralReducer{
|
||||
interval: interval,
|
||||
prev: UnsignedPoint{Nil: true},
|
||||
ch: make(chan FloatPoint, 1),
|
||||
opt: opt,
|
||||
}
|
||||
}
|
||||
|
||||
// AggregateUnsigned aggregates a point into the reducer.
|
||||
func (r *UnsignedIntegralReducer) AggregateUnsigned(p *UnsignedPoint) {
|
||||
// If this is the first point, just save it
|
||||
if r.prev.Nil {
|
||||
r.prev = *p
|
||||
|
||||
// Record the end of the time interval.
|
||||
// We do not care for whether the last number is inclusive or exclusive
|
||||
// because we treat both the same for the involved math.
|
||||
if r.opt.Ascending {
|
||||
r.window.start, r.window.end = r.opt.Window(p.Time)
|
||||
} else {
|
||||
r.window.end, r.window.start = r.opt.Window(p.Time)
|
||||
}
|
||||
|
||||
// If we see the minimum allowable time, set the time to zero so we don't
|
||||
// break the default returned time for aggregate queries without times.
|
||||
if r.window.start == influxql.MinTime {
|
||||
r.window.start = 0
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// If this point has the same timestamp as the previous one,
|
||||
// skip the point. Points sent into this reducer are expected
|
||||
// to be fed in order.
|
||||
value := float64(p.Value)
|
||||
if r.prev.Time == p.Time {
|
||||
r.prev = *p
|
||||
return
|
||||
} else if (r.opt.Ascending && p.Time >= r.window.end) || (!r.opt.Ascending && p.Time <= r.window.end) {
|
||||
// If our previous time is not equal to the window, we need to
|
||||
// interpolate the area at the end of this interval.
|
||||
if r.prev.Time != r.window.end {
|
||||
value = linearFloat(r.window.end, r.prev.Time, p.Time, float64(r.prev.Value), value)
|
||||
elapsed := float64(r.window.end-r.prev.Time) / float64(r.interval.Duration)
|
||||
r.sum += 0.5 * (value + float64(r.prev.Value)) * elapsed
|
||||
|
||||
r.prev.Time = r.window.end
|
||||
}
|
||||
|
||||
// Emit the current point through the channel and then clear it.
|
||||
r.ch <- FloatPoint{Time: r.window.start, Value: r.sum}
|
||||
if r.opt.Ascending {
|
||||
r.window.start, r.window.end = r.opt.Window(p.Time)
|
||||
} else {
|
||||
r.window.end, r.window.start = r.opt.Window(p.Time)
|
||||
}
|
||||
r.sum = 0.0
|
||||
}
|
||||
|
||||
// Normal operation: update the sum using the trapezium rule
|
||||
elapsed := float64(p.Time-r.prev.Time) / float64(r.interval.Duration)
|
||||
r.sum += 0.5 * (value + float64(r.prev.Value)) * elapsed
|
||||
r.prev = *p
|
||||
}
|
||||
|
||||
// Emit emits the time-integral of the aggregated points as a single FLOAT point
|
||||
// InfluxQL convention dictates that outside a group-by-time clause we return
|
||||
// a timestamp of zero. Within a group-by-time, we can set the time to ZeroTime
|
||||
// and a higher level will change it to the start of the time group.
|
||||
func (r *UnsignedIntegralReducer) Emit() []FloatPoint {
|
||||
select {
|
||||
case pt, ok := <-r.ch:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return []FloatPoint{pt}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Close flushes any in progress points to ensure any remaining points are
|
||||
// emitted.
|
||||
func (r *UnsignedIntegralReducer) Close() error {
|
||||
// If our last point is at the start time, then discard this point since
|
||||
// there is no area within this bucket. Otherwise, send off what we
|
||||
// currently have as the final point.
|
||||
if !r.prev.Nil && r.prev.Time != r.window.start {
|
||||
r.ch <- FloatPoint{Time: r.window.start, Value: r.sum}
|
||||
}
|
||||
close(r.ch)
|
||||
return nil
|
||||
}
|
||||
|
||||
type FloatTopReducer struct {
|
||||
h *floatPointsByFunc
|
||||
}
|
||||
|
|
@ -1077,6 +1401,49 @@ func (r *IntegerTopReducer) Emit() []IntegerPoint {
|
|||
return points
|
||||
}
|
||||
|
||||
type UnsignedTopReducer struct {
|
||||
h *unsignedPointsByFunc
|
||||
}
|
||||
|
||||
func NewUnsignedTopReducer(n int) *UnsignedTopReducer {
|
||||
return &UnsignedTopReducer{
|
||||
h: unsignedPointsSortBy(make([]UnsignedPoint, 0, n), func(a, b *UnsignedPoint) bool {
|
||||
if a.Value != b.Value {
|
||||
return a.Value < b.Value
|
||||
}
|
||||
return a.Time > b.Time
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *UnsignedTopReducer) AggregateUnsigned(p *UnsignedPoint) {
|
||||
if r.h.Len() == cap(r.h.points) {
|
||||
// Compare the minimum point and the aggregated point. If our value is
|
||||
// larger, replace the current min value.
|
||||
if !r.h.cmp(&r.h.points[0], p) {
|
||||
return
|
||||
}
|
||||
r.h.points[0] = *p
|
||||
heap.Fix(r.h, 0)
|
||||
return
|
||||
}
|
||||
heap.Push(r.h, *p)
|
||||
}
|
||||
|
||||
func (r *UnsignedTopReducer) Emit() []UnsignedPoint {
|
||||
// Ensure the points are sorted with the maximum value last. While the
|
||||
// first point may be the minimum value, the rest is not guaranteed to be
|
||||
// in any particular order while it is a heap.
|
||||
points := make([]UnsignedPoint, len(r.h.points))
|
||||
for i, p := range r.h.points {
|
||||
p.Aggregated = 0
|
||||
points[i] = p
|
||||
}
|
||||
h := unsignedPointsByFunc{points: points, cmp: r.h.cmp}
|
||||
sort.Sort(sort.Reverse(&h))
|
||||
return points
|
||||
}
|
||||
|
||||
type FloatBottomReducer struct {
|
||||
h *floatPointsByFunc
|
||||
}
|
||||
|
|
@ -1162,3 +1529,46 @@ func (r *IntegerBottomReducer) Emit() []IntegerPoint {
|
|||
sort.Sort(sort.Reverse(&h))
|
||||
return points
|
||||
}
|
||||
|
||||
type UnsignedBottomReducer struct {
|
||||
h *unsignedPointsByFunc
|
||||
}
|
||||
|
||||
func NewUnsignedBottomReducer(n int) *UnsignedBottomReducer {
|
||||
return &UnsignedBottomReducer{
|
||||
h: unsignedPointsSortBy(make([]UnsignedPoint, 0, n), func(a, b *UnsignedPoint) bool {
|
||||
if a.Value != b.Value {
|
||||
return a.Value > b.Value
|
||||
}
|
||||
return a.Time > b.Time
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *UnsignedBottomReducer) AggregateUnsigned(p *UnsignedPoint) {
|
||||
if r.h.Len() == cap(r.h.points) {
|
||||
// Compare the minimum point and the aggregated point. If our value is
|
||||
// larger, replace the current min value.
|
||||
if !r.h.cmp(&r.h.points[0], p) {
|
||||
return
|
||||
}
|
||||
r.h.points[0] = *p
|
||||
heap.Fix(r.h, 0)
|
||||
return
|
||||
}
|
||||
heap.Push(r.h, *p)
|
||||
}
|
||||
|
||||
func (r *UnsignedBottomReducer) Emit() []UnsignedPoint {
|
||||
// Ensure the points are sorted with the maximum value last. While the
|
||||
// first point may be the minimum value, the rest is not guaranteed to be
|
||||
// in any particular order while it is a heap.
|
||||
points := make([]UnsignedPoint, len(r.h.points))
|
||||
for i, p := range r.h.points {
|
||||
p.Aggregated = 0
|
||||
points[i] = p
|
||||
}
|
||||
h := unsignedPointsByFunc{points: points, cmp: r.h.cmp}
|
||||
sort.Sort(sort.Reverse(&h))
|
||||
return points
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7501,7 +7501,21 @@ func (itr *unsignedFillIterator) Next() (*UnsignedPoint, error) {
|
|||
|
||||
switch itr.opt.Fill {
|
||||
case influxql.LinearFill:
|
||||
fallthrough
|
||||
if !itr.prev.Nil {
|
||||
next, err := itr.input.peek()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
|
||||
interval := int64(itr.opt.Interval.Duration)
|
||||
start := itr.window.time / interval
|
||||
p.Value = linearUnsigned(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
|
||||
} else {
|
||||
p.Nil = true
|
||||
}
|
||||
} else {
|
||||
p.Nil = true
|
||||
}
|
||||
|
||||
case influxql.NullFill:
|
||||
p.Nil = true
|
||||
case influxql.NumberFill:
|
||||
|
|
|
|||
|
|
@ -722,7 +722,7 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
|
|||
|
||||
switch itr.opt.Fill {
|
||||
case influxql.LinearFill:
|
||||
{{- if or (eq $k.Name "Float") (eq $k.Name "Integer")}}
|
||||
{{- if or (eq $k.Name "Float") (eq $k.Name "Integer") (eq $k.Name "Unsigned")}}
|
||||
if !itr.prev.Nil {
|
||||
next, err := itr.input.peek()
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -74,6 +74,8 @@ func (a Iterators) dataType() influxql.DataType {
|
|||
return influxql.Float
|
||||
case IntegerIterator:
|
||||
return influxql.Integer
|
||||
case UnsignedIterator:
|
||||
return influxql.Unsigned
|
||||
case StringIterator:
|
||||
return influxql.String
|
||||
case BooleanIterator:
|
||||
|
|
@ -93,6 +95,8 @@ func (a Iterators) coerce() interface{} {
|
|||
return newFloatIterators(a)
|
||||
case influxql.Integer:
|
||||
return newIntegerIterators(a)
|
||||
case influxql.Unsigned:
|
||||
return newUnsignedIterators(a)
|
||||
case influxql.String:
|
||||
return newStringIterators(a)
|
||||
case influxql.Boolean:
|
||||
|
|
@ -161,6 +165,8 @@ func NewMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator {
|
|||
return newFloatMergeIterator(inputs, opt)
|
||||
case []IntegerIterator:
|
||||
return newIntegerMergeIterator(inputs, opt)
|
||||
case []UnsignedIterator:
|
||||
return newUnsignedMergeIterator(inputs, opt)
|
||||
case []StringIterator:
|
||||
return newStringMergeIterator(inputs, opt)
|
||||
case []BooleanIterator:
|
||||
|
|
@ -223,6 +229,8 @@ func NewSortedMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator {
|
|||
return newFloatSortedMergeIterator(inputs, opt)
|
||||
case []IntegerIterator:
|
||||
return newIntegerSortedMergeIterator(inputs, opt)
|
||||
case []UnsignedIterator:
|
||||
return newUnsignedSortedMergeIterator(inputs, opt)
|
||||
case []StringIterator:
|
||||
return newStringSortedMergeIterator(inputs, opt)
|
||||
case []BooleanIterator:
|
||||
|
|
@ -243,6 +251,8 @@ func newParallelIterator(input Iterator) Iterator {
|
|||
return newFloatParallelIterator(itr)
|
||||
case IntegerIterator:
|
||||
return newIntegerParallelIterator(itr)
|
||||
case UnsignedIterator:
|
||||
return newUnsignedParallelIterator(itr)
|
||||
case StringIterator:
|
||||
return newStringParallelIterator(itr)
|
||||
case BooleanIterator:
|
||||
|
|
@ -259,6 +269,8 @@ func NewLimitIterator(input Iterator, opt IteratorOptions) Iterator {
|
|||
return newFloatLimitIterator(input, opt)
|
||||
case IntegerIterator:
|
||||
return newIntegerLimitIterator(input, opt)
|
||||
case UnsignedIterator:
|
||||
return newUnsignedLimitIterator(input, opt)
|
||||
case StringIterator:
|
||||
return newStringLimitIterator(input, opt)
|
||||
case BooleanIterator:
|
||||
|
|
@ -281,6 +293,8 @@ func NewFilterIterator(input Iterator, cond influxql.Expr, opt IteratorOptions)
|
|||
return newFloatFilterIterator(input, cond, opt)
|
||||
case IntegerIterator:
|
||||
return newIntegerFilterIterator(input, cond, opt)
|
||||
case UnsignedIterator:
|
||||
return newUnsignedFilterIterator(input, cond, opt)
|
||||
case StringIterator:
|
||||
return newStringFilterIterator(input, cond, opt)
|
||||
case BooleanIterator:
|
||||
|
|
@ -303,6 +317,8 @@ func NewDedupeIterator(input Iterator) Iterator {
|
|||
return newFloatDedupeIterator(input)
|
||||
case IntegerIterator:
|
||||
return newIntegerDedupeIterator(input)
|
||||
case UnsignedIterator:
|
||||
return newUnsignedDedupeIterator(input)
|
||||
case StringIterator:
|
||||
return newStringDedupeIterator(input)
|
||||
case BooleanIterator:
|
||||
|
|
@ -319,6 +335,8 @@ func NewFillIterator(input Iterator, expr influxql.Expr, opt IteratorOptions) It
|
|||
return newFloatFillIterator(input, expr, opt)
|
||||
case IntegerIterator:
|
||||
return newIntegerFillIterator(input, expr, opt)
|
||||
case UnsignedIterator:
|
||||
return newUnsignedFillIterator(input, expr, opt)
|
||||
case StringIterator:
|
||||
return newStringFillIterator(input, expr, opt)
|
||||
case BooleanIterator:
|
||||
|
|
@ -335,6 +353,8 @@ func NewIntervalIterator(input Iterator, opt IteratorOptions) Iterator {
|
|||
return newFloatIntervalIterator(input, opt)
|
||||
case IntegerIterator:
|
||||
return newIntegerIntervalIterator(input, opt)
|
||||
case UnsignedIterator:
|
||||
return newUnsignedIntervalIterator(input, opt)
|
||||
case StringIterator:
|
||||
return newStringIntervalIterator(input, opt)
|
||||
case BooleanIterator:
|
||||
|
|
@ -352,6 +372,8 @@ func NewInterruptIterator(input Iterator, closing <-chan struct{}) Iterator {
|
|||
return newFloatInterruptIterator(input, closing)
|
||||
case IntegerIterator:
|
||||
return newIntegerInterruptIterator(input, closing)
|
||||
case UnsignedIterator:
|
||||
return newUnsignedInterruptIterator(input, closing)
|
||||
case StringIterator:
|
||||
return newStringInterruptIterator(input, closing)
|
||||
case BooleanIterator:
|
||||
|
|
@ -369,6 +391,8 @@ func NewCloseInterruptIterator(input Iterator, closing <-chan struct{}) Iterator
|
|||
return newFloatCloseInterruptIterator(input, closing)
|
||||
case IntegerIterator:
|
||||
return newIntegerCloseInterruptIterator(input, closing)
|
||||
case UnsignedIterator:
|
||||
return newUnsignedCloseInterruptIterator(input, closing)
|
||||
case StringIterator:
|
||||
return newStringCloseInterruptIterator(input, closing)
|
||||
case BooleanIterator:
|
||||
|
|
@ -400,6 +424,8 @@ func NewAuxIterator(input Iterator, opt IteratorOptions) AuxIterator {
|
|||
return newFloatAuxIterator(input, opt)
|
||||
case IntegerIterator:
|
||||
return newIntegerAuxIterator(input, opt)
|
||||
case UnsignedIterator:
|
||||
return newUnsignedAuxIterator(input, opt)
|
||||
case StringIterator:
|
||||
return newStringAuxIterator(input, opt)
|
||||
case BooleanIterator:
|
||||
|
|
@ -474,6 +500,10 @@ func (a *auxIteratorFields) iterator(name string, typ influxql.DataType) Iterato
|
|||
itr := &integerChanIterator{cond: sync.NewCond(&sync.Mutex{})}
|
||||
f.append(itr)
|
||||
return itr
|
||||
case influxql.Unsigned:
|
||||
itr := &unsignedChanIterator{cond: sync.NewCond(&sync.Mutex{})}
|
||||
f.append(itr)
|
||||
return itr
|
||||
case influxql.String, influxql.Tag:
|
||||
itr := &stringChanIterator{cond: sync.NewCond(&sync.Mutex{})}
|
||||
f.append(itr)
|
||||
|
|
@ -510,6 +540,8 @@ func (a *auxIteratorFields) send(p Point) (ok bool) {
|
|||
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
|
||||
case *integerChanIterator:
|
||||
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
|
||||
case *unsignedChanIterator:
|
||||
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
|
||||
case *stringChanIterator:
|
||||
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
|
||||
case *booleanChanIterator:
|
||||
|
|
@ -530,6 +562,8 @@ func (a *auxIteratorFields) sendError(err error) {
|
|||
itr.setErr(err)
|
||||
case *integerChanIterator:
|
||||
itr.setErr(err)
|
||||
case *unsignedChanIterator:
|
||||
itr.setErr(err)
|
||||
case *stringChanIterator:
|
||||
itr.setErr(err)
|
||||
case *booleanChanIterator:
|
||||
|
|
@ -551,6 +585,9 @@ func DrainIterator(itr Iterator) {
|
|||
case IntegerIterator:
|
||||
for p, _ := itr.Next(); p != nil; p, _ = itr.Next() {
|
||||
}
|
||||
case UnsignedIterator:
|
||||
for p, _ := itr.Next(); p != nil; p, _ = itr.Next() {
|
||||
}
|
||||
case StringIterator:
|
||||
for p, _ := itr.Next(); p != nil; p, _ = itr.Next() {
|
||||
}
|
||||
|
|
@ -578,6 +615,10 @@ func DrainIterators(itrs []Iterator) {
|
|||
if p, _ := itr.Next(); p != nil {
|
||||
hasData = true
|
||||
}
|
||||
case UnsignedIterator:
|
||||
if p, _ := itr.Next(); p != nil {
|
||||
hasData = true
|
||||
}
|
||||
case StringIterator:
|
||||
if p, _ := itr.Next(); p != nil {
|
||||
hasData = true
|
||||
|
|
@ -605,6 +646,8 @@ func NewReaderIterator(r io.Reader, typ influxql.DataType, stats IteratorStats)
|
|||
return newFloatReaderIterator(r, stats)
|
||||
case influxql.Integer:
|
||||
return newIntegerReaderIterator(r, stats)
|
||||
case influxql.Unsigned:
|
||||
return newUnsignedReaderIterator(r, stats)
|
||||
case influxql.String:
|
||||
return newStringReaderIterator(r, stats)
|
||||
case influxql.Boolean:
|
||||
|
|
|
|||
|
|
@ -114,6 +114,55 @@ func TestMergeIterator_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that a set of iterators can be merged together, sorted by window and name/tag.
|
||||
func TestMergeIterator_Unsigned(t *testing.T) {
|
||||
inputs := []*UnsignedIterator{
|
||||
{Points: []query.UnsignedPoint{
|
||||
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
|
||||
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
|
||||
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
|
||||
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
|
||||
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8},
|
||||
}},
|
||||
{Points: []query.UnsignedPoint{
|
||||
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
|
||||
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
|
||||
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
|
||||
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9},
|
||||
}},
|
||||
{Points: []query.UnsignedPoint{}},
|
||||
}
|
||||
itr := query.NewMergeIterator(UnsignedIterators(inputs), query.IteratorOptions{
|
||||
Interval: query.Interval{
|
||||
Duration: 10 * time.Nanosecond,
|
||||
},
|
||||
Dimensions: []string{"host"},
|
||||
Ascending: true,
|
||||
})
|
||||
|
||||
if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]query.Point{
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}},
|
||||
{&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}},
|
||||
{&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}},
|
||||
}) {
|
||||
t.Errorf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
|
||||
for i, input := range inputs {
|
||||
if !input.Closed {
|
||||
t.Errorf("iterator %d not closed", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a set of iterators can be merged together, sorted by window and name/tag.
|
||||
func TestMergeIterator_String(t *testing.T) {
|
||||
inputs := []*StringIterator{
|
||||
|
|
@ -219,6 +268,9 @@ func TestMergeIterator_Nil(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Verifies that coercing will drop values that aren't the primary type.
|
||||
// It's the responsibility of the engine to return the correct type. If they don't,
|
||||
// we drop iterators that don't match.
|
||||
func TestMergeIterator_Coerce_Float(t *testing.T) {
|
||||
inputs := []query.Iterator{
|
||||
&FloatIterator{Points: []query.FloatPoint{
|
||||
|
|
@ -264,6 +316,10 @@ func TestMergeIterator_Coerce_Float(t *testing.T) {
|
|||
if !input.Closed {
|
||||
t.Errorf("iterator %d not closed", i)
|
||||
}
|
||||
case *UnsignedIterator:
|
||||
if !input.Closed {
|
||||
t.Errorf("iterator %d not closed", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -364,6 +420,54 @@ func TestSortedMergeIterator_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that a set of iterators can be merged together, sorted by name/tag.
|
||||
func TestSortedMergeIterator_Unsigned(t *testing.T) {
|
||||
inputs := []*UnsignedIterator{
|
||||
{Points: []query.UnsignedPoint{
|
||||
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
|
||||
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
|
||||
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
|
||||
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
|
||||
{Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8},
|
||||
}},
|
||||
{Points: []query.UnsignedPoint{
|
||||
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
|
||||
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
|
||||
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
|
||||
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9},
|
||||
}},
|
||||
{Points: []query.UnsignedPoint{}},
|
||||
}
|
||||
itr := query.NewSortedMergeIterator(UnsignedIterators(inputs), query.IteratorOptions{
|
||||
Interval: query.Interval{
|
||||
Duration: 10 * time.Nanosecond,
|
||||
},
|
||||
Dimensions: []string{"host"},
|
||||
Ascending: true,
|
||||
})
|
||||
if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]query.Point{
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}},
|
||||
{&query.UnsignedPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}},
|
||||
{&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}},
|
||||
{&query.UnsignedPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}},
|
||||
}) {
|
||||
t.Errorf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
|
||||
for i, input := range inputs {
|
||||
if !input.Closed {
|
||||
t.Errorf("iterator %d not closed", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that a set of iterators can be merged together, sorted by name/tag.
|
||||
func TestSortedMergeIterator_String(t *testing.T) {
|
||||
inputs := []*StringIterator{
|
||||
|
|
@ -512,6 +616,10 @@ func TestSortedMergeIterator_Coerce_Float(t *testing.T) {
|
|||
if !input.Closed {
|
||||
t.Errorf("iterator %d not closed", i)
|
||||
}
|
||||
case *UnsignedIterator:
|
||||
if !input.Closed {
|
||||
t.Errorf("iterator %d not closed", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -574,6 +682,35 @@ func TestLimitIterator_Integer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure limit iterators work with limit and offset.
|
||||
func TestLimitIterator_Unsigned(t *testing.T) {
|
||||
input := &UnsignedIterator{Points: []query.UnsignedPoint{
|
||||
{Name: "cpu", Time: 0, Value: 1},
|
||||
{Name: "cpu", Time: 5, Value: 3},
|
||||
{Name: "cpu", Time: 10, Value: 5},
|
||||
{Name: "mem", Time: 5, Value: 3},
|
||||
{Name: "mem", Time: 7, Value: 8},
|
||||
}}
|
||||
|
||||
itr := query.NewLimitIterator(input, query.IteratorOptions{
|
||||
Limit: 1,
|
||||
Offset: 1,
|
||||
})
|
||||
|
||||
if a, err := Iterators([]query.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]query.Point{
|
||||
{&query.UnsignedPoint{Name: "cpu", Time: 5, Value: 3}},
|
||||
{&query.UnsignedPoint{Name: "mem", Time: 7, Value: 8}},
|
||||
}) {
|
||||
t.Fatalf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
|
||||
if !input.Closed {
|
||||
t.Error("iterator not closed")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure limit iterators work with limit and offset.
|
||||
func TestLimitIterator_String(t *testing.T) {
|
||||
input := &StringIterator{Points: []query.StringPoint{
|
||||
|
|
@ -632,7 +769,7 @@ func TestLimitIterator_Boolean(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure auxilary iterators can be created for auxilary fields.
|
||||
// Ensure auxiliary iterators can be created for auxilary fields.
|
||||
func TestFloatAuxIterator(t *testing.T) {
|
||||
itr := query.NewAuxIterator(
|
||||
&FloatIterator{Points: []query.FloatPoint{
|
||||
|
|
@ -939,6 +1076,12 @@ func (itrs Iterators) Next() ([]query.Point, error) {
|
|||
return nil, err
|
||||
}
|
||||
a[i] = ip
|
||||
case query.UnsignedIterator:
|
||||
up, err := itr.Next()
|
||||
if up == nil || err != nil {
|
||||
return nil, err
|
||||
}
|
||||
a[i] = up
|
||||
case query.StringIterator:
|
||||
sp, err := itr.Next()
|
||||
if sp == nil || err != nil {
|
||||
|
|
@ -1466,6 +1609,35 @@ func IntegerIterators(inputs []*IntegerIterator) []query.Iterator {
|
|||
return itrs
|
||||
}
|
||||
|
||||
// Test implementation of query.UnsignedIterator
|
||||
type UnsignedIterator struct {
|
||||
Points []query.UnsignedPoint
|
||||
Closed bool
|
||||
stats query.IteratorStats
|
||||
}
|
||||
|
||||
func (itr *UnsignedIterator) Stats() query.IteratorStats { return itr.stats }
|
||||
func (itr *UnsignedIterator) Close() error { itr.Closed = true; return nil }
|
||||
|
||||
// Next returns the next value and shifts it off the beginning of the points slice.
|
||||
func (itr *UnsignedIterator) Next() (*query.UnsignedPoint, error) {
|
||||
if len(itr.Points) == 0 || itr.Closed {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
v := &itr.Points[0]
|
||||
itr.Points = itr.Points[1:]
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func UnsignedIterators(inputs []*UnsignedIterator) []query.Iterator {
|
||||
itrs := make([]query.Iterator, len(inputs))
|
||||
for i := range itrs {
|
||||
itrs[i] = query.Iterator(inputs[i])
|
||||
}
|
||||
return itrs
|
||||
}
|
||||
|
||||
// Test implementation of query.StringIterator
|
||||
type StringIterator struct {
|
||||
Points []query.StringPoint
|
||||
|
|
|
|||
|
|
@ -19,3 +19,13 @@ func linearInteger(windowTime, previousTime, nextTime int64, previousValue, next
|
|||
b := float64(previousValue)
|
||||
return int64(m*x + b)
|
||||
}
|
||||
|
||||
// linearInteger computes the the slope of the line between the points (previousTime, previousValue) and (nextTime, nextValue)
|
||||
// and returns the value of the point on the line with time windowTime
|
||||
// y = mx + b
|
||||
func linearUnsigned(windowTime, previousTime, nextTime int64, previousValue, nextValue uint64) uint64 {
|
||||
m := float64(nextValue-previousValue) / float64(nextTime-previousTime) // the slope of the line
|
||||
x := float64(windowTime - previousTime) // how far into the interval we are
|
||||
b := float64(previousValue)
|
||||
return uint64(m*x + b)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,6 +50,8 @@ func (a Points) Clone() []Point {
|
|||
other[i] = p.Clone()
|
||||
case *IntegerPoint:
|
||||
other[i] = p.Clone()
|
||||
case *UnsignedPoint:
|
||||
other[i] = p.Clone()
|
||||
case *StringPoint:
|
||||
other[i] = p.Clone()
|
||||
case *BooleanPoint:
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue