Merge pull request #5916 from influxdata/js-refactor-reduce-slice-func

Refactor reduce slice func to use the aggregator and emitter
pull/5929/head
Jonathan A. Sternberg 2016-03-07 15:49:37 -05:00
commit 370f8e4176
7 changed files with 1083 additions and 1003 deletions

View File

@ -362,18 +362,30 @@ func booleanLastReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatDistinctReduceSlice}, nil
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(floatDistinctReduceSlice)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerDistinctReduceSlice}, nil
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(integerDistinctReduceSlice)
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
return &stringReduceSliceIterator{input: newBufStringIterator(input), opt: opt, fn: stringDistinctReduceSlice}, nil
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringSliceFuncReducer(stringDistinctReduceSlice)
return fn, fn
}
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported distinct iterator type: %T", input)
}
}
// floatDistinctReduceSlice returns the distinct value within a window.
func floatDistinctReduceSlice(a []FloatPoint, opt *reduceOptions) []FloatPoint {
func floatDistinctReduceSlice(a []FloatPoint) []FloatPoint {
m := make(map[float64]FloatPoint)
for _, p := range a {
if _, ok := m[p.Value]; !ok {
@ -390,7 +402,7 @@ func floatDistinctReduceSlice(a []FloatPoint, opt *reduceOptions) []FloatPoint {
}
// integerDistinctReduceSlice returns the distinct value within a window.
func integerDistinctReduceSlice(a []IntegerPoint, opt *reduceOptions) []IntegerPoint {
func integerDistinctReduceSlice(a []IntegerPoint) []IntegerPoint {
m := make(map[int64]IntegerPoint)
for _, p := range a {
if _, ok := m[p.Value]; !ok {
@ -407,7 +419,7 @@ func integerDistinctReduceSlice(a []IntegerPoint, opt *reduceOptions) []IntegerP
}
// stringDistinctReduceSlice returns the distinct value within a window.
func stringDistinctReduceSlice(a []StringPoint, opt *reduceOptions) []StringPoint {
func stringDistinctReduceSlice(a []StringPoint) []StringPoint {
m := make(map[string]StringPoint)
for _, p := range a {
if _, ok := m[p.Value]; !ok {
@ -447,18 +459,26 @@ func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
func newMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatMedianReduceSlice}, nil
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(floatMedianReduceSlice)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerMedianReduceSlice}, nil
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
fn := NewIntegerSliceFuncFloatReducer(integerMedianReduceSlice)
return fn, fn
}
return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported median iterator type: %T", input)
}
}
// floatMedianReduceSlice returns the median value within a window.
func floatMedianReduceSlice(a []FloatPoint, opt *reduceOptions) []FloatPoint {
func floatMedianReduceSlice(a []FloatPoint) []FloatPoint {
if len(a) == 1 {
return []FloatPoint{{Time: opt.startTime, Value: a[0].Value}}
return a
}
// OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1.
@ -468,15 +488,15 @@ func floatMedianReduceSlice(a []FloatPoint, opt *reduceOptions) []FloatPoint {
sort.Sort(floatPointsByValue(a))
if len(a)%2 == 0 {
lo, hi := a[len(a)/2-1], a[(len(a)/2)]
return []FloatPoint{{Time: opt.startTime, Value: lo.Value + (hi.Value-lo.Value)/2}}
return []FloatPoint{{Time: ZeroTime, Value: lo.Value + (hi.Value-lo.Value)/2}}
}
return []FloatPoint{{Time: opt.startTime, Value: a[len(a)/2].Value}}
return []FloatPoint{{Time: ZeroTime, Value: a[len(a)/2].Value}}
}
// integerMedianReduceSlice returns the median value within a window.
func integerMedianReduceSlice(a []IntegerPoint, opt *reduceOptions) []FloatPoint {
func integerMedianReduceSlice(a []IntegerPoint) []FloatPoint {
if len(a) == 1 {
return []FloatPoint{{Time: opt.startTime, Value: float64(a[0].Value)}}
return []FloatPoint{{Time: ZeroTime, Value: float64(a[0].Value)}}
}
// OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1.
@ -486,30 +506,42 @@ func integerMedianReduceSlice(a []IntegerPoint, opt *reduceOptions) []FloatPoint
sort.Sort(integerPointsByValue(a))
if len(a)%2 == 0 {
lo, hi := a[len(a)/2-1], a[(len(a)/2)]
return []FloatPoint{{Time: opt.startTime, Value: float64(lo.Value) + float64(hi.Value-lo.Value)/2}}
return []FloatPoint{{Time: ZeroTime, Value: float64(lo.Value) + float64(hi.Value-lo.Value)/2}}
}
return []FloatPoint{{Time: opt.startTime, Value: float64(a[len(a)/2].Value)}}
return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}}
}
// newStddevIterator returns an iterator for operating on a stddev() call.
func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatStddevReduceSlice}, nil
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(floatStddevReduceSlice)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerStddevReduceSlice}, nil
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
fn := NewIntegerSliceFuncFloatReducer(integerStddevReduceSlice)
return fn, fn
}
return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
return &stringReduceSliceIterator{input: newBufStringIterator(input), opt: opt, fn: stringStddevReduceSlice}, nil
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringSliceFuncReducer(stringStddevReduceSlice)
return fn, fn
}
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported stddev iterator type: %T", input)
}
}
// floatStddevReduceSlice returns the stddev value within a window.
func floatStddevReduceSlice(a []FloatPoint, opt *reduceOptions) []FloatPoint {
func floatStddevReduceSlice(a []FloatPoint) []FloatPoint {
// If there is only one point then return 0.
if len(a) < 2 {
return []FloatPoint{{Time: opt.startTime, Nil: true}}
return []FloatPoint{{Time: ZeroTime, Nil: true}}
}
// Calculate the mean.
@ -532,16 +564,16 @@ func floatStddevReduceSlice(a []FloatPoint, opt *reduceOptions) []FloatPoint {
variance += math.Pow(p.Value-mean, 2)
}
return []FloatPoint{{
Time: opt.startTime,
Time: ZeroTime,
Value: math.Sqrt(variance / float64(count-1)),
}}
}
// integerStddevReduceSlice returns the stddev value within a window.
func integerStddevReduceSlice(a []IntegerPoint, opt *reduceOptions) []FloatPoint {
func integerStddevReduceSlice(a []IntegerPoint) []FloatPoint {
// If there is only one point then return 0.
if len(a) < 2 {
return []FloatPoint{{Time: opt.startTime, Nil: true}}
return []FloatPoint{{Time: ZeroTime, Nil: true}}
}
// Calculate the mean.
@ -558,41 +590,49 @@ func integerStddevReduceSlice(a []IntegerPoint, opt *reduceOptions) []FloatPoint
variance += math.Pow(float64(p.Value)-mean, 2)
}
return []FloatPoint{{
Time: opt.startTime,
Time: ZeroTime,
Value: math.Sqrt(variance / float64(count-1)),
}}
}
// stringStddevReduceSlice always returns "".
func stringStddevReduceSlice(a []StringPoint, opt *reduceOptions) []StringPoint {
return []StringPoint{{Time: opt.startTime, Value: ""}}
func stringStddevReduceSlice(a []StringPoint) []StringPoint {
return []StringPoint{{Time: ZeroTime, Value: ""}}
}
// newSpreadIterator returns an iterator for operating on a spread() call.
func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: floatSpreadReduceSlice}, nil
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(floatSpreadReduceSlice)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: integerSpreadReduceSlice}, nil
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(integerSpreadReduceSlice)
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported spread iterator type: %T", input)
}
}
// floatSpreadReduceSlice returns the spread value within a window.
func floatSpreadReduceSlice(a []FloatPoint, opt *reduceOptions) []FloatPoint {
func floatSpreadReduceSlice(a []FloatPoint) []FloatPoint {
// Find min & max values.
min, max := a[0].Value, a[0].Value
for _, p := range a[1:] {
min = math.Min(min, p.Value)
max = math.Max(max, p.Value)
}
return []FloatPoint{{Time: opt.startTime, Value: max - min}}
return []FloatPoint{{Time: ZeroTime, Value: max - min}}
}
// integerSpreadReduceSlice returns the spread value within a window.
func integerSpreadReduceSlice(a []IntegerPoint, opt *reduceOptions) []IntegerPoint {
func integerSpreadReduceSlice(a []IntegerPoint) []IntegerPoint {
// Find min & max values.
min, max := a[0].Value, a[0].Value
for _, p := range a[1:] {
@ -603,24 +643,34 @@ func integerSpreadReduceSlice(a []IntegerPoint, opt *reduceOptions) []IntegerPoi
max = p.Value
}
}
return []IntegerPoint{{Time: opt.startTime, Value: max - min}}
return []IntegerPoint{{Time: ZeroTime, Value: max - min}}
}
// newTopIterator returns an iterator for operating on a top() call.
func newTopIterator(input Iterator, opt IteratorOptions, n *NumberLiteral, tags []int) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatTopReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil
aggregateFn := newFloatTopReduceSliceFunc(int(n.Val), tags, opt.Interval)
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(aggregateFn)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerTopReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil
aggregateFn := newIntegerTopReduceSliceFunc(int(n.Val), tags, opt.Interval)
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(aggregateFn)
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported top iterator type: %T", input)
}
}
// newFloatTopReduceSliceFunc returns the top values within a window.
func newFloatTopReduceSliceFunc(n int, tags []int, interval Interval) floatReduceSliceFunc {
return func(a []FloatPoint, opt *reduceOptions) []FloatPoint {
func newFloatTopReduceSliceFunc(n int, tags []int, interval Interval) FloatReduceSliceFunc {
return func(a []FloatPoint) []FloatPoint {
// Filter by tags if they exist.
if tags != nil {
a = filterFloatByUniqueTags(a, tags, func(cur, p *FloatPoint) bool {
@ -655,7 +705,7 @@ func newFloatTopReduceSliceFunc(n int, tags []int, interval Interval) floatReduc
// depending on if a time interval was given or not.
if !interval.IsZero() {
for i := range points {
points[i].Time = opt.startTime
points[i].Time = ZeroTime
}
} else {
sort.Stable(floatPointsByTime(points))
@ -665,8 +715,8 @@ func newFloatTopReduceSliceFunc(n int, tags []int, interval Interval) floatReduc
}
// newIntegerTopReduceSliceFunc returns the top values within a window.
func newIntegerTopReduceSliceFunc(n int, tags []int, interval Interval) integerReduceSliceFunc {
return func(a []IntegerPoint, opt *reduceOptions) []IntegerPoint {
func newIntegerTopReduceSliceFunc(n int, tags []int, interval Interval) IntegerReduceSliceFunc {
return func(a []IntegerPoint) []IntegerPoint {
// Filter by tags if they exist.
if tags != nil {
a = filterIntegerByUniqueTags(a, tags, func(cur, p *IntegerPoint) bool {
@ -701,7 +751,7 @@ func newIntegerTopReduceSliceFunc(n int, tags []int, interval Interval) integerR
// depending on if a time interval was given or not.
if !interval.IsZero() {
for i := range points {
points[i].Time = opt.startTime
points[i].Time = ZeroTime
}
} else {
sort.Stable(integerPointsByTime(points))
@ -714,17 +764,27 @@ func newIntegerTopReduceSliceFunc(n int, tags []int, interval Interval) integerR
func newBottomIterator(input Iterator, opt IteratorOptions, n *NumberLiteral, tags []int) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil
aggregateFn := newFloatBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(aggregateFn)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)}, nil
aggregateFn := newIntegerBottomReduceSliceFunc(int(n.Val), tags, opt.Interval)
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(aggregateFn)
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported bottom iterator type: %T", input)
}
}
// newFloatBottomReduceSliceFunc returns the bottom values within a window.
func newFloatBottomReduceSliceFunc(n int, tags []int, interval Interval) floatReduceSliceFunc {
return func(a []FloatPoint, opt *reduceOptions) []FloatPoint {
func newFloatBottomReduceSliceFunc(n int, tags []int, interval Interval) FloatReduceSliceFunc {
return func(a []FloatPoint) []FloatPoint {
// Filter by tags if they exist.
if tags != nil {
a = filterFloatByUniqueTags(a, tags, func(cur, p *FloatPoint) bool {
@ -759,7 +819,7 @@ func newFloatBottomReduceSliceFunc(n int, tags []int, interval Interval) floatRe
// depending on if a time interval was given or not.
if !interval.IsZero() {
for i := range points {
points[i].Time = opt.startTime
points[i].Time = ZeroTime
}
} else {
sort.Stable(floatPointsByTime(points))
@ -769,8 +829,8 @@ func newFloatBottomReduceSliceFunc(n int, tags []int, interval Interval) floatRe
}
// newIntegerBottomReduceSliceFunc returns the bottom values within a window.
func newIntegerBottomReduceSliceFunc(n int, tags []int, interval Interval) integerReduceSliceFunc {
return func(a []IntegerPoint, opt *reduceOptions) []IntegerPoint {
func newIntegerBottomReduceSliceFunc(n int, tags []int, interval Interval) IntegerReduceSliceFunc {
return func(a []IntegerPoint) []IntegerPoint {
// Filter by tags if they exist.
if tags != nil {
a = filterIntegerByUniqueTags(a, tags, func(cur, p *IntegerPoint) bool {
@ -805,7 +865,7 @@ func newIntegerBottomReduceSliceFunc(n int, tags []int, interval Interval) integ
// depending on if a time interval was given or not.
if !interval.IsZero() {
for i := range points {
points[i].Time = opt.startTime
points[i].Time = ZeroTime
}
} else {
sort.Stable(integerPointsByTime(points))
@ -878,32 +938,42 @@ func filterIntegerByUniqueTags(a []IntegerPoint, tags []int, cmpFunc func(cur, p
func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatPercentileReduceSliceFunc(percentile)}, nil
floatPercentileReduceSlice := newFloatPercentileReduceSliceFunc(percentile)
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(floatPercentileReduceSlice)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
return &integerReduceSliceIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerPercentileReduceSliceFunc(percentile)}, nil
integerPercentileReduceSlice := newIntegerPercentileReduceSliceFunc(percentile)
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(integerPercentileReduceSlice)
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported percentile iterator type: %T", input)
}
}
// newFloatPercentileReduceSliceFunc returns the percentile value within a window.
func newFloatPercentileReduceSliceFunc(percentile float64) floatReduceSliceFunc {
return func(a []FloatPoint, opt *reduceOptions) []FloatPoint {
func newFloatPercentileReduceSliceFunc(percentile float64) FloatReduceSliceFunc {
return func(a []FloatPoint) []FloatPoint {
length := len(a)
i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
if i < 0 || i >= length {
return []FloatPoint{{Time: opt.startTime, Nil: true}}
return nil
}
sort.Sort(floatPointsByValue(a))
return []FloatPoint{{Time: opt.startTime, Value: a[i].Value}}
return []FloatPoint{{Time: ZeroTime, Value: a[i].Value}}
}
}
// newIntegerPercentileReduceSliceFunc returns the percentile value within a window.
func newIntegerPercentileReduceSliceFunc(percentile float64) integerReduceSliceFunc {
return func(a []IntegerPoint, opt *reduceOptions) []IntegerPoint {
func newIntegerPercentileReduceSliceFunc(percentile float64) IntegerReduceSliceFunc {
return func(a []IntegerPoint) []IntegerPoint {
length := len(a)
i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
@ -912,7 +982,7 @@ func newIntegerPercentileReduceSliceFunc(percentile float64) integerReduceSliceF
}
sort.Sort(integerPointsByValue(a))
return []IntegerPoint{{Time: opt.startTime, Value: a[i].Value}}
return []IntegerPoint{{Time: ZeroTime, Value: a[i].Value}}
}
}
@ -920,26 +990,36 @@ func newIntegerPercentileReduceSliceFunc(percentile float64) integerReduceSliceF
func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
return &floatReduceSliceIterator{input: newBufFloatIterator(input), opt: opt, fn: newFloatDerivativeReduceSliceFunc(interval, isNonNegative)}, nil
floatDerivativeReduceSlice := newFloatDerivativeReduceSliceFunc(interval, isNonNegative)
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(floatDerivativeReduceSlice)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
return &integerReduceSliceFloatIterator{input: newBufIntegerIterator(input), opt: opt, fn: newIntegerDerivativeReduceSliceFunc(interval, isNonNegative)}, nil
integerDerivativeReduceSlice := newIntegerDerivativeReduceSliceFunc(interval, isNonNegative)
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
fn := NewIntegerSliceFuncFloatReducer(integerDerivativeReduceSlice)
return fn, fn
}
return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported derivative iterator type: %T", input)
}
}
// newFloatDerivativeReduceSliceFunc returns the derivative value within a window.
func newFloatDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) floatReduceSliceFunc {
prev := FloatPoint{Time: -1}
func newFloatDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) FloatReduceSliceFunc {
prev := FloatPoint{Nil: true}
return func(a []FloatPoint, opt *reduceOptions) []FloatPoint {
return func(a []FloatPoint) []FloatPoint {
if len(a) == 0 {
return a
} else if len(a) == 1 {
return []FloatPoint{{Time: a[0].Time, Nil: true}}
}
if prev.Time == -1 {
if prev.Nil {
prev = a[0]
}
@ -971,17 +1051,17 @@ func newFloatDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) fl
}
// newIntegerDerivativeReduceSliceFunc returns the derivative value within a window.
func newIntegerDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) integerReduceSliceFloatFunc {
prev := IntegerPoint{Time: -1}
func newIntegerDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) IntegerReduceFloatSliceFunc {
prev := IntegerPoint{Nil: true}
return func(a []IntegerPoint, opt *reduceOptions) []FloatPoint {
return func(a []IntegerPoint) []FloatPoint {
if len(a) == 0 {
return []FloatPoint{}
} else if len(a) == 1 {
return []FloatPoint{{Time: a[0].Time, Nil: true}}
}
if prev.Time == -1 {
if prev.Nil {
prev = a[0]
}
@ -1011,102 +1091,3 @@ func newIntegerDerivativeReduceSliceFunc(interval Interval, isNonNegative bool)
return output
}
}
// integerReduceSliceFloatIterator executes a reducer on all points in a window and buffers the result.
// This iterator receives an integer iterator but produces a float iterator.
type integerReduceSliceFloatIterator struct {
input *bufIntegerIterator
fn integerReduceSliceFloatFunc
opt IteratorOptions
points []FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *integerReduceSliceFloatIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerReduceSliceFloatIterator) Next() *FloatPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *integerReduceSliceFloatIterator) reduce() []FloatPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []IntegerPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]FloatPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]FloatPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// integerReduceSliceFloatFunc is the function called by a IntegerPoint slice reducer that emits FloatPoint.
type integerReduceSliceFloatFunc func(a []IntegerPoint, opt *reduceOptions) []FloatPoint

View File

@ -8,12 +8,31 @@ package influxql
// FloatPointAggregator aggregates points to produce a single point.
type FloatPointAggregator interface {
Aggregate(p *FloatPoint)
AggregateFloat(p *FloatPoint)
}
// FloatBulkPointAggregator aggregates multiple points at a time.
type FloatBulkPointAggregator interface {
AggregateFloatBulk(points []FloatPoint)
}
// AggregateFloatPoints feeds a slice of FloatPoint into an
// aggregator. If the aggregator is a FloatBulkPointAggregator, it will
// use the AggregateBulk method.
func AggregateFloatPoints(a FloatPointAggregator, points []FloatPoint) {
switch a := a.(type) {
case FloatBulkPointAggregator:
a.AggregateFloatBulk(points)
default:
for _, p := range points {
a.AggregateFloat(&p)
}
}
}
// FloatPointEmitter produces a single point from an aggregate.
type FloatPointEmitter interface {
Emit() *FloatPoint
Emit() []FloatPoint
}
// FloatReduceFunc is the function called by a FloatPoint reducer.
@ -28,7 +47,7 @@ func NewFloatFuncReducer(fn FloatReduceFunc) *FloatFuncReducer {
return &FloatFuncReducer{fn: fn}
}
func (r *FloatFuncReducer) Aggregate(p *FloatPoint) {
func (r *FloatFuncReducer) AggregateFloat(p *FloatPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
@ -36,11 +55,39 @@ func (r *FloatFuncReducer) Aggregate(p *FloatPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *FloatFuncReducer) Emit() *FloatPoint {
return r.prev
func (r *FloatFuncReducer) Emit() []FloatPoint {
return []FloatPoint{*r.prev}
}
// FloatReduceSliceFunc is the function called by a FloatPoint reducer.
type FloatReduceSliceFunc func(a []FloatPoint) []FloatPoint
type FloatSliceFuncReducer struct {
points []FloatPoint
fn FloatReduceSliceFunc
}
func NewFloatSliceFuncReducer(fn FloatReduceSliceFunc) *FloatSliceFuncReducer {
return &FloatSliceFuncReducer{fn: fn}
}
func (r *FloatSliceFuncReducer) AggregateFloat(p *FloatPoint) {
r.points = append(r.points, *p)
}
func (r *FloatSliceFuncReducer) AggregateFloatBulk(points []FloatPoint) {
r.points = append(r.points, points...)
}
func (r *FloatSliceFuncReducer) Emit() []FloatPoint {
return r.fn(r.points)
}
// FloatReduceIntegerFunc is the function called by a FloatPoint reducer.
@ -55,7 +102,7 @@ func NewFloatFuncIntegerReducer(fn FloatReduceIntegerFunc) *FloatFuncIntegerRedu
return &FloatFuncIntegerReducer{fn: fn}
}
func (r *FloatFuncIntegerReducer) Aggregate(p *FloatPoint) {
func (r *FloatFuncIntegerReducer) AggregateFloat(p *FloatPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
@ -63,11 +110,39 @@ func (r *FloatFuncIntegerReducer) Aggregate(p *FloatPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *FloatFuncIntegerReducer) Emit() *IntegerPoint {
return r.prev
func (r *FloatFuncIntegerReducer) Emit() []IntegerPoint {
return []IntegerPoint{*r.prev}
}
// FloatReduceIntegerSliceFunc is the function called by a FloatPoint reducer.
type FloatReduceIntegerSliceFunc func(a []FloatPoint) []IntegerPoint
type FloatSliceFuncIntegerReducer struct {
points []FloatPoint
fn FloatReduceIntegerSliceFunc
}
func NewFloatSliceFuncIntegerReducer(fn FloatReduceIntegerSliceFunc) *FloatSliceFuncIntegerReducer {
return &FloatSliceFuncIntegerReducer{fn: fn}
}
func (r *FloatSliceFuncIntegerReducer) AggregateFloat(p *FloatPoint) {
r.points = append(r.points, *p)
}
func (r *FloatSliceFuncIntegerReducer) AggregateFloatBulk(points []FloatPoint) {
r.points = append(r.points, points...)
}
func (r *FloatSliceFuncIntegerReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// FloatReduceStringFunc is the function called by a FloatPoint reducer.
@ -82,7 +157,7 @@ func NewFloatFuncStringReducer(fn FloatReduceStringFunc) *FloatFuncStringReducer
return &FloatFuncStringReducer{fn: fn}
}
func (r *FloatFuncStringReducer) Aggregate(p *FloatPoint) {
func (r *FloatFuncStringReducer) AggregateFloat(p *FloatPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
@ -90,11 +165,39 @@ func (r *FloatFuncStringReducer) Aggregate(p *FloatPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *FloatFuncStringReducer) Emit() *StringPoint {
return r.prev
func (r *FloatFuncStringReducer) Emit() []StringPoint {
return []StringPoint{*r.prev}
}
// FloatReduceStringSliceFunc is the function called by a FloatPoint reducer.
type FloatReduceStringSliceFunc func(a []FloatPoint) []StringPoint
type FloatSliceFuncStringReducer struct {
points []FloatPoint
fn FloatReduceStringSliceFunc
}
func NewFloatSliceFuncStringReducer(fn FloatReduceStringSliceFunc) *FloatSliceFuncStringReducer {
return &FloatSliceFuncStringReducer{fn: fn}
}
func (r *FloatSliceFuncStringReducer) AggregateFloat(p *FloatPoint) {
r.points = append(r.points, *p)
}
func (r *FloatSliceFuncStringReducer) AggregateFloatBulk(points []FloatPoint) {
r.points = append(r.points, points...)
}
func (r *FloatSliceFuncStringReducer) Emit() []StringPoint {
return r.fn(r.points)
}
// FloatReduceBooleanFunc is the function called by a FloatPoint reducer.
@ -109,7 +212,7 @@ func NewFloatFuncBooleanReducer(fn FloatReduceBooleanFunc) *FloatFuncBooleanRedu
return &FloatFuncBooleanReducer{fn: fn}
}
func (r *FloatFuncBooleanReducer) Aggregate(p *FloatPoint) {
func (r *FloatFuncBooleanReducer) AggregateFloat(p *FloatPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
@ -117,21 +220,68 @@ func (r *FloatFuncBooleanReducer) Aggregate(p *FloatPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *FloatFuncBooleanReducer) Emit() *BooleanPoint {
return r.prev
func (r *FloatFuncBooleanReducer) Emit() []BooleanPoint {
return []BooleanPoint{*r.prev}
}
// FloatReduceBooleanSliceFunc is the function called by a FloatPoint reducer.
type FloatReduceBooleanSliceFunc func(a []FloatPoint) []BooleanPoint
type FloatSliceFuncBooleanReducer struct {
points []FloatPoint
fn FloatReduceBooleanSliceFunc
}
func NewFloatSliceFuncBooleanReducer(fn FloatReduceBooleanSliceFunc) *FloatSliceFuncBooleanReducer {
return &FloatSliceFuncBooleanReducer{fn: fn}
}
func (r *FloatSliceFuncBooleanReducer) AggregateFloat(p *FloatPoint) {
r.points = append(r.points, *p)
}
func (r *FloatSliceFuncBooleanReducer) AggregateFloatBulk(points []FloatPoint) {
r.points = append(r.points, points...)
}
func (r *FloatSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
// IntegerPointAggregator aggregates points to produce a single point.
type IntegerPointAggregator interface {
Aggregate(p *IntegerPoint)
AggregateInteger(p *IntegerPoint)
}
// IntegerBulkPointAggregator aggregates multiple points at a time.
type IntegerBulkPointAggregator interface {
AggregateIntegerBulk(points []IntegerPoint)
}
// AggregateIntegerPoints feeds a slice of IntegerPoint into an
// aggregator. If the aggregator is a IntegerBulkPointAggregator, it will
// use the AggregateBulk method.
func AggregateIntegerPoints(a IntegerPointAggregator, points []IntegerPoint) {
switch a := a.(type) {
case IntegerBulkPointAggregator:
a.AggregateIntegerBulk(points)
default:
for _, p := range points {
a.AggregateInteger(&p)
}
}
}
// IntegerPointEmitter produces a single point from an aggregate.
type IntegerPointEmitter interface {
Emit() *IntegerPoint
Emit() []IntegerPoint
}
// IntegerReduceFloatFunc is the function called by a IntegerPoint reducer.
@ -146,7 +296,7 @@ func NewIntegerFuncFloatReducer(fn IntegerReduceFloatFunc) *IntegerFuncFloatRedu
return &IntegerFuncFloatReducer{fn: fn}
}
func (r *IntegerFuncFloatReducer) Aggregate(p *IntegerPoint) {
func (r *IntegerFuncFloatReducer) AggregateInteger(p *IntegerPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
@ -154,11 +304,39 @@ func (r *IntegerFuncFloatReducer) Aggregate(p *IntegerPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *IntegerFuncFloatReducer) Emit() *FloatPoint {
return r.prev
func (r *IntegerFuncFloatReducer) Emit() []FloatPoint {
return []FloatPoint{*r.prev}
}
// IntegerReduceFloatSliceFunc is the function called by a IntegerPoint reducer.
type IntegerReduceFloatSliceFunc func(a []IntegerPoint) []FloatPoint
type IntegerSliceFuncFloatReducer struct {
points []IntegerPoint
fn IntegerReduceFloatSliceFunc
}
func NewIntegerSliceFuncFloatReducer(fn IntegerReduceFloatSliceFunc) *IntegerSliceFuncFloatReducer {
return &IntegerSliceFuncFloatReducer{fn: fn}
}
func (r *IntegerSliceFuncFloatReducer) AggregateInteger(p *IntegerPoint) {
r.points = append(r.points, *p)
}
func (r *IntegerSliceFuncFloatReducer) AggregateIntegerBulk(points []IntegerPoint) {
r.points = append(r.points, points...)
}
func (r *IntegerSliceFuncFloatReducer) Emit() []FloatPoint {
return r.fn(r.points)
}
// IntegerReduceFunc is the function called by a IntegerPoint reducer.
@ -173,7 +351,7 @@ func NewIntegerFuncReducer(fn IntegerReduceFunc) *IntegerFuncReducer {
return &IntegerFuncReducer{fn: fn}
}
func (r *IntegerFuncReducer) Aggregate(p *IntegerPoint) {
func (r *IntegerFuncReducer) AggregateInteger(p *IntegerPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
@ -181,11 +359,39 @@ func (r *IntegerFuncReducer) Aggregate(p *IntegerPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *IntegerFuncReducer) Emit() *IntegerPoint {
return r.prev
func (r *IntegerFuncReducer) Emit() []IntegerPoint {
return []IntegerPoint{*r.prev}
}
// IntegerReduceSliceFunc is the function called by a IntegerPoint reducer.
type IntegerReduceSliceFunc func(a []IntegerPoint) []IntegerPoint
type IntegerSliceFuncReducer struct {
points []IntegerPoint
fn IntegerReduceSliceFunc
}
func NewIntegerSliceFuncReducer(fn IntegerReduceSliceFunc) *IntegerSliceFuncReducer {
return &IntegerSliceFuncReducer{fn: fn}
}
func (r *IntegerSliceFuncReducer) AggregateInteger(p *IntegerPoint) {
r.points = append(r.points, *p)
}
func (r *IntegerSliceFuncReducer) AggregateIntegerBulk(points []IntegerPoint) {
r.points = append(r.points, points...)
}
func (r *IntegerSliceFuncReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// IntegerReduceStringFunc is the function called by a IntegerPoint reducer.
@ -200,7 +406,7 @@ func NewIntegerFuncStringReducer(fn IntegerReduceStringFunc) *IntegerFuncStringR
return &IntegerFuncStringReducer{fn: fn}
}
func (r *IntegerFuncStringReducer) Aggregate(p *IntegerPoint) {
func (r *IntegerFuncStringReducer) AggregateInteger(p *IntegerPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
@ -208,11 +414,39 @@ func (r *IntegerFuncStringReducer) Aggregate(p *IntegerPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *IntegerFuncStringReducer) Emit() *StringPoint {
return r.prev
func (r *IntegerFuncStringReducer) Emit() []StringPoint {
return []StringPoint{*r.prev}
}
// IntegerReduceStringSliceFunc is the function called by a IntegerPoint reducer.
type IntegerReduceStringSliceFunc func(a []IntegerPoint) []StringPoint
type IntegerSliceFuncStringReducer struct {
points []IntegerPoint
fn IntegerReduceStringSliceFunc
}
func NewIntegerSliceFuncStringReducer(fn IntegerReduceStringSliceFunc) *IntegerSliceFuncStringReducer {
return &IntegerSliceFuncStringReducer{fn: fn}
}
func (r *IntegerSliceFuncStringReducer) AggregateInteger(p *IntegerPoint) {
r.points = append(r.points, *p)
}
func (r *IntegerSliceFuncStringReducer) AggregateIntegerBulk(points []IntegerPoint) {
r.points = append(r.points, points...)
}
func (r *IntegerSliceFuncStringReducer) Emit() []StringPoint {
return r.fn(r.points)
}
// IntegerReduceBooleanFunc is the function called by a IntegerPoint reducer.
@ -227,7 +461,7 @@ func NewIntegerFuncBooleanReducer(fn IntegerReduceBooleanFunc) *IntegerFuncBoole
return &IntegerFuncBooleanReducer{fn: fn}
}
func (r *IntegerFuncBooleanReducer) Aggregate(p *IntegerPoint) {
func (r *IntegerFuncBooleanReducer) AggregateInteger(p *IntegerPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
@ -235,21 +469,68 @@ func (r *IntegerFuncBooleanReducer) Aggregate(p *IntegerPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *IntegerFuncBooleanReducer) Emit() *BooleanPoint {
return r.prev
func (r *IntegerFuncBooleanReducer) Emit() []BooleanPoint {
return []BooleanPoint{*r.prev}
}
// IntegerReduceBooleanSliceFunc is the function called by a IntegerPoint reducer.
type IntegerReduceBooleanSliceFunc func(a []IntegerPoint) []BooleanPoint
type IntegerSliceFuncBooleanReducer struct {
points []IntegerPoint
fn IntegerReduceBooleanSliceFunc
}
func NewIntegerSliceFuncBooleanReducer(fn IntegerReduceBooleanSliceFunc) *IntegerSliceFuncBooleanReducer {
return &IntegerSliceFuncBooleanReducer{fn: fn}
}
func (r *IntegerSliceFuncBooleanReducer) AggregateInteger(p *IntegerPoint) {
r.points = append(r.points, *p)
}
func (r *IntegerSliceFuncBooleanReducer) AggregateIntegerBulk(points []IntegerPoint) {
r.points = append(r.points, points...)
}
func (r *IntegerSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
// StringPointAggregator aggregates points to produce a single point.
type StringPointAggregator interface {
Aggregate(p *StringPoint)
AggregateString(p *StringPoint)
}
// StringBulkPointAggregator aggregates multiple points at a time.
type StringBulkPointAggregator interface {
AggregateStringBulk(points []StringPoint)
}
// AggregateStringPoints feeds a slice of StringPoint into an
// aggregator. If the aggregator is a StringBulkPointAggregator, it will
// use the AggregateBulk method.
func AggregateStringPoints(a StringPointAggregator, points []StringPoint) {
switch a := a.(type) {
case StringBulkPointAggregator:
a.AggregateStringBulk(points)
default:
for _, p := range points {
a.AggregateString(&p)
}
}
}
// StringPointEmitter produces a single point from an aggregate.
type StringPointEmitter interface {
Emit() *StringPoint
Emit() []StringPoint
}
// StringReduceFloatFunc is the function called by a StringPoint reducer.
@ -264,7 +545,7 @@ func NewStringFuncFloatReducer(fn StringReduceFloatFunc) *StringFuncFloatReducer
return &StringFuncFloatReducer{fn: fn}
}
func (r *StringFuncFloatReducer) Aggregate(p *StringPoint) {
func (r *StringFuncFloatReducer) AggregateString(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
@ -272,11 +553,39 @@ func (r *StringFuncFloatReducer) Aggregate(p *StringPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *StringFuncFloatReducer) Emit() *FloatPoint {
return r.prev
func (r *StringFuncFloatReducer) Emit() []FloatPoint {
return []FloatPoint{*r.prev}
}
// StringReduceFloatSliceFunc is the function called by a StringPoint reducer.
type StringReduceFloatSliceFunc func(a []StringPoint) []FloatPoint
type StringSliceFuncFloatReducer struct {
points []StringPoint
fn StringReduceFloatSliceFunc
}
func NewStringSliceFuncFloatReducer(fn StringReduceFloatSliceFunc) *StringSliceFuncFloatReducer {
return &StringSliceFuncFloatReducer{fn: fn}
}
func (r *StringSliceFuncFloatReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p)
}
func (r *StringSliceFuncFloatReducer) AggregateStringBulk(points []StringPoint) {
r.points = append(r.points, points...)
}
func (r *StringSliceFuncFloatReducer) Emit() []FloatPoint {
return r.fn(r.points)
}
// StringReduceIntegerFunc is the function called by a StringPoint reducer.
@ -291,7 +600,7 @@ func NewStringFuncIntegerReducer(fn StringReduceIntegerFunc) *StringFuncIntegerR
return &StringFuncIntegerReducer{fn: fn}
}
func (r *StringFuncIntegerReducer) Aggregate(p *StringPoint) {
func (r *StringFuncIntegerReducer) AggregateString(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
@ -299,11 +608,39 @@ func (r *StringFuncIntegerReducer) Aggregate(p *StringPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *StringFuncIntegerReducer) Emit() *IntegerPoint {
return r.prev
func (r *StringFuncIntegerReducer) Emit() []IntegerPoint {
return []IntegerPoint{*r.prev}
}
// StringReduceIntegerSliceFunc is the function called by a StringPoint reducer.
type StringReduceIntegerSliceFunc func(a []StringPoint) []IntegerPoint
type StringSliceFuncIntegerReducer struct {
points []StringPoint
fn StringReduceIntegerSliceFunc
}
func NewStringSliceFuncIntegerReducer(fn StringReduceIntegerSliceFunc) *StringSliceFuncIntegerReducer {
return &StringSliceFuncIntegerReducer{fn: fn}
}
func (r *StringSliceFuncIntegerReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p)
}
func (r *StringSliceFuncIntegerReducer) AggregateStringBulk(points []StringPoint) {
r.points = append(r.points, points...)
}
func (r *StringSliceFuncIntegerReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// StringReduceFunc is the function called by a StringPoint reducer.
@ -318,7 +655,7 @@ func NewStringFuncReducer(fn StringReduceFunc) *StringFuncReducer {
return &StringFuncReducer{fn: fn}
}
func (r *StringFuncReducer) Aggregate(p *StringPoint) {
func (r *StringFuncReducer) AggregateString(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
@ -326,11 +663,39 @@ func (r *StringFuncReducer) Aggregate(p *StringPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *StringFuncReducer) Emit() *StringPoint {
return r.prev
func (r *StringFuncReducer) Emit() []StringPoint {
return []StringPoint{*r.prev}
}
// StringReduceSliceFunc is the function called by a StringPoint reducer.
type StringReduceSliceFunc func(a []StringPoint) []StringPoint
type StringSliceFuncReducer struct {
points []StringPoint
fn StringReduceSliceFunc
}
func NewStringSliceFuncReducer(fn StringReduceSliceFunc) *StringSliceFuncReducer {
return &StringSliceFuncReducer{fn: fn}
}
func (r *StringSliceFuncReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p)
}
func (r *StringSliceFuncReducer) AggregateStringBulk(points []StringPoint) {
r.points = append(r.points, points...)
}
func (r *StringSliceFuncReducer) Emit() []StringPoint {
return r.fn(r.points)
}
// StringReduceBooleanFunc is the function called by a StringPoint reducer.
@ -345,7 +710,7 @@ func NewStringFuncBooleanReducer(fn StringReduceBooleanFunc) *StringFuncBooleanR
return &StringFuncBooleanReducer{fn: fn}
}
func (r *StringFuncBooleanReducer) Aggregate(p *StringPoint) {
func (r *StringFuncBooleanReducer) AggregateString(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
@ -353,21 +718,68 @@ func (r *StringFuncBooleanReducer) Aggregate(p *StringPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *StringFuncBooleanReducer) Emit() *BooleanPoint {
return r.prev
func (r *StringFuncBooleanReducer) Emit() []BooleanPoint {
return []BooleanPoint{*r.prev}
}
// StringReduceBooleanSliceFunc is the function called by a StringPoint reducer.
type StringReduceBooleanSliceFunc func(a []StringPoint) []BooleanPoint
type StringSliceFuncBooleanReducer struct {
points []StringPoint
fn StringReduceBooleanSliceFunc
}
func NewStringSliceFuncBooleanReducer(fn StringReduceBooleanSliceFunc) *StringSliceFuncBooleanReducer {
return &StringSliceFuncBooleanReducer{fn: fn}
}
func (r *StringSliceFuncBooleanReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p)
}
func (r *StringSliceFuncBooleanReducer) AggregateStringBulk(points []StringPoint) {
r.points = append(r.points, points...)
}
func (r *StringSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}
// BooleanPointAggregator aggregates points to produce a single point.
type BooleanPointAggregator interface {
Aggregate(p *BooleanPoint)
AggregateBoolean(p *BooleanPoint)
}
// BooleanBulkPointAggregator aggregates multiple points at a time.
type BooleanBulkPointAggregator interface {
AggregateBooleanBulk(points []BooleanPoint)
}
// AggregateBooleanPoints feeds a slice of BooleanPoint into an
// aggregator. If the aggregator is a BooleanBulkPointAggregator, it will
// use the AggregateBulk method.
func AggregateBooleanPoints(a BooleanPointAggregator, points []BooleanPoint) {
switch a := a.(type) {
case BooleanBulkPointAggregator:
a.AggregateBooleanBulk(points)
default:
for _, p := range points {
a.AggregateBoolean(&p)
}
}
}
// BooleanPointEmitter produces a single point from an aggregate.
type BooleanPointEmitter interface {
Emit() *BooleanPoint
Emit() []BooleanPoint
}
// BooleanReduceFloatFunc is the function called by a BooleanPoint reducer.
@ -382,7 +794,7 @@ func NewBooleanFuncFloatReducer(fn BooleanReduceFloatFunc) *BooleanFuncFloatRedu
return &BooleanFuncFloatReducer{fn: fn}
}
func (r *BooleanFuncFloatReducer) Aggregate(p *BooleanPoint) {
func (r *BooleanFuncFloatReducer) AggregateBoolean(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &FloatPoint{}
@ -390,11 +802,39 @@ func (r *BooleanFuncFloatReducer) Aggregate(p *BooleanPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *BooleanFuncFloatReducer) Emit() *FloatPoint {
return r.prev
func (r *BooleanFuncFloatReducer) Emit() []FloatPoint {
return []FloatPoint{*r.prev}
}
// BooleanReduceFloatSliceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceFloatSliceFunc func(a []BooleanPoint) []FloatPoint
type BooleanSliceFuncFloatReducer struct {
points []BooleanPoint
fn BooleanReduceFloatSliceFunc
}
func NewBooleanSliceFuncFloatReducer(fn BooleanReduceFloatSliceFunc) *BooleanSliceFuncFloatReducer {
return &BooleanSliceFuncFloatReducer{fn: fn}
}
func (r *BooleanSliceFuncFloatReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p)
}
func (r *BooleanSliceFuncFloatReducer) AggregateBooleanBulk(points []BooleanPoint) {
r.points = append(r.points, points...)
}
func (r *BooleanSliceFuncFloatReducer) Emit() []FloatPoint {
return r.fn(r.points)
}
// BooleanReduceIntegerFunc is the function called by a BooleanPoint reducer.
@ -409,7 +849,7 @@ func NewBooleanFuncIntegerReducer(fn BooleanReduceIntegerFunc) *BooleanFuncInteg
return &BooleanFuncIntegerReducer{fn: fn}
}
func (r *BooleanFuncIntegerReducer) Aggregate(p *BooleanPoint) {
func (r *BooleanFuncIntegerReducer) AggregateBoolean(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &IntegerPoint{}
@ -417,11 +857,39 @@ func (r *BooleanFuncIntegerReducer) Aggregate(p *BooleanPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *BooleanFuncIntegerReducer) Emit() *IntegerPoint {
return r.prev
func (r *BooleanFuncIntegerReducer) Emit() []IntegerPoint {
return []IntegerPoint{*r.prev}
}
// BooleanReduceIntegerSliceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceIntegerSliceFunc func(a []BooleanPoint) []IntegerPoint
type BooleanSliceFuncIntegerReducer struct {
points []BooleanPoint
fn BooleanReduceIntegerSliceFunc
}
func NewBooleanSliceFuncIntegerReducer(fn BooleanReduceIntegerSliceFunc) *BooleanSliceFuncIntegerReducer {
return &BooleanSliceFuncIntegerReducer{fn: fn}
}
func (r *BooleanSliceFuncIntegerReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p)
}
func (r *BooleanSliceFuncIntegerReducer) AggregateBooleanBulk(points []BooleanPoint) {
r.points = append(r.points, points...)
}
func (r *BooleanSliceFuncIntegerReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// BooleanReduceStringFunc is the function called by a BooleanPoint reducer.
@ -436,7 +904,7 @@ func NewBooleanFuncStringReducer(fn BooleanReduceStringFunc) *BooleanFuncStringR
return &BooleanFuncStringReducer{fn: fn}
}
func (r *BooleanFuncStringReducer) Aggregate(p *BooleanPoint) {
func (r *BooleanFuncStringReducer) AggregateBoolean(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &StringPoint{}
@ -444,11 +912,39 @@ func (r *BooleanFuncStringReducer) Aggregate(p *BooleanPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *BooleanFuncStringReducer) Emit() *StringPoint {
return r.prev
func (r *BooleanFuncStringReducer) Emit() []StringPoint {
return []StringPoint{*r.prev}
}
// BooleanReduceStringSliceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceStringSliceFunc func(a []BooleanPoint) []StringPoint
type BooleanSliceFuncStringReducer struct {
points []BooleanPoint
fn BooleanReduceStringSliceFunc
}
func NewBooleanSliceFuncStringReducer(fn BooleanReduceStringSliceFunc) *BooleanSliceFuncStringReducer {
return &BooleanSliceFuncStringReducer{fn: fn}
}
func (r *BooleanSliceFuncStringReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p)
}
func (r *BooleanSliceFuncStringReducer) AggregateBooleanBulk(points []BooleanPoint) {
r.points = append(r.points, points...)
}
func (r *BooleanSliceFuncStringReducer) Emit() []StringPoint {
return r.fn(r.points)
}
// BooleanReduceFunc is the function called by a BooleanPoint reducer.
@ -463,7 +959,7 @@ func NewBooleanFuncReducer(fn BooleanReduceFunc) *BooleanFuncReducer {
return &BooleanFuncReducer{fn: fn}
}
func (r *BooleanFuncReducer) Aggregate(p *BooleanPoint) {
func (r *BooleanFuncReducer) AggregateBoolean(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &BooleanPoint{}
@ -471,9 +967,37 @@ func (r *BooleanFuncReducer) Aggregate(p *BooleanPoint) {
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *BooleanFuncReducer) Emit() *BooleanPoint {
return r.prev
func (r *BooleanFuncReducer) Emit() []BooleanPoint {
return []BooleanPoint{*r.prev}
}
// BooleanReduceSliceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceSliceFunc func(a []BooleanPoint) []BooleanPoint
type BooleanSliceFuncReducer struct {
points []BooleanPoint
fn BooleanReduceSliceFunc
}
func NewBooleanSliceFuncReducer(fn BooleanReduceSliceFunc) *BooleanSliceFuncReducer {
return &BooleanSliceFuncReducer{fn: fn}
}
func (r *BooleanSliceFuncReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p)
}
func (r *BooleanSliceFuncReducer) AggregateBooleanBulk(points []BooleanPoint) {
r.points = append(r.points, points...)
}
func (r *BooleanSliceFuncReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}

View File

@ -4,12 +4,31 @@ package influxql
// {{$k.Name}}PointAggregator aggregates points to produce a single point.
type {{$k.Name}}PointAggregator interface {
Aggregate(p *{{.Name}}Point)
Aggregate{{$k.Name}}(p *{{$k.Name}}Point)
}
// {{$k.Name}}BulkPointAggregator aggregates multiple points at a time.
type {{$k.Name}}BulkPointAggregator interface {
Aggregate{{$k.Name}}Bulk(points []{{$k.Name}}Point)
}
// Aggregate{{$k.Name}}Points feeds a slice of {{$k.Name}}Point into an
// aggregator. If the aggregator is a {{$k.Name}}BulkPointAggregator, it will
// use the AggregateBulk method.
func Aggregate{{$k.Name}}Points(a {{$k.Name}}PointAggregator, points []{{$k.Name}}Point) {
switch a := a.(type) {
case {{$k.Name}}BulkPointAggregator:
a.Aggregate{{$k.Name}}Bulk(points)
default:
for _, p := range points {
a.Aggregate{{$k.Name}}(&p)
}
}
}
// {{$k.Name}}PointEmitter produces a single point from an aggregate.
type {{$k.Name}}PointEmitter interface {
Emit() *{{$k.Name}}Point
Emit() []{{$k.Name}}Point
}
{{range $v := $types}}
@ -26,18 +45,46 @@ func New{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer(fn {{$
return &{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer{fn: fn}
}
func (r *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate(p *{{$k.Name}}Point) {
func (r *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &{{$v.Name}}Point{}
}
r.prev.Time = t
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
r.prev.Aggregated++
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
func (r *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Emit() *{{$v.Name}}Point {
return r.prev
func (r *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Emit() []{{$v.Name}}Point {
return []{{$v.Name}}Point{*r.prev}
}
// {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}SliceFunc is the function called by a {{$k.Name}}Point reducer.
type {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}SliceFunc func(a []{{$k.Name}}Point) []{{$v.Name}}Point
type {{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer struct {
points []{{$k.Name}}Point
fn {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}SliceFunc
}
func New{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer(fn {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}SliceFunc) *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer {
return &{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer{fn: fn}
}
func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
r.points = append(r.points, *p)
}
func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate{{$k.Name}}Bulk(points []{{$k.Name}}Point) {
r.points = append(r.points, points...)
}
func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Emit() []{{$v.Name}}Point {
return r.fn(r.points)
}
{{end}}{{end}}{{end}}

View File

@ -9,7 +9,7 @@ func NewFloatMeanReducer() *FloatMeanReducer {
return &FloatMeanReducer{}
}
func (r *FloatMeanReducer) Aggregate(p *FloatPoint) {
func (r *FloatMeanReducer) AggregateFloat(p *FloatPoint) {
if p.Aggregated >= 2 {
r.sum += p.Value * float64(p.Aggregated)
r.count += p.Aggregated
@ -19,12 +19,12 @@ func (r *FloatMeanReducer) Aggregate(p *FloatPoint) {
}
}
func (r *FloatMeanReducer) Emit() *FloatPoint {
return &FloatPoint{
func (r *FloatMeanReducer) Emit() []FloatPoint {
return []FloatPoint{{
Time: ZeroTime,
Value: r.sum / float64(r.count),
Aggregated: r.count,
}
}}
}
type IntegerMeanReducer struct {
@ -36,7 +36,7 @@ func NewIntegerMeanReducer() *IntegerMeanReducer {
return &IntegerMeanReducer{}
}
func (r *IntegerMeanReducer) Aggregate(p *IntegerPoint) {
func (r *IntegerMeanReducer) AggregateInteger(p *IntegerPoint) {
if p.Aggregated >= 2 {
r.sum += p.Value * int64(p.Aggregated)
r.count += p.Aggregated
@ -46,10 +46,10 @@ func (r *IntegerMeanReducer) Aggregate(p *IntegerPoint) {
}
}
func (r *IntegerMeanReducer) Emit() *FloatPoint {
return &FloatPoint{
func (r *IntegerMeanReducer) Emit() []FloatPoint {
return []FloatPoint{{
Time: ZeroTime,
Value: float64(r.sum) / float64(r.count),
Aggregated: r.count,
}
}}
}

File diff suppressed because it is too large Load Diff

View File

@ -119,7 +119,7 @@ func new{{$k.Name}}MergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptio
inputs: inputs,
heap: &{{$k.name}}MergeHeap{
items: make([]*{{$k.name}}MergeHeapItem, 0, len(inputs)),
opt: opt,
opt: opt,
},
}
@ -416,13 +416,13 @@ func (itr *{{$k.name}}LimitIterator) Next() *{{$k.Name}}Point {
}
type {{$k.name}}FillIterator struct {
input *buf{{$k.Name}}Iterator
prev *{{$k.Name}}Point
startTime int64
endTime int64
auxFields []interface{}
done bool
opt IteratorOptions
input *buf{{$k.Name}}Iterator
prev *{{$k.Name}}Point
startTime int64
endTime int64
auxFields []interface{}
done bool
opt IteratorOptions
window struct {
name string
@ -454,11 +454,11 @@ func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr Expr, opt Iterat
}
itr := &{{$k.name}}FillIterator{
input: newBuf{{$k.Name}}Iterator(input),
startTime: startTime,
endTime: endTime,
auxFields: auxFields,
opt: opt,
input: newBuf{{$k.Name}}Iterator(input),
startTime: startTime,
endTime: endTime,
auxFields: auxFields,
opt: opt,
}
p := itr.input.peek()
@ -586,7 +586,7 @@ func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList,
func (itr *{{$k.name}}AuxIterator) Start() { go itr.stream() }
func (itr *{{$k.name}}AuxIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}AuxIterator) Next() *{{$k.Name}}Point { return <-itr.output }
func (itr *{{$k.name}}AuxIterator) Next() *{{$k.Name}}Point { return <-itr.output }
func (itr *{{$k.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
func (itr *{{$k.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
@ -648,7 +648,7 @@ type {{$k.name}}Reduce{{$v.Name}}Iterator struct {
input *buf{{$k.Name}}Iterator
create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
opt IteratorOptions
points []*{{$v.Name}}Point
points []{{$v.Name}}Point
}
// Close closes the iterator and all child iterators.
@ -665,7 +665,7 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() *{{.Name}}Point {
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
p := &itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
@ -680,7 +680,7 @@ type {{$k.name}}Reduce{{$v.Name}}Point struct {
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []*{{$v.Name}}Point {
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []{{$v.Name}}Point {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
@ -709,7 +709,7 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []*{{$v.Name}}Point {
}
m[id] = rp
}
rp.Aggregator.Aggregate(curr)
rp.Aggregator.Aggregate{{$k.Name}}(curr)
}
// Reverse sort points by name & tag.
@ -719,124 +719,26 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []*{{$v.Name}}Point {
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*{{$v.Name}}Point, len(m))
for i, k := range keys {
a := make([]{{$v.Name}}Point, 0, len(m))
for _, k := range keys {
rp := m[k]
p := rp.Emitter.Emit()
p.Name = rp.Name
p.Tags = rp.Tags
// Set the points time to the interval time if the reducer didn't provide one.
if p.Time == ZeroTime {
p.Time = startTime
points := rp.Emitter.Emit()
for i := len(points)-1; i >= 0; i-- {
points[i].Name = rp.Name
points[i].Tags = rp.Tags
// Set the points time to the interval time if the reducer didn't provide one.
if points[i].Time == ZeroTime {
points[i].Time = startTime
}
a = append(a, points[i])
}
a[i] = p
}
return a
}
{{end}}
// {{$k.name}}ReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type {{$k.name}}ReduceSliceIterator struct {
input *buf{{$k.Name}}Iterator
fn {{$k.name}}ReduceSliceFunc
opt IteratorOptions
points []{{$k.Name}}Point
}
// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}ReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{$k.name}}ReduceSliceIterator) Next() *{{$k.Name}}Point {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *{{$k.name}}ReduceSliceIterator) reduce() []{{$k.Name}}Point {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []{{$k.Name}}Point
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
} else if p.Nil {
continue
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := p.Name + "\x00" + tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]{{$k.Name}}Point)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]{{$k.Name}}Point, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// {{$k.name}}ReduceSliceFunc is the function called by a {{$k.Name}}Point slice reducer.
type {{$k.name}}ReduceSliceFunc func(a []{{$k.Name}}Point, opt *reduceOptions) []{{$k.Name}}Point
// {{$k.name}}ReduceIterator executes a function to modify an existing point for every
// {{$k.name}}TransformIterator executes a function to modify an existing point for every
// output of the input iterator.
type {{$k.name}}TransformIterator struct {
input {{$k.Name}}Iterator
@ -889,15 +791,15 @@ type {{$k.name}}BoolTransformFunc func(p *{{$k.Name}}Point) *BooleanPoint
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type {{$k.name}}DedupeIterator struct {
input {{$k.Name}}Iterator
m map[string]struct{} // lookup of points already sent
input {{$k.Name}}Iterator
m map[string]struct{} // lookup of points already sent
}
// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator.
func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
return &{{$k.name}}DedupeIterator{
input: input,
m: make(map[string]struct{}),
m: make(map[string]struct{}),
}
}

View File

@ -956,12 +956,6 @@ func decodeInterval(pb *internal.Interval) Interval {
}
}
// reduceOptions represents options for performing reductions on windows of points.
type reduceOptions struct {
startTime int64
endTime int64
}
type nilFloatIterator struct{}
func (*nilFloatIterator) Close() error { return nil }