Move the integerReduceSliceFloatIterator to call_iterator.go

It matches more in functionality to the functions in call_iterator.go
than iterator.go. iterator.go mostly has base iterators and
call_iterator.go has iterators related to functional calls, which is
the only time integerReduceSliceFloatIterator is used.
pull/5196/head
Jonathan A. Sternberg 2016-01-22 13:58:15 -05:00 committed by Ben Johnson
parent fa79aae584
commit 03ad7a4e40
2 changed files with 99 additions and 100 deletions

View File

@ -709,3 +709,102 @@ func newFloatDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) fl
return output
}
}
// integerReduceSliceFloatIterator executes a reducer on all points in a window and buffers the result.
// This iterator receives an integer iterator but produces a float iterator.
type integerReduceSliceFloatIterator struct {
input *bufIntegerIterator
fn integerReduceSliceFloatFunc
opt IteratorOptions
points []FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *integerReduceSliceFloatIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerReduceSliceFloatIterator) Next() *FloatPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *integerReduceSliceFloatIterator) reduce() []FloatPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []IntegerPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]FloatPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]FloatPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// integerReduceSliceFloatFunc is the function called by a IntegerPoint slice reducer that emits FloatPoint.
type integerReduceSliceFloatFunc func(a []IntegerPoint, opt *reduceOptions) []FloatPoint

View File

@ -3,7 +3,6 @@ package influxql
import (
"errors"
"fmt"
"sort"
"time"
)
@ -519,102 +518,3 @@ type nilFloatIterator struct{}
func (*nilFloatIterator) Close() error { return nil }
func (*nilFloatIterator) Next() *FloatPoint { return nil }
// integerReduceSliceFloatIterator executes a reducer on all points in a window and buffers the result.
// This iterator receives an integer iterator but produces a float iterator.
type integerReduceSliceFloatIterator struct {
input *bufIntegerIterator
fn integerReduceSliceFloatFunc
opt IteratorOptions
points []FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *integerReduceSliceFloatIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerReduceSliceFloatIterator) Next() *FloatPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *integerReduceSliceFloatIterator) reduce() []FloatPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []IntegerPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]FloatPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]FloatPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// integerReduceSliceFloatFunc is the function called by a IntegerPoint slice reducer that emits FloatPoint.
type integerReduceSliceFloatFunc func(a []IntegerPoint, opt *reduceOptions) []FloatPoint