2017-08-15 19:24:22 +00:00
|
|
|
package query
|
2015-11-04 21:06:06 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"math"
|
|
|
|
"sort"
|
2016-05-12 21:11:19 +00:00
|
|
|
"time"
|
2017-08-15 19:24:22 +00:00
|
|
|
|
|
|
|
"github.com/influxdata/influxdb/influxql"
|
2015-11-04 21:06:06 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
/*
|
|
|
|
This file contains iterator implementations for each function call available
|
|
|
|
in InfluxQL. Call iterators are separated into two groups:
|
|
|
|
|
|
|
|
1. Map/reduce-style iterators - these are passed to IteratorCreator so that
|
|
|
|
processing can be at the low-level storage and aggregates are returned.
|
|
|
|
|
|
|
|
2. Raw aggregate iterators - these require the full set of data for a window.
|
|
|
|
These are handled by the select() function and raw points are streamed in
|
|
|
|
from the low-level storage.
|
|
|
|
|
|
|
|
There are helpers to aid in building aggregate iterators. For simple map/reduce
|
|
|
|
iterators, you can use the reduceIterator types and pass a reduce function. This
|
|
|
|
reduce function is passed a previous and current value and the new timestamp,
|
|
|
|
value, and auxiliary fields are returned from it.
|
|
|
|
|
|
|
|
For raw aggregate iterators, you can use the reduceSliceIterators which pass
|
|
|
|
in a slice of all points to the function and return a point. For more complex
|
|
|
|
iterator types, you may need to create your own iterators by hand.
|
|
|
|
|
|
|
|
Once your iterator is complete, you'll need to add it to the NewCallIterator()
|
|
|
|
function if it is to be available to IteratorCreators and add it to the select()
|
|
|
|
function to allow it to be included during planning.
|
|
|
|
*/
|
|
|
|
|
|
|
|
// NewCallIterator returns a new iterator for a Call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func NewCallIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2017-08-15 19:24:22 +00:00
|
|
|
name := opt.Expr.(*influxql.Call).Name
|
2015-11-04 21:06:06 +00:00
|
|
|
switch name {
|
|
|
|
case "count":
|
|
|
|
return newCountIterator(input, opt)
|
|
|
|
case "min":
|
|
|
|
return newMinIterator(input, opt)
|
|
|
|
case "max":
|
|
|
|
return newMaxIterator(input, opt)
|
|
|
|
case "sum":
|
|
|
|
return newSumIterator(input, opt)
|
|
|
|
case "first":
|
|
|
|
return newFirstIterator(input, opt)
|
|
|
|
case "last":
|
|
|
|
return newLastIterator(input, opt)
|
2016-02-11 18:07:45 +00:00
|
|
|
case "mean":
|
|
|
|
return newMeanIterator(input, opt)
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported function call: %s", name)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// newCountIterator returns an iterator for operating on a count() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newCountIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
// FIXME: Wrap iterator in int-type iterator and always output int value.
|
|
|
|
|
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-02 23:42:00 +00:00
|
|
|
createFn := func() (FloatPointAggregator, IntegerPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewFloatFuncIntegerReducer(FloatCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceIntegerIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewIntegerFuncReducer(IntegerCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, IntegerPointEmitter) {
|
|
|
|
fn := NewUnsignedFuncIntegerReducer(UnsignedCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceIntegerIterator(input, opt, createFn), nil
|
2016-03-02 23:42:00 +00:00
|
|
|
case StringIterator:
|
|
|
|
createFn := func() (StringPointAggregator, IntegerPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewStringFuncIntegerReducer(StringCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
|
2016-03-02 23:42:00 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newStringReduceIntegerIterator(input, opt, createFn), nil
|
2016-03-02 23:42:00 +00:00
|
|
|
case BooleanIterator:
|
|
|
|
createFn := func() (BooleanPointAggregator, IntegerPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewBooleanFuncIntegerReducer(BooleanCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
|
2016-03-02 23:42:00 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newBooleanReduceIntegerIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported count iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// FloatCountReduce returns the count of points.
|
|
|
|
func FloatCountReduce(prev *IntegerPoint, curr *FloatPoint) (int64, int64, []interface{}) {
|
2015-11-04 21:06:06 +00:00
|
|
|
if prev == nil {
|
2016-03-02 20:52:03 +00:00
|
|
|
return ZeroTime, 1, nil
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
2016-03-02 20:52:03 +00:00
|
|
|
return ZeroTime, prev.Value + 1, nil
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// IntegerCountReduce returns the count of points.
|
|
|
|
func IntegerCountReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
2016-01-18 22:48:49 +00:00
|
|
|
if prev == nil {
|
2016-03-02 20:52:03 +00:00
|
|
|
return ZeroTime, 1, nil
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
2016-03-02 20:52:03 +00:00
|
|
|
return ZeroTime, prev.Value + 1, nil
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// UnsignedCountReduce returns the count of points.
|
|
|
|
func UnsignedCountReduce(prev *IntegerPoint, curr *UnsignedPoint) (int64, int64, []interface{}) {
|
|
|
|
if prev == nil {
|
|
|
|
return ZeroTime, 1, nil
|
|
|
|
}
|
|
|
|
return ZeroTime, prev.Value + 1, nil
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// StringCountReduce returns the count of points.
|
|
|
|
func StringCountReduce(prev *IntegerPoint, curr *StringPoint) (int64, int64, []interface{}) {
|
2016-03-02 23:42:00 +00:00
|
|
|
if prev == nil {
|
|
|
|
return ZeroTime, 1, nil
|
|
|
|
}
|
|
|
|
return ZeroTime, prev.Value + 1, nil
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// BooleanCountReduce returns the count of points.
|
|
|
|
func BooleanCountReduce(prev *IntegerPoint, curr *BooleanPoint) (int64, int64, []interface{}) {
|
2016-03-02 23:42:00 +00:00
|
|
|
if prev == nil {
|
|
|
|
return ZeroTime, 1, nil
|
|
|
|
}
|
|
|
|
return ZeroTime, prev.Value + 1, nil
|
|
|
|
}
|
|
|
|
|
2015-11-04 21:06:06 +00:00
|
|
|
// newMinIterator returns an iterator for operating on a min() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewFloatFuncReducer(FloatMinReduce, nil)
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewIntegerFuncReducer(IntegerMinReduce, nil)
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedFuncReducer(UnsignedMinReduce, nil)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
2016-04-29 18:56:22 +00:00
|
|
|
case BooleanIterator:
|
|
|
|
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewBooleanFuncReducer(BooleanMinReduce, nil)
|
2016-04-29 18:56:22 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newBooleanReduceBooleanIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported min iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// FloatMinReduce returns the minimum value between prev & curr.
|
|
|
|
func FloatMinReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
|
2015-11-04 21:06:06 +00:00
|
|
|
if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// IntegerMinReduce returns the minimum value between prev & curr.
|
|
|
|
func IntegerMinReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
2016-01-18 22:48:49 +00:00
|
|
|
if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// UnsignedMinReduce returns the minimum value between prev & curr.
|
|
|
|
func UnsignedMinReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
|
|
|
|
if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2016-04-29 18:56:22 +00:00
|
|
|
// BooleanMinReduce returns the minimum value between prev & curr.
|
|
|
|
func BooleanMinReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
|
|
|
|
if prev == nil || (curr.Value != prev.Value && !curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2016-04-29 18:56:22 +00:00
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2015-11-04 21:06:06 +00:00
|
|
|
// newMaxIterator returns an iterator for operating on a max() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewFloatFuncReducer(FloatMaxReduce, nil)
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewIntegerFuncReducer(IntegerMaxReduce, nil)
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedFuncReducer(UnsignedMaxReduce, nil)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
2016-04-29 18:56:22 +00:00
|
|
|
case BooleanIterator:
|
|
|
|
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewBooleanFuncReducer(BooleanMaxReduce, nil)
|
2016-04-29 18:56:22 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newBooleanReduceBooleanIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported max iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// FloatMaxReduce returns the maximum value between prev & curr.
|
|
|
|
func FloatMaxReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
|
2015-11-04 21:06:06 +00:00
|
|
|
if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// IntegerMaxReduce returns the maximum value between prev & curr.
|
|
|
|
func IntegerMaxReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
2016-01-18 22:48:49 +00:00
|
|
|
if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// UnsignedMaxReduce returns the maximum value between prev & curr.
|
|
|
|
func UnsignedMaxReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
|
|
|
|
if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2016-04-29 18:56:22 +00:00
|
|
|
// BooleanMaxReduce returns the minimum value between prev & curr.
|
|
|
|
func BooleanMaxReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
|
|
|
|
if prev == nil || (curr.Value != prev.Value && curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2016-04-29 18:56:22 +00:00
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2015-11-04 21:06:06 +00:00
|
|
|
// newSumIterator returns an iterator for operating on a sum() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewFloatFuncReducer(FloatSumReduce, &FloatPoint{Value: 0, Time: ZeroTime})
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewIntegerFuncReducer(IntegerSumReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedFuncReducer(UnsignedSumReduce, &UnsignedPoint{Value: 0, Time: ZeroTime})
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported sum iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// FloatSumReduce returns the sum prev value & curr value.
|
|
|
|
func FloatSumReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
|
2015-11-04 21:06:06 +00:00
|
|
|
if prev == nil {
|
2016-03-02 20:52:03 +00:00
|
|
|
return ZeroTime, curr.Value, nil
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
2016-01-23 03:04:10 +00:00
|
|
|
return prev.Time, prev.Value + curr.Value, nil
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// IntegerSumReduce returns the sum prev value & curr value.
|
|
|
|
func IntegerSumReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
2016-01-18 22:48:49 +00:00
|
|
|
if prev == nil {
|
2016-03-02 20:52:03 +00:00
|
|
|
return ZeroTime, curr.Value, nil
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
2016-01-23 03:04:10 +00:00
|
|
|
return prev.Time, prev.Value + curr.Value, nil
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// UnsignedSumReduce returns the sum prev value & curr value.
|
|
|
|
func UnsignedSumReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
|
|
|
|
if prev == nil {
|
|
|
|
return ZeroTime, curr.Value, nil
|
|
|
|
}
|
|
|
|
return prev.Time, prev.Value + curr.Value, nil
|
|
|
|
}
|
|
|
|
|
2015-11-04 21:06:06 +00:00
|
|
|
// newFirstIterator returns an iterator for operating on a first() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewFloatFuncReducer(FloatFirstReduce, nil)
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewIntegerFuncReducer(IntegerFirstReduce, nil)
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedFuncReducer(UnsignedFirstReduce, nil)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
2016-03-02 23:42:00 +00:00
|
|
|
case StringIterator:
|
|
|
|
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewStringFuncReducer(StringFirstReduce, nil)
|
2016-03-02 23:42:00 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newStringReduceStringIterator(input, opt, createFn), nil
|
2016-03-02 23:42:00 +00:00
|
|
|
case BooleanIterator:
|
|
|
|
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewBooleanFuncReducer(BooleanFirstReduce, nil)
|
2016-03-02 23:42:00 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newBooleanReduceBooleanIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported first iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// FloatFirstReduce returns the first point sorted by time.
|
|
|
|
func FloatFirstReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
|
2015-11-04 21:06:06 +00:00
|
|
|
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
2016-01-23 03:04:10 +00:00
|
|
|
return prev.Time, prev.Value, prev.Aux
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// IntegerFirstReduce returns the first point sorted by time.
|
|
|
|
func IntegerFirstReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
2016-01-18 22:48:49 +00:00
|
|
|
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
2016-01-23 03:04:10 +00:00
|
|
|
return prev.Time, prev.Value, prev.Aux
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// UnsignedFirstReduce returns the first point sorted by time.
|
|
|
|
func UnsignedFirstReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
|
|
|
|
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// StringFirstReduce returns the first point sorted by time.
|
|
|
|
func StringFirstReduce(prev, curr *StringPoint) (int64, string, []interface{}) {
|
2016-03-02 23:42:00 +00:00
|
|
|
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2016-03-02 23:42:00 +00:00
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// BooleanFirstReduce returns the first point sorted by time.
|
|
|
|
func BooleanFirstReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
|
2016-03-02 23:42:00 +00:00
|
|
|
if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && !curr.Value && prev.Value) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2016-03-02 23:42:00 +00:00
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2015-11-04 21:06:06 +00:00
|
|
|
// newLastIterator returns an iterator for operating on a last() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewFloatFuncReducer(FloatLastReduce, nil)
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewIntegerFuncReducer(IntegerLastReduce, nil)
|
2016-03-02 20:52:03 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedFuncReducer(UnsignedLastReduce, nil)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
2016-03-02 23:42:00 +00:00
|
|
|
case StringIterator:
|
|
|
|
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewStringFuncReducer(StringLastReduce, nil)
|
2016-03-02 23:42:00 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newStringReduceStringIterator(input, opt, createFn), nil
|
2016-03-02 23:42:00 +00:00
|
|
|
case BooleanIterator:
|
|
|
|
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
2016-06-01 21:52:47 +00:00
|
|
|
fn := NewBooleanFuncReducer(BooleanLastReduce, nil)
|
2016-03-02 23:42:00 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newBooleanReduceBooleanIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported last iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// FloatLastReduce returns the last point sorted by time.
|
|
|
|
func FloatLastReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
|
2015-11-04 21:06:06 +00:00
|
|
|
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
2016-01-23 03:04:10 +00:00
|
|
|
return prev.Time, prev.Value, prev.Aux
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// IntegerLastReduce returns the last point sorted by time.
|
|
|
|
func IntegerLastReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
|
2016-01-18 22:48:49 +00:00
|
|
|
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
2016-01-23 03:04:10 +00:00
|
|
|
return prev.Time, prev.Value, prev.Aux
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// UnsignedLastReduce returns the last point sorted by time.
|
|
|
|
func UnsignedLastReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
|
|
|
|
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// StringLastReduce returns the first point sorted by time.
|
|
|
|
func StringLastReduce(prev, curr *StringPoint) (int64, string, []interface{}) {
|
2016-03-02 23:42:00 +00:00
|
|
|
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2016-03-02 23:42:00 +00:00
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// BooleanLastReduce returns the first point sorted by time.
|
|
|
|
func BooleanLastReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
|
2016-03-02 23:42:00 +00:00
|
|
|
if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value && !prev.Value) {
|
2016-11-23 20:32:42 +00:00
|
|
|
return curr.Time, curr.Value, cloneAux(curr.Aux)
|
2016-03-02 23:42:00 +00:00
|
|
|
}
|
|
|
|
return prev.Time, prev.Value, prev.Aux
|
|
|
|
}
|
|
|
|
|
2016-02-04 15:12:52 +00:00
|
|
|
// NewDistinctIterator returns an iterator for operating on a distinct() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2016-04-07 21:37:48 +00:00
|
|
|
fn := NewFloatDistinctReducer()
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
2016-04-07 21:37:48 +00:00
|
|
|
fn := NewIntegerDistinctReducer()
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedDistinctReducer()
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
case StringIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
2016-04-07 21:37:48 +00:00
|
|
|
fn := NewStringDistinctReducer()
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newStringReduceStringIterator(input, opt, createFn), nil
|
2016-03-17 00:55:50 +00:00
|
|
|
case BooleanIterator:
|
|
|
|
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
2016-04-07 21:37:48 +00:00
|
|
|
fn := NewBooleanDistinctReducer()
|
2016-03-17 00:55:50 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newBooleanReduceBooleanIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported distinct iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// newMeanIterator returns an iterator for operating on a mean() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewFloatMeanReducer()
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-02 20:52:03 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewIntegerMeanReducer()
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceFloatIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewUnsignedMeanReducer()
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceFloatIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported mean iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-08-04 02:32:19 +00:00
|
|
|
// NewMedianIterator returns an iterator for operating on a median() call.
|
|
|
|
func NewMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|
|
|
return newMedianIterator(input, opt)
|
|
|
|
}
|
|
|
|
|
2015-11-04 21:06:06 +00:00
|
|
|
// newMedianIterator returns an iterator for operating on a median() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2016-03-07 21:03:14 +00:00
|
|
|
fn := NewFloatSliceFuncReducer(FloatMedianReduceSlice)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
|
2016-03-07 21:03:14 +00:00
|
|
|
fn := NewIntegerSliceFuncFloatReducer(IntegerMedianReduceSlice)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceFloatIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewUnsignedSliceFuncFloatReducer(UnsignedMedianReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceFloatIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported median iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// FloatMedianReduceSlice returns the median value within a window.
|
|
|
|
func FloatMedianReduceSlice(a []FloatPoint) []FloatPoint {
|
2015-11-04 21:06:06 +00:00
|
|
|
if len(a) == 1 {
|
2016-03-07 18:25:45 +00:00
|
|
|
return a
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1.
|
|
|
|
|
|
|
|
// Return the middle value from the points.
|
|
|
|
// If there are an even number of points then return the mean of the two middle points.
|
|
|
|
sort.Sort(floatPointsByValue(a))
|
|
|
|
if len(a)%2 == 0 {
|
|
|
|
lo, hi := a[len(a)/2-1], a[(len(a)/2)]
|
2016-03-07 18:25:45 +00:00
|
|
|
return []FloatPoint{{Time: ZeroTime, Value: lo.Value + (hi.Value-lo.Value)/2}}
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
2016-03-07 18:25:45 +00:00
|
|
|
return []FloatPoint{{Time: ZeroTime, Value: a[len(a)/2].Value}}
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// IntegerMedianReduceSlice returns the median value within a window.
|
|
|
|
func IntegerMedianReduceSlice(a []IntegerPoint) []FloatPoint {
|
2016-01-18 22:48:49 +00:00
|
|
|
if len(a) == 1 {
|
2016-03-07 18:25:45 +00:00
|
|
|
return []FloatPoint{{Time: ZeroTime, Value: float64(a[0].Value)}}
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1.
|
|
|
|
|
|
|
|
// Return the middle value from the points.
|
|
|
|
// If there are an even number of points then return the mean of the two middle points.
|
|
|
|
sort.Sort(integerPointsByValue(a))
|
|
|
|
if len(a)%2 == 0 {
|
|
|
|
lo, hi := a[len(a)/2-1], a[(len(a)/2)]
|
2016-03-07 18:25:45 +00:00
|
|
|
return []FloatPoint{{Time: ZeroTime, Value: float64(lo.Value) + float64(hi.Value-lo.Value)/2}}
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
2016-03-07 18:25:45 +00:00
|
|
|
return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}}
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// UnsignedMedianReduceSlice returns the median value within a window.
|
|
|
|
func UnsignedMedianReduceSlice(a []UnsignedPoint) []FloatPoint {
|
|
|
|
if len(a) == 1 {
|
|
|
|
return []FloatPoint{{Time: ZeroTime, Value: float64(a[0].Value)}}
|
|
|
|
}
|
|
|
|
|
|
|
|
// OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1.
|
|
|
|
|
|
|
|
// Return the middle value from the points.
|
|
|
|
// If there are an even number of points then return the mean of the two middle points.
|
|
|
|
sort.Sort(unsignedPointsByValue(a))
|
|
|
|
if len(a)%2 == 0 {
|
|
|
|
lo, hi := a[len(a)/2-1], a[(len(a)/2)]
|
|
|
|
return []FloatPoint{{Time: ZeroTime, Value: float64(lo.Value) + float64(hi.Value-lo.Value)/2}}
|
|
|
|
}
|
|
|
|
return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}}
|
|
|
|
}
|
|
|
|
|
2016-08-03 10:13:00 +00:00
|
|
|
// newModeIterator returns an iterator for operating on a mode() call.
|
|
|
|
func NewModeIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewFloatSliceFuncReducer(FloatModeReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-08-03 10:13:00 +00:00
|
|
|
case IntegerIterator:
|
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
|
|
|
fn := NewIntegerSliceFuncReducer(IntegerModeReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedSliceFuncReducer(UnsignedModeReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
2016-08-03 10:13:00 +00:00
|
|
|
case StringIterator:
|
|
|
|
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
|
|
|
fn := NewStringSliceFuncReducer(StringModeReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newStringReduceStringIterator(input, opt, createFn), nil
|
2016-08-03 10:13:00 +00:00
|
|
|
case BooleanIterator:
|
|
|
|
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
|
|
|
fn := NewBooleanSliceFuncReducer(BooleanModeReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newBooleanReduceBooleanIterator(input, opt, createFn), nil
|
2016-08-03 10:13:00 +00:00
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported median iterator type: %T", input)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// FloatModeReduceSlice returns the mode value within a window.
|
|
|
|
func FloatModeReduceSlice(a []FloatPoint) []FloatPoint {
|
|
|
|
if len(a) == 1 {
|
|
|
|
return a
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Sort(floatPointsByValue(a))
|
|
|
|
|
|
|
|
mostFreq := 0
|
|
|
|
currFreq := 0
|
|
|
|
currMode := a[0].Value
|
|
|
|
mostMode := a[0].Value
|
|
|
|
mostTime := a[0].Time
|
|
|
|
currTime := a[0].Time
|
|
|
|
|
|
|
|
for _, p := range a {
|
|
|
|
if p.Value != currMode {
|
|
|
|
currFreq = 1
|
|
|
|
currMode = p.Value
|
|
|
|
currTime = p.Time
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
currFreq++
|
|
|
|
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
mostFreq = currFreq
|
|
|
|
mostMode = p.Value
|
|
|
|
mostTime = p.Time
|
|
|
|
}
|
|
|
|
|
|
|
|
return []FloatPoint{{Time: ZeroTime, Value: mostMode}}
|
|
|
|
}
|
|
|
|
|
|
|
|
// IntegerModeReduceSlice returns the mode value within a window.
|
|
|
|
func IntegerModeReduceSlice(a []IntegerPoint) []IntegerPoint {
|
|
|
|
if len(a) == 1 {
|
|
|
|
return a
|
|
|
|
}
|
|
|
|
sort.Sort(integerPointsByValue(a))
|
|
|
|
|
|
|
|
mostFreq := 0
|
|
|
|
currFreq := 0
|
|
|
|
currMode := a[0].Value
|
|
|
|
mostMode := a[0].Value
|
|
|
|
mostTime := a[0].Time
|
|
|
|
currTime := a[0].Time
|
|
|
|
|
|
|
|
for _, p := range a {
|
|
|
|
if p.Value != currMode {
|
|
|
|
currFreq = 1
|
|
|
|
currMode = p.Value
|
|
|
|
currTime = p.Time
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
currFreq++
|
|
|
|
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
mostFreq = currFreq
|
|
|
|
mostMode = p.Value
|
|
|
|
mostTime = p.Time
|
|
|
|
}
|
|
|
|
|
|
|
|
return []IntegerPoint{{Time: ZeroTime, Value: mostMode}}
|
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// UnsignedModeReduceSlice returns the mode value within a window.
|
|
|
|
func UnsignedModeReduceSlice(a []UnsignedPoint) []UnsignedPoint {
|
|
|
|
if len(a) == 1 {
|
|
|
|
return a
|
|
|
|
}
|
|
|
|
sort.Sort(unsignedPointsByValue(a))
|
|
|
|
|
|
|
|
mostFreq := 0
|
|
|
|
currFreq := 0
|
|
|
|
currMode := a[0].Value
|
|
|
|
mostMode := a[0].Value
|
|
|
|
mostTime := a[0].Time
|
|
|
|
currTime := a[0].Time
|
|
|
|
|
|
|
|
for _, p := range a {
|
|
|
|
if p.Value != currMode {
|
|
|
|
currFreq = 1
|
|
|
|
currMode = p.Value
|
|
|
|
currTime = p.Time
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
currFreq++
|
|
|
|
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
mostFreq = currFreq
|
|
|
|
mostMode = p.Value
|
|
|
|
mostTime = p.Time
|
|
|
|
}
|
|
|
|
|
|
|
|
return []UnsignedPoint{{Time: ZeroTime, Value: mostMode}}
|
|
|
|
}
|
|
|
|
|
2016-08-03 10:13:00 +00:00
|
|
|
// StringModeReduceSlice returns the mode value within a window.
|
|
|
|
func StringModeReduceSlice(a []StringPoint) []StringPoint {
|
|
|
|
if len(a) == 1 {
|
|
|
|
return a
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Sort(stringPointsByValue(a))
|
|
|
|
|
|
|
|
mostFreq := 0
|
|
|
|
currFreq := 0
|
|
|
|
currMode := a[0].Value
|
|
|
|
mostMode := a[0].Value
|
|
|
|
mostTime := a[0].Time
|
|
|
|
currTime := a[0].Time
|
|
|
|
|
|
|
|
for _, p := range a {
|
|
|
|
if p.Value != currMode {
|
|
|
|
currFreq = 1
|
|
|
|
currMode = p.Value
|
|
|
|
currTime = p.Time
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
currFreq++
|
|
|
|
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
mostFreq = currFreq
|
|
|
|
mostMode = p.Value
|
|
|
|
mostTime = p.Time
|
|
|
|
}
|
|
|
|
|
|
|
|
return []StringPoint{{Time: ZeroTime, Value: mostMode}}
|
|
|
|
}
|
|
|
|
|
|
|
|
// BooleanModeReduceSlice returns the mode value within a window.
|
|
|
|
func BooleanModeReduceSlice(a []BooleanPoint) []BooleanPoint {
|
|
|
|
if len(a) == 1 {
|
|
|
|
return a
|
|
|
|
}
|
|
|
|
|
|
|
|
trueFreq := 0
|
|
|
|
falsFreq := 0
|
|
|
|
mostMode := false
|
|
|
|
|
|
|
|
for _, p := range a {
|
|
|
|
if p.Value {
|
|
|
|
trueFreq++
|
2016-08-24 00:43:18 +00:00
|
|
|
} else {
|
|
|
|
falsFreq++
|
2016-08-03 10:13:00 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
// In case either of true or false are mode then retuned mode value wont be
|
|
|
|
// of metric with oldest timestamp
|
|
|
|
if trueFreq >= falsFreq {
|
|
|
|
mostMode = true
|
|
|
|
}
|
|
|
|
|
|
|
|
return []BooleanPoint{{Time: ZeroTime, Value: mostMode}}
|
|
|
|
}
|
|
|
|
|
2015-11-04 21:06:06 +00:00
|
|
|
// newStddevIterator returns an iterator for operating on a stddev() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2016-03-07 21:03:14 +00:00
|
|
|
fn := NewFloatSliceFuncReducer(FloatStddevReduceSlice)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
|
2016-03-07 21:03:14 +00:00
|
|
|
fn := NewIntegerSliceFuncFloatReducer(IntegerStddevReduceSlice)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceFloatIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewUnsignedSliceFuncFloatReducer(UnsignedStddevReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceFloatIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported stddev iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// FloatStddevReduceSlice returns the stddev value within a window.
|
|
|
|
func FloatStddevReduceSlice(a []FloatPoint) []FloatPoint {
|
2016-02-02 17:19:15 +00:00
|
|
|
// If there is only one point then return 0.
|
2015-11-04 21:06:06 +00:00
|
|
|
if len(a) < 2 {
|
2016-03-07 18:25:45 +00:00
|
|
|
return []FloatPoint{{Time: ZeroTime, Nil: true}}
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate the mean.
|
|
|
|
var mean float64
|
|
|
|
var count int
|
|
|
|
for _, p := range a {
|
|
|
|
if math.IsNaN(p.Value) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
count++
|
|
|
|
mean += (p.Value - mean) / float64(count)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate the variance.
|
|
|
|
var variance float64
|
|
|
|
for _, p := range a {
|
|
|
|
if math.IsNaN(p.Value) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
variance += math.Pow(p.Value-mean, 2)
|
|
|
|
}
|
|
|
|
return []FloatPoint{{
|
2016-03-07 18:25:45 +00:00
|
|
|
Time: ZeroTime,
|
2015-11-04 21:06:06 +00:00
|
|
|
Value: math.Sqrt(variance / float64(count-1)),
|
|
|
|
}}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// IntegerStddevReduceSlice returns the stddev value within a window.
|
|
|
|
func IntegerStddevReduceSlice(a []IntegerPoint) []FloatPoint {
|
2016-02-02 17:19:15 +00:00
|
|
|
// If there is only one point then return 0.
|
2016-01-18 22:48:49 +00:00
|
|
|
if len(a) < 2 {
|
2016-03-07 18:25:45 +00:00
|
|
|
return []FloatPoint{{Time: ZeroTime, Nil: true}}
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate the mean.
|
|
|
|
var mean float64
|
|
|
|
var count int
|
|
|
|
for _, p := range a {
|
|
|
|
count++
|
|
|
|
mean += (float64(p.Value) - mean) / float64(count)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate the variance.
|
|
|
|
var variance float64
|
|
|
|
for _, p := range a {
|
|
|
|
variance += math.Pow(float64(p.Value)-mean, 2)
|
|
|
|
}
|
|
|
|
return []FloatPoint{{
|
2016-03-07 18:25:45 +00:00
|
|
|
Time: ZeroTime,
|
2016-01-18 22:48:49 +00:00
|
|
|
Value: math.Sqrt(variance / float64(count-1)),
|
|
|
|
}}
|
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// UnsignedStddevReduceSlice returns the stddev value within a window.
|
|
|
|
func UnsignedStddevReduceSlice(a []UnsignedPoint) []FloatPoint {
|
|
|
|
// If there is only one point then return 0.
|
|
|
|
if len(a) < 2 {
|
|
|
|
return []FloatPoint{{Time: ZeroTime, Nil: true}}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate the mean.
|
|
|
|
var mean float64
|
|
|
|
var count int
|
|
|
|
for _, p := range a {
|
|
|
|
count++
|
|
|
|
mean += (float64(p.Value) - mean) / float64(count)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate the variance.
|
|
|
|
var variance float64
|
|
|
|
for _, p := range a {
|
|
|
|
variance += math.Pow(float64(p.Value)-mean, 2)
|
|
|
|
}
|
|
|
|
return []FloatPoint{{
|
|
|
|
Time: ZeroTime,
|
|
|
|
Value: math.Sqrt(variance / float64(count-1)),
|
|
|
|
}}
|
|
|
|
}
|
|
|
|
|
2015-11-04 21:06:06 +00:00
|
|
|
// newSpreadIterator returns an iterator for operating on a spread() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2016-03-07 21:03:14 +00:00
|
|
|
fn := NewFloatSliceFuncReducer(FloatSpreadReduceSlice)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
2016-03-07 21:03:14 +00:00
|
|
|
fn := NewIntegerSliceFuncReducer(IntegerSpreadReduceSlice)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedSliceFuncReducer(UnsignedSpreadReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported spread iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// FloatSpreadReduceSlice returns the spread value within a window.
|
|
|
|
func FloatSpreadReduceSlice(a []FloatPoint) []FloatPoint {
|
2015-11-04 21:06:06 +00:00
|
|
|
// Find min & max values.
|
|
|
|
min, max := a[0].Value, a[0].Value
|
|
|
|
for _, p := range a[1:] {
|
|
|
|
min = math.Min(min, p.Value)
|
|
|
|
max = math.Max(max, p.Value)
|
|
|
|
}
|
2016-03-07 18:25:45 +00:00
|
|
|
return []FloatPoint{{Time: ZeroTime, Value: max - min}}
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// IntegerSpreadReduceSlice returns the spread value within a window.
|
|
|
|
func IntegerSpreadReduceSlice(a []IntegerPoint) []IntegerPoint {
|
2016-01-18 22:48:49 +00:00
|
|
|
// Find min & max values.
|
|
|
|
min, max := a[0].Value, a[0].Value
|
|
|
|
for _, p := range a[1:] {
|
|
|
|
if p.Value < min {
|
|
|
|
min = p.Value
|
|
|
|
}
|
|
|
|
if p.Value > max {
|
|
|
|
max = p.Value
|
|
|
|
}
|
|
|
|
}
|
2016-03-07 18:25:45 +00:00
|
|
|
return []IntegerPoint{{Time: ZeroTime, Value: max - min}}
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// UnsignedSpreadReduceSlice returns the spread value within a window.
|
|
|
|
func UnsignedSpreadReduceSlice(a []UnsignedPoint) []UnsignedPoint {
|
|
|
|
// Find min & max values.
|
|
|
|
min, max := a[0].Value, a[0].Value
|
|
|
|
for _, p := range a[1:] {
|
|
|
|
if p.Value < min {
|
|
|
|
min = p.Value
|
|
|
|
}
|
|
|
|
if p.Value > max {
|
|
|
|
max = p.Value
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return []UnsignedPoint{{Time: ZeroTime, Value: max - min}}
|
|
|
|
}
|
|
|
|
|
2017-05-17 16:10:51 +00:00
|
|
|
func newTopIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) (Iterator, error) {
|
2016-01-08 18:02:14 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
Optimize top() and bottom() using an incremental aggregator
The previous version of `top()` and `bottom()` would gather all of the
points to use in a slice, filter them (if necessary), then use a
slightly modified heap sort to retrieve the top or bottom values.
This performed horrendously from the standpoint of memory. Since it
consumed so much memory and spent so much time in allocations (along
with sorting a potentially very large slice), this affected speed too.
These calls have now been modified so they keep the top or bottom points
in a min or max heap. For `top()`, a new point will read the minimum
value from the heap. If the new point is greater than the minimum point,
it will replace the minimum point and fix the heap with the new value.
If the new point is smaller, it discards that point. For `bottom()`, the
process is the opposite.
It will then sort the final result to ensure the correct ordering of the
selected points.
When `top()` or `bottom()` contain a tag to select, they have now been
modified so this query:
SELECT top(value, host, 2) FROM cpu
Essentially becomes this query:
SELECT top(value, 2), host FROM (
SELECT max(value) FROM cpu GROUP BY host
)
This should drastically increase the performance of all `top()` and
`bottom()` queries.
2017-05-16 17:37:39 +00:00
|
|
|
fn := NewFloatTopReducer(n)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2017-05-17 16:10:51 +00:00
|
|
|
itr := newFloatReduceFloatIterator(input, opt, createFn)
|
|
|
|
itr.keepTags = keepTags
|
|
|
|
return itr, nil
|
2016-02-02 20:23:06 +00:00
|
|
|
case IntegerIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
Optimize top() and bottom() using an incremental aggregator
The previous version of `top()` and `bottom()` would gather all of the
points to use in a slice, filter them (if necessary), then use a
slightly modified heap sort to retrieve the top or bottom values.
This performed horrendously from the standpoint of memory. Since it
consumed so much memory and spent so much time in allocations (along
with sorting a potentially very large slice), this affected speed too.
These calls have now been modified so they keep the top or bottom points
in a min or max heap. For `top()`, a new point will read the minimum
value from the heap. If the new point is greater than the minimum point,
it will replace the minimum point and fix the heap with the new value.
If the new point is smaller, it discards that point. For `bottom()`, the
process is the opposite.
It will then sort the final result to ensure the correct ordering of the
selected points.
When `top()` or `bottom()` contain a tag to select, they have now been
modified so this query:
SELECT top(value, host, 2) FROM cpu
Essentially becomes this query:
SELECT top(value, 2), host FROM (
SELECT max(value) FROM cpu GROUP BY host
)
This should drastically increase the performance of all `top()` and
`bottom()` queries.
2017-05-16 17:37:39 +00:00
|
|
|
fn := NewIntegerTopReducer(n)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2017-05-17 16:10:51 +00:00
|
|
|
itr := newIntegerReduceIntegerIterator(input, opt, createFn)
|
|
|
|
itr.keepTags = keepTags
|
|
|
|
return itr, nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedTopReducer(n)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
itr := newUnsignedReduceUnsignedIterator(input, opt, createFn)
|
|
|
|
itr.keepTags = keepTags
|
|
|
|
return itr, nil
|
2016-01-08 18:02:14 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported top iterator type: %T", input)
|
2016-01-08 18:02:14 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-05-17 16:10:51 +00:00
|
|
|
func newBottomIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) (Iterator, error) {
|
2016-01-08 18:02:14 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
Optimize top() and bottom() using an incremental aggregator
The previous version of `top()` and `bottom()` would gather all of the
points to use in a slice, filter them (if necessary), then use a
slightly modified heap sort to retrieve the top or bottom values.
This performed horrendously from the standpoint of memory. Since it
consumed so much memory and spent so much time in allocations (along
with sorting a potentially very large slice), this affected speed too.
These calls have now been modified so they keep the top or bottom points
in a min or max heap. For `top()`, a new point will read the minimum
value from the heap. If the new point is greater than the minimum point,
it will replace the minimum point and fix the heap with the new value.
If the new point is smaller, it discards that point. For `bottom()`, the
process is the opposite.
It will then sort the final result to ensure the correct ordering of the
selected points.
When `top()` or `bottom()` contain a tag to select, they have now been
modified so this query:
SELECT top(value, host, 2) FROM cpu
Essentially becomes this query:
SELECT top(value, 2), host FROM (
SELECT max(value) FROM cpu GROUP BY host
)
This should drastically increase the performance of all `top()` and
`bottom()` queries.
2017-05-16 17:37:39 +00:00
|
|
|
fn := NewFloatBottomReducer(n)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2017-05-17 16:10:51 +00:00
|
|
|
itr := newFloatReduceFloatIterator(input, opt, createFn)
|
|
|
|
itr.keepTags = keepTags
|
|
|
|
return itr, nil
|
2016-02-02 20:23:06 +00:00
|
|
|
case IntegerIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
Optimize top() and bottom() using an incremental aggregator
The previous version of `top()` and `bottom()` would gather all of the
points to use in a slice, filter them (if necessary), then use a
slightly modified heap sort to retrieve the top or bottom values.
This performed horrendously from the standpoint of memory. Since it
consumed so much memory and spent so much time in allocations (along
with sorting a potentially very large slice), this affected speed too.
These calls have now been modified so they keep the top or bottom points
in a min or max heap. For `top()`, a new point will read the minimum
value from the heap. If the new point is greater than the minimum point,
it will replace the minimum point and fix the heap with the new value.
If the new point is smaller, it discards that point. For `bottom()`, the
process is the opposite.
It will then sort the final result to ensure the correct ordering of the
selected points.
When `top()` or `bottom()` contain a tag to select, they have now been
modified so this query:
SELECT top(value, host, 2) FROM cpu
Essentially becomes this query:
SELECT top(value, 2), host FROM (
SELECT max(value) FROM cpu GROUP BY host
)
This should drastically increase the performance of all `top()` and
`bottom()` queries.
2017-05-16 17:37:39 +00:00
|
|
|
fn := NewIntegerBottomReducer(n)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2017-05-17 16:10:51 +00:00
|
|
|
itr := newIntegerReduceIntegerIterator(input, opt, createFn)
|
|
|
|
itr.keepTags = keepTags
|
|
|
|
return itr, nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedBottomReducer(n)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
itr := newUnsignedReduceUnsignedIterator(input, opt, createFn)
|
|
|
|
itr.keepTags = keepTags
|
|
|
|
return itr, nil
|
2016-01-08 18:02:14 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported bottom iterator type: %T", input)
|
2016-01-08 18:02:14 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-11-04 21:06:06 +00:00
|
|
|
// newPercentileIterator returns an iterator for operating on a percentile() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) (Iterator, error) {
|
2015-11-04 21:06:06 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-07 21:03:14 +00:00
|
|
|
floatPercentileReduceSlice := NewFloatPercentileReduceSliceFunc(percentile)
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewFloatSliceFuncReducer(floatPercentileReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-01-18 22:48:49 +00:00
|
|
|
case IntegerIterator:
|
2016-03-07 21:03:14 +00:00
|
|
|
integerPercentileReduceSlice := NewIntegerPercentileReduceSliceFunc(percentile)
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
|
|
|
fn := NewIntegerSliceFuncReducer(integerPercentileReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
unsignedPercentileReduceSlice := NewUnsignedPercentileReduceSliceFunc(percentile)
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedSliceFuncReducer(unsignedPercentileReduceSlice)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
2015-11-04 21:06:06 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported percentile iterator type: %T", input)
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// NewFloatPercentileReduceSliceFunc returns the percentile value within a window.
|
|
|
|
func NewFloatPercentileReduceSliceFunc(percentile float64) FloatReduceSliceFunc {
|
2016-03-07 18:25:45 +00:00
|
|
|
return func(a []FloatPoint) []FloatPoint {
|
2016-02-11 18:05:34 +00:00
|
|
|
length := len(a)
|
|
|
|
i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
|
2015-11-04 21:06:06 +00:00
|
|
|
|
2016-02-11 18:05:34 +00:00
|
|
|
if i < 0 || i >= length {
|
2016-03-07 18:25:45 +00:00
|
|
|
return nil
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
|
2016-02-11 18:05:34 +00:00
|
|
|
sort.Sort(floatPointsByValue(a))
|
2016-11-23 20:32:42 +00:00
|
|
|
return []FloatPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}}
|
2015-11-04 21:06:06 +00:00
|
|
|
}
|
|
|
|
}
|
2015-12-24 18:46:31 +00:00
|
|
|
|
2016-03-07 21:03:14 +00:00
|
|
|
// NewIntegerPercentileReduceSliceFunc returns the percentile value within a window.
|
|
|
|
func NewIntegerPercentileReduceSliceFunc(percentile float64) IntegerReduceSliceFunc {
|
2016-03-07 18:25:45 +00:00
|
|
|
return func(a []IntegerPoint) []IntegerPoint {
|
2016-02-11 18:05:34 +00:00
|
|
|
length := len(a)
|
|
|
|
i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
|
2016-01-18 22:48:49 +00:00
|
|
|
|
2016-02-11 18:05:34 +00:00
|
|
|
if i < 0 || i >= length {
|
2016-01-18 22:48:49 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-02-11 18:05:34 +00:00
|
|
|
sort.Sort(integerPointsByValue(a))
|
2016-11-23 20:32:42 +00:00
|
|
|
return []IntegerPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}}
|
2016-01-18 22:48:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-09-18 16:28:37 +00:00
|
|
|
// NewUnsignedPercentileReduceSliceFunc returns the percentile value within a window.
|
|
|
|
func NewUnsignedPercentileReduceSliceFunc(percentile float64) UnsignedReduceSliceFunc {
|
|
|
|
return func(a []UnsignedPoint) []UnsignedPoint {
|
|
|
|
length := len(a)
|
|
|
|
i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
|
|
|
|
|
|
|
|
if i < 0 || i >= length {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Sort(unsignedPointsByValue(a))
|
|
|
|
return []UnsignedPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-12-24 18:46:31 +00:00
|
|
|
// newDerivativeIterator returns an iterator for operating on a derivative() call.
|
2016-02-22 17:51:45 +00:00
|
|
|
func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) {
|
2015-12-24 18:46:31 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2016-02-19 21:05:36 +00:00
|
|
|
fn := NewFloatDerivativeReducer(interval, isNonNegative, opt.Ascending)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-02-19 21:05:36 +00:00
|
|
|
return newFloatStreamFloatIterator(input, createFn, opt), nil
|
2016-02-12 16:23:52 +00:00
|
|
|
case IntegerIterator:
|
2016-03-07 18:25:45 +00:00
|
|
|
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
|
2016-02-19 21:05:36 +00:00
|
|
|
fn := NewIntegerDerivativeReducer(interval, isNonNegative, opt.Ascending)
|
2016-03-07 18:25:45 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-02-19 21:05:36 +00:00
|
|
|
return newIntegerStreamFloatIterator(input, createFn, opt), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewUnsignedDerivativeReducer(interval, isNonNegative, opt.Ascending)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedStreamFloatIterator(input, createFn, opt), nil
|
2015-12-24 18:46:31 +00:00
|
|
|
default:
|
2016-02-22 17:51:45 +00:00
|
|
|
return nil, fmt.Errorf("unsupported derivative iterator type: %T", input)
|
2015-12-24 18:46:31 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-23 19:50:26 +00:00
|
|
|
// newDifferenceIterator returns an iterator for operating on a difference() call.
|
2017-03-31 02:26:14 +00:00
|
|
|
func newDifferenceIterator(input Iterator, opt IteratorOptions, isNonNegative bool) (Iterator, error) {
|
2016-03-23 19:50:26 +00:00
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2017-03-31 02:26:14 +00:00
|
|
|
fn := NewFloatDifferenceReducer(isNonNegative)
|
2016-03-23 19:50:26 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-02-19 21:05:36 +00:00
|
|
|
return newFloatStreamFloatIterator(input, createFn, opt), nil
|
2016-03-23 19:50:26 +00:00
|
|
|
case IntegerIterator:
|
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
2017-03-31 02:26:14 +00:00
|
|
|
fn := NewIntegerDifferenceReducer(isNonNegative)
|
2016-03-23 19:50:26 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2016-02-19 21:05:36 +00:00
|
|
|
return newIntegerStreamIntegerIterator(input, createFn, opt), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedDifferenceReducer(isNonNegative)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedStreamUnsignedIterator(input, createFn, opt), nil
|
2016-03-23 19:50:26 +00:00
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported difference iterator type: %T", input)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-04-19 16:36:41 +00:00
|
|
|
// newElapsedIterator returns an iterator for operating on a elapsed() call.
|
|
|
|
func newElapsedIterator(input Iterator, opt IteratorOptions, interval Interval) (Iterator, error) {
|
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
|
|
|
createFn := func() (FloatPointAggregator, IntegerPointEmitter) {
|
|
|
|
fn := NewFloatElapsedReducer(interval)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newFloatStreamIntegerIterator(input, createFn, opt), nil
|
|
|
|
case IntegerIterator:
|
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
|
|
|
fn := NewIntegerElapsedReducer(interval)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newIntegerStreamIntegerIterator(input, createFn, opt), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, IntegerPointEmitter) {
|
|
|
|
fn := NewUnsignedElapsedReducer(interval)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedStreamIntegerIterator(input, createFn, opt), nil
|
2016-04-19 16:36:41 +00:00
|
|
|
case BooleanIterator:
|
|
|
|
createFn := func() (BooleanPointAggregator, IntegerPointEmitter) {
|
|
|
|
fn := NewBooleanElapsedReducer(interval)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newBooleanStreamIntegerIterator(input, createFn, opt), nil
|
|
|
|
case StringIterator:
|
|
|
|
createFn := func() (StringPointAggregator, IntegerPointEmitter) {
|
|
|
|
fn := NewStringElapsedReducer(interval)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newStringStreamIntegerIterator(input, createFn, opt), nil
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-24 14:40:55 +00:00
|
|
|
// newMovingAverageIterator returns an iterator for operating on a moving_average() call.
|
|
|
|
func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Iterator, error) {
|
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewFloatMovingAverageReducer(n)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newFloatStreamFloatIterator(input, createFn, opt), nil
|
|
|
|
case IntegerIterator:
|
|
|
|
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewIntegerMovingAverageReducer(n)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newIntegerStreamFloatIterator(input, createFn, opt), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewUnsignedMovingAverageReducer(n)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedStreamFloatIterator(input, createFn, opt), nil
|
2016-03-24 14:40:55 +00:00
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported moving average iterator type: %T", input)
|
|
|
|
}
|
|
|
|
}
|
2016-05-12 21:11:19 +00:00
|
|
|
|
2016-10-07 15:11:50 +00:00
|
|
|
// newCumulativeSumIterator returns an iterator for operating on a cumulative_sum() call.
|
|
|
|
func newCumulativeSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
|
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewFloatCumulativeSumReducer()
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newFloatStreamFloatIterator(input, createFn, opt), nil
|
|
|
|
case IntegerIterator:
|
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
|
|
|
fn := NewIntegerCumulativeSumReducer()
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newIntegerStreamIntegerIterator(input, createFn, opt), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedCumulativeSumReducer()
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedStreamUnsignedIterator(input, createFn, opt), nil
|
2016-10-07 15:11:50 +00:00
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported cumulative sum iterator type: %T", input)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-30 01:39:24 +00:00
|
|
|
// newHoltWintersIterator returns an iterator for operating on a holt_winters() call.
|
2016-05-12 21:11:19 +00:00
|
|
|
func newHoltWintersIterator(input Iterator, opt IteratorOptions, h, m int, includeFitData bool, interval time.Duration) (Iterator, error) {
|
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewFloatHoltWintersReducer(h, m, includeFitData, interval)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-05-12 21:11:19 +00:00
|
|
|
case IntegerIterator:
|
|
|
|
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewFloatHoltWintersReducer(h, m, includeFitData, interval)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceFloatIterator(input, opt, createFn), nil
|
2016-05-12 21:11:19 +00:00
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
|
|
|
|
}
|
|
|
|
}
|
2016-10-04 21:20:35 +00:00
|
|
|
|
2016-12-30 01:39:24 +00:00
|
|
|
// NewSampleIterator returns an iterator for operating on a sample() call (exported for use in test).
|
2016-10-04 21:20:35 +00:00
|
|
|
func NewSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) {
|
|
|
|
return newSampleIterator(input, opt, size)
|
|
|
|
}
|
|
|
|
|
2016-12-30 01:39:24 +00:00
|
|
|
// newSampleIterator returns an iterator for operating on a sample() call.
|
2016-10-04 21:20:35 +00:00
|
|
|
func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) {
|
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewFloatSampleReducer(size)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newFloatReduceFloatIterator(input, opt, createFn), nil
|
2016-10-04 21:20:35 +00:00
|
|
|
case IntegerIterator:
|
|
|
|
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
|
|
|
|
fn := NewIntegerSampleReducer(size)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
|
|
|
|
fn := NewUnsignedSampleReducer(size)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
|
2016-10-04 21:20:35 +00:00
|
|
|
case StringIterator:
|
|
|
|
createFn := func() (StringPointAggregator, StringPointEmitter) {
|
|
|
|
fn := NewStringSampleReducer(size)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newStringReduceStringIterator(input, opt, createFn), nil
|
2016-11-15 21:56:44 +00:00
|
|
|
case BooleanIterator:
|
|
|
|
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
|
|
|
|
fn := NewBooleanSampleReducer(size)
|
|
|
|
return fn, fn
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
return newBooleanReduceBooleanIterator(input, opt, createFn), nil
|
2016-10-04 21:20:35 +00:00
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
|
|
|
|
}
|
|
|
|
}
|
2016-11-06 12:54:26 +00:00
|
|
|
|
|
|
|
// newIntegralIterator returns an iterator for operating on a integral() call.
|
|
|
|
func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval) (Iterator, error) {
|
|
|
|
switch input := input.(type) {
|
|
|
|
case FloatIterator:
|
|
|
|
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
|
2017-03-23 20:18:03 +00:00
|
|
|
fn := NewFloatIntegralReducer(interval, opt)
|
2016-11-06 12:54:26 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2017-03-23 20:18:03 +00:00
|
|
|
return newFloatStreamFloatIterator(input, createFn, opt), nil
|
2016-11-06 12:54:26 +00:00
|
|
|
case IntegerIterator:
|
|
|
|
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
|
2017-03-23 20:18:03 +00:00
|
|
|
fn := NewIntegerIntegralReducer(interval, opt)
|
2016-11-06 12:54:26 +00:00
|
|
|
return fn, fn
|
|
|
|
}
|
2017-03-23 20:18:03 +00:00
|
|
|
return newIntegerStreamFloatIterator(input, createFn, opt), nil
|
2017-09-18 16:28:37 +00:00
|
|
|
case UnsignedIterator:
|
|
|
|
createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
|
|
|
|
fn := NewUnsignedIntegralReducer(interval, opt)
|
|
|
|
return fn, fn
|
|
|
|
}
|
|
|
|
return newUnsignedStreamFloatIterator(input, createFn, opt), nil
|
2016-11-06 12:54:26 +00:00
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported integral iterator type: %T", input)
|
|
|
|
}
|
|
|
|
}
|