diff --git a/influxql/call_iterator.go b/influxql/call_iterator.go index bc3168f879..0b951b7010 100644 --- a/influxql/call_iterator.go +++ b/influxql/call_iterator.go @@ -709,3 +709,102 @@ func newFloatDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) fl return output } } + +// integerReduceSliceFloatIterator executes a reducer on all points in a window and buffers the result. +// This iterator receives an integer iterator but produces a float iterator. +type integerReduceSliceFloatIterator struct { + input *bufIntegerIterator + fn integerReduceSliceFloatFunc + opt IteratorOptions + points []FloatPoint +} + +// Close closes the iterator and all child iterators. +func (itr *integerReduceSliceFloatIterator) Close() error { return itr.input.Close() } + +// Next returns the minimum value for the next available interval. +func (itr *integerReduceSliceFloatIterator) Next() *FloatPoint { + // Calculate next window if we have no more points. + if len(itr.points) == 0 { + itr.points = itr.reduce() + if len(itr.points) == 0 { + return nil + } + } + + // Pop next point off the stack. + p := itr.points[len(itr.points)-1] + itr.points = itr.points[:len(itr.points)-1] + return &p +} + +// reduce executes fn once for every point in the next window. +// The previous value for the dimension is passed to fn. +func (itr *integerReduceSliceFloatIterator) reduce() []FloatPoint { + // Calculate next window. + startTime, endTime := itr.opt.Window(itr.input.peekTime()) + + var reduceOptions = reduceOptions{ + startTime: startTime, + endTime: endTime, + } + + // Group points by name and tagset. + groups := make(map[string]struct { + name string + tags Tags + points []IntegerPoint + }) + for { + // Read next point. + p := itr.input.NextInWindow(startTime, endTime) + if p == nil { + break + } + tags := p.Tags.Subset(itr.opt.Dimensions) + + // Append point to dimension. + id := tags.ID() + g := groups[id] + g.name = p.Name + g.tags = tags + g.points = append(g.points, *p) + groups[id] = g + } + + // Reduce each set into a set of values. + results := make(map[string][]FloatPoint) + for key, g := range groups { + a := itr.fn(g.points, &reduceOptions) + if len(a) == 0 { + continue + } + + // Update name and tags for each returned point. + for i := range a { + a[i].Name = g.name + a[i].Tags = g.tags + } + results[key] = a + } + + // Reverse sort points by name & tag. + keys := make([]string, 0, len(results)) + for k := range results { + keys = append(keys, k) + } + sort.Sort(sort.Reverse(sort.StringSlice(keys))) + + // Reverse order points within each key. + a := make([]FloatPoint, 0, len(results)) + for _, k := range keys { + for i := len(results[k]) - 1; i >= 0; i-- { + a = append(a, results[k][i]) + } + } + + return a +} + +// integerReduceSliceFloatFunc is the function called by a IntegerPoint slice reducer that emits FloatPoint. +type integerReduceSliceFloatFunc func(a []IntegerPoint, opt *reduceOptions) []FloatPoint diff --git a/influxql/iterator.go b/influxql/iterator.go index 1694cfd784..046ad1196e 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -3,7 +3,6 @@ package influxql import ( "errors" "fmt" - "sort" "time" ) @@ -519,102 +518,3 @@ type nilFloatIterator struct{} func (*nilFloatIterator) Close() error { return nil } func (*nilFloatIterator) Next() *FloatPoint { return nil } - -// integerReduceSliceFloatIterator executes a reducer on all points in a window and buffers the result. -// This iterator receives an integer iterator but produces a float iterator. -type integerReduceSliceFloatIterator struct { - input *bufIntegerIterator - fn integerReduceSliceFloatFunc - opt IteratorOptions - points []FloatPoint -} - -// Close closes the iterator and all child iterators. -func (itr *integerReduceSliceFloatIterator) Close() error { return itr.input.Close() } - -// Next returns the minimum value for the next available interval. -func (itr *integerReduceSliceFloatIterator) Next() *FloatPoint { - // Calculate next window if we have no more points. - if len(itr.points) == 0 { - itr.points = itr.reduce() - if len(itr.points) == 0 { - return nil - } - } - - // Pop next point off the stack. - p := itr.points[len(itr.points)-1] - itr.points = itr.points[:len(itr.points)-1] - return &p -} - -// reduce executes fn once for every point in the next window. -// The previous value for the dimension is passed to fn. -func (itr *integerReduceSliceFloatIterator) reduce() []FloatPoint { - // Calculate next window. - startTime, endTime := itr.opt.Window(itr.input.peekTime()) - - var reduceOptions = reduceOptions{ - startTime: startTime, - endTime: endTime, - } - - // Group points by name and tagset. - groups := make(map[string]struct { - name string - tags Tags - points []IntegerPoint - }) - for { - // Read next point. - p := itr.input.NextInWindow(startTime, endTime) - if p == nil { - break - } - tags := p.Tags.Subset(itr.opt.Dimensions) - - // Append point to dimension. - id := tags.ID() - g := groups[id] - g.name = p.Name - g.tags = tags - g.points = append(g.points, *p) - groups[id] = g - } - - // Reduce each set into a set of values. - results := make(map[string][]FloatPoint) - for key, g := range groups { - a := itr.fn(g.points, &reduceOptions) - if len(a) == 0 { - continue - } - - // Update name and tags for each returned point. - for i := range a { - a[i].Name = g.name - a[i].Tags = g.tags - } - results[key] = a - } - - // Reverse sort points by name & tag. - keys := make([]string, 0, len(results)) - for k := range results { - keys = append(keys, k) - } - sort.Sort(sort.Reverse(sort.StringSlice(keys))) - - // Reverse order points within each key. - a := make([]FloatPoint, 0, len(results)) - for _, k := range keys { - for i := len(results[k]) - 1; i >= 0; i-- { - a = append(a, results[k][i]) - } - } - - return a -} - -// integerReduceSliceFloatFunc is the function called by a IntegerPoint slice reducer that emits FloatPoint. -type integerReduceSliceFloatFunc func(a []IntegerPoint, opt *reduceOptions) []FloatPoint