influxdb/influxql/iterator.gen.go

2676 lines
67 KiB
Go
Raw Normal View History

2015-11-04 21:06:06 +00:00
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
package influxql
import (
2015-12-24 04:42:10 +00:00
"container/heap"
2015-11-04 21:06:06 +00:00
"fmt"
2015-11-04 21:06:06 +00:00
"sort"
"sync"
)
// FloatIterator represents a stream of float points.
type FloatIterator interface {
Iterator
Next() *FloatPoint
}
// newFloatIterators converts a slice of Iterator to a slice of FloatIterator.
// Panic if any iterator in itrs is not a FloatIterator.
func newFloatIterators(itrs []Iterator) []FloatIterator {
a := make([]FloatIterator, len(itrs))
for i, itr := range itrs {
a[i] = itr.(FloatIterator)
}
return a
}
// bufFloatIterator represents a buffered FloatIterator.
type bufFloatIterator struct {
itr FloatIterator
buf *FloatPoint
}
// newBufFloatIterator returns a buffered FloatIterator.
func newBufFloatIterator(itr FloatIterator) *bufFloatIterator {
return &bufFloatIterator{
itr: itr,
}
}
// Close closes the underlying iterator.
func (itr *bufFloatIterator) Close() error { return itr.itr.Close() }
2015-11-04 21:06:06 +00:00
// peek returns the next point without removing it from the iterator.
func (itr *bufFloatIterator) peek() *FloatPoint {
p := itr.Next()
itr.unread(p)
return p
}
// peekTime returns the time of the next point.
2015-11-04 21:06:06 +00:00
// Returns zero time if no more points available.
2015-11-04 21:06:06 +00:00
func (itr *bufFloatIterator) peekTime() int64 {
p := itr.peek()
if p == nil {
2015-11-04 21:06:06 +00:00
return ZeroTime
}
2015-11-04 21:06:06 +00:00
return p.Time
2015-11-04 21:06:06 +00:00
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufFloatIterator) Next() *FloatPoint {
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
return buf
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *bufFloatIterator) NextInWindow(startTime, endTime int64) *FloatPoint {
v := itr.Next()
if v == nil {
return nil
} else if v.Time < startTime || v.Time >= endTime {
itr.unread(v)
return nil
}
return v
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *bufFloatIterator) unread(v *FloatPoint) { itr.buf = v }
2015-11-04 21:06:06 +00:00
// floatMergeIterator represents an iterator that combines multiple float iterators.
type floatMergeIterator struct {
inputs []FloatIterator
heap *floatMergeHeap
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// Current iterator and window.
curr *floatMergeHeapItem
window struct {
startTime int64
endTime int64
2015-11-04 21:06:06 +00:00
}
}
2015-11-04 21:06:06 +00:00
// newFloatMergeIterator returns a new instance of floatMergeIterator.
func newFloatMergeIterator(inputs []FloatIterator, opt IteratorOptions) *floatMergeIterator {
itr := &floatMergeIterator{
inputs: inputs,
heap: &floatMergeHeap{
items: make([]*floatMergeHeapItem, 0, len(inputs)),
opt: opt,
},
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBufFloatIterator(input)
if bufInput.peek() == nil {
continue
}
// Append to the heap.
itr.heap.items = append(itr.heap.items, &floatMergeHeapItem{itr: bufInput})
}
heap.Init(itr.heap)
return itr
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Close closes the underlying iterators.
func (itr *floatMergeIterator) Close() error {
for _, input := range itr.inputs {
2016-01-22 23:46:56 +00:00
input.Close()
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
return nil
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Next returns the next point from the iterator.
func (itr *floatMergeIterator) Next() *FloatPoint {
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil
}
itr.curr = heap.Pop(itr.heap).(*floatMergeHeapItem)
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
// Read the next point from the current iterator.
p := itr.curr.itr.Next()
// If there are no more points then remove iterator from heap and find next.
2015-11-04 21:06:06 +00:00
if p == nil {
2015-11-04 21:06:06 +00:00
itr.curr = nil
2015-11-04 21:06:06 +00:00
continue
}
2015-11-04 21:06:06 +00:00
// If it's outside our window then push iterator back on the heap and find new iterator.
if (itr.heap.opt.Ascending && p.Time >= itr.window.endTime) || (!itr.heap.opt.Ascending && p.Time < itr.window.startTime) {
itr.curr.itr.unread(p)
heap.Push(itr.heap, itr.curr)
itr.curr = nil
continue
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
return p
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// floatMergeHeap represents a heap of floatMergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type floatMergeHeap struct {
opt IteratorOptions
items []*floatMergeHeapItem
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
func (h floatMergeHeap) Len() int { return len(h.items) }
func (h floatMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h floatMergeHeap) Less(i, j int) bool {
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
if h.opt.Ascending {
2016-01-22 23:46:56 +00:00
return xt < yt
2015-11-04 21:06:06 +00:00
}
2016-01-22 23:46:56 +00:00
return xt > yt
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
func (h *floatMergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*floatMergeHeapItem))
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
func (h *floatMergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
type floatMergeHeapItem struct {
itr *bufFloatIterator
}
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// floatSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type floatSortedMergeIterator struct {
inputs []FloatIterator
opt IteratorOptions
heap floatSortedMergeHeap
2015-11-04 21:06:06 +00:00
}
2015-12-24 04:42:10 +00:00
// newFloatSortedMergeIterator returns an instance of floatSortedMergeIterator.
func newFloatSortedMergeIterator(inputs []FloatIterator, opt IteratorOptions) Iterator {
itr := &floatSortedMergeIterator{
2015-11-04 21:06:06 +00:00
inputs: inputs,
heap: make(floatSortedMergeHeap, 0, len(inputs)),
2015-12-24 04:42:10 +00:00
opt: opt,
}
// Initialize heap.
for _, input := range inputs {
// Read next point.
p := input.Next()
if p == nil {
continue
}
// Append to the heap.
2015-11-04 21:06:06 +00:00
itr.heap = append(itr.heap, &floatSortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
2015-12-24 04:42:10 +00:00
}
heap.Init(&itr.heap)
return itr
}
// Close closes the underlying iterators.
2015-11-04 21:06:06 +00:00
func (itr *floatSortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
2015-12-24 04:42:10 +00:00
// Next returns the next points from the iterator.
func (itr *floatSortedMergeIterator) Next() *FloatPoint { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *floatSortedMergeIterator) pop() *FloatPoint {
if len(itr.heap) == 0 {
return nil
}
// Read the next item from the heap.
2015-11-04 21:06:06 +00:00
item := heap.Pop(&itr.heap).(*floatSortedMergeHeapItem)
2015-12-24 04:42:10 +00:00
// Copy the point for return.
p := item.point.Clone()
// Read the next item from the cursor. Push back to heap if one exists.
if item.point = item.itr.Next(); item.point != nil {
heap.Push(&itr.heap, item)
}
return p
}
2015-11-04 21:06:06 +00:00
// floatSortedMergeHeap represents a heap of floatSortedMergeHeapItems.
type floatSortedMergeHeap []*floatSortedMergeHeapItem
2015-12-24 04:42:10 +00:00
2015-11-04 21:06:06 +00:00
func (h floatSortedMergeHeap) Len() int { return len(h) }
func (h floatSortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h floatSortedMergeHeap) Less(i, j int) bool {
2015-12-24 04:42:10 +00:00
x, y := h[i].point, h[j].point
if h[i].ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() < y.Tags.ID()
}
return x.Time < y.Time
}
if x.Name != y.Name {
return x.Name > y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() > y.Tags.ID()
}
return x.Time > y.Time
}
2015-11-04 21:06:06 +00:00
func (h *floatSortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*floatSortedMergeHeapItem))
2015-12-24 04:42:10 +00:00
}
2015-11-04 21:06:06 +00:00
func (h *floatSortedMergeHeap) Pop() interface{} {
2015-12-24 04:42:10 +00:00
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
2015-11-04 21:06:06 +00:00
type floatSortedMergeHeapItem struct {
2015-12-24 04:42:10 +00:00
point *FloatPoint
itr FloatIterator
ascending bool
}
2015-12-24 18:46:31 +00:00
// floatLimitIterator represents an iterator that limits points per group.
type floatLimitIterator struct {
input FloatIterator
opt IteratorOptions
n int
prev struct {
name string
tags Tags
}
}
// newFloatLimitIterator returns a new instance of floatLimitIterator.
func newFloatLimitIterator(input FloatIterator, opt IteratorOptions) *floatLimitIterator {
return &floatLimitIterator{
input: input,
opt: opt,
}
}
// Close closes the underlying iterators.
func (itr *floatLimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *floatLimitIterator) Next() *FloatPoint {
for {
p := itr.input.Next()
if p == nil {
return nil
}
// Reset window and counter if a new window is encountered.
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
itr.prev.name = p.Name
itr.prev.tags = p.Tags
itr.n = 0
}
// Increment counter.
itr.n++
// Read next point if not beyond the offset.
if itr.n <= itr.opt.Offset {
continue
}
// Read next point if we're beyond the limit.
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
continue
}
return p
}
}
2015-11-04 21:06:06 +00:00
// floatAuxIterator represents a float implementation of AuxIterator.
type floatAuxIterator struct {
input *bufFloatIterator
output chan *FloatPoint
fields auxIteratorFields
}
func newFloatAuxIterator(input FloatIterator, opt IteratorOptions) *floatAuxIterator {
itr := &floatAuxIterator{
input: newBufFloatIterator(input),
output: make(chan *FloatPoint, 1),
fields: newAuxIteratorFields(opt),
}
// Initialize auxilary fields.
if p := itr.input.Next(); p != nil {
itr.output <- p
itr.fields.init(p)
}
go itr.stream()
return itr
}
func (itr *floatAuxIterator) Close() error { return itr.input.Close() }
func (itr *floatAuxIterator) Next() *FloatPoint { return <-itr.output }
func (itr *floatAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
2015-11-04 21:06:06 +00:00
func (itr *floatAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
expr := opt.Expr
if expr == nil {
panic("unable to create an iterator with no expression from an aux iterator")
}
switch expr := expr.(type) {
case *VarRef:
return itr.fields.iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
}
func (itr *floatAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
panic("not implemented")
}
2015-11-04 21:06:06 +00:00
func (itr *floatAuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
break
}
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
}
close(itr.output)
itr.fields.close()
}
// floatChanIterator represents a new instance of floatChanIterator.
type floatChanIterator struct {
c chan *FloatPoint
once sync.Once
}
func (itr *floatChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
return nil
}
func (itr *floatChanIterator) Next() *FloatPoint { return <-itr.c }
// floatReduceIterator executes a reducer for every interval and buffers the result.
type floatReduceIterator struct {
input *bufFloatIterator
fn floatReduceFunc
opt IteratorOptions
points []*FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *floatReduceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *floatReduceIterator) Next() *FloatPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *floatReduceIterator) reduce() []*FloatPoint {
// Calculate next window.
2015-11-04 21:06:06 +00:00
startTime, endTime := itr.opt.Window(itr.input.peekTime())
2015-11-04 21:06:06 +00:00
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Create points by tags.
m := make(map[string]*FloatPoint)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
// Pass previous and current points to reducer.
prev := m[tags.ID()]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
}
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &FloatPoint{Name: curr.Name, Tags: tags}
m[tags.ID()] = prev
}
prev.Time = t
prev.Value = v
prev.Aux = aux
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*FloatPoint, len(m))
for i, k := range keys {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
2015-11-04 21:06:06 +00:00
return a
}
// floatReduceFunc is the function called by a FloatPoint reducer.
type floatReduceFunc func(prev, curr *FloatPoint, opt *reduceOptions) (t int64, v float64, aux []interface{})
// floatReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type floatReduceSliceIterator struct {
input *bufFloatIterator
fn floatReduceSliceFunc
opt IteratorOptions
points []FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *floatReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *floatReduceSliceIterator) Next() *FloatPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *floatReduceSliceIterator) reduce() []FloatPoint {
// Calculate next window.
2015-11-04 21:06:06 +00:00
startTime, endTime := itr.opt.Window(itr.input.peekTime())
2015-11-04 21:06:06 +00:00
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []FloatPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]FloatPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]FloatPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// floatReduceSliceFunc is the function called by a FloatPoint slice reducer.
type floatReduceSliceFunc func(a []FloatPoint, opt *reduceOptions) []FloatPoint
2015-11-04 21:06:06 +00:00
// floatReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type floatTransformIterator struct {
input FloatIterator
fn floatTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *floatTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *floatTransformIterator) Next() *FloatPoint {
p := itr.input.Next()
if p != nil {
p = itr.fn(p)
}
return p
}
// floatTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type floatTransformFunc func(p *FloatPoint) *FloatPoint
// floatReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type floatBoolTransformIterator struct {
input FloatIterator
fn floatBoolTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *floatBoolTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *floatBoolTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
return itr.fn(p)
}
return nil
}
// floatBoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type floatBoolTransformFunc func(p *FloatPoint) *BooleanPoint
2016-01-18 22:48:49 +00:00
// IntegerIterator represents a stream of integer points.
type IntegerIterator interface {
Iterator
Next() *IntegerPoint
}
// newIntegerIterators converts a slice of Iterator to a slice of IntegerIterator.
// Panic if any iterator in itrs is not a IntegerIterator.
func newIntegerIterators(itrs []Iterator) []IntegerIterator {
a := make([]IntegerIterator, len(itrs))
for i, itr := range itrs {
a[i] = itr.(IntegerIterator)
}
return a
}
// bufIntegerIterator represents a buffered IntegerIterator.
type bufIntegerIterator struct {
itr IntegerIterator
buf *IntegerPoint
}
// newBufIntegerIterator returns a buffered IntegerIterator.
func newBufIntegerIterator(itr IntegerIterator) *bufIntegerIterator {
return &bufIntegerIterator{
itr: itr,
}
}
// Close closes the underlying iterator.
func (itr *bufIntegerIterator) Close() error { return itr.itr.Close() }
// peek returns the next point without removing it from the iterator.
func (itr *bufIntegerIterator) peek() *IntegerPoint {
p := itr.Next()
itr.unread(p)
return p
}
// peekTime returns the time of the next point.
// Returns zero time if no more points available.
func (itr *bufIntegerIterator) peekTime() int64 {
p := itr.peek()
if p == nil {
return ZeroTime
}
return p.Time
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufIntegerIterator) Next() *IntegerPoint {
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
return buf
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *bufIntegerIterator) NextInWindow(startTime, endTime int64) *IntegerPoint {
v := itr.Next()
if v == nil {
return nil
} else if v.Time < startTime || v.Time >= endTime {
itr.unread(v)
return nil
}
return v
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *bufIntegerIterator) unread(v *IntegerPoint) { itr.buf = v }
// integerMergeIterator represents an iterator that combines multiple integer iterators.
type integerMergeIterator struct {
inputs []IntegerIterator
heap *integerMergeHeap
// Current iterator and window.
curr *integerMergeHeapItem
window struct {
startTime int64
endTime int64
}
}
// newIntegerMergeIterator returns a new instance of integerMergeIterator.
func newIntegerMergeIterator(inputs []IntegerIterator, opt IteratorOptions) *integerMergeIterator {
itr := &integerMergeIterator{
inputs: inputs,
heap: &integerMergeHeap{
items: make([]*integerMergeHeapItem, 0, len(inputs)),
opt: opt,
},
}
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBufIntegerIterator(input)
if bufInput.peek() == nil {
continue
}
// Append to the heap.
itr.heap.items = append(itr.heap.items, &integerMergeHeapItem{itr: bufInput})
}
heap.Init(itr.heap)
return itr
}
// Close closes the underlying iterators.
func (itr *integerMergeIterator) Close() error {
for _, input := range itr.inputs {
2016-01-22 23:46:56 +00:00
input.Close()
2016-01-18 22:48:49 +00:00
}
return nil
}
// Next returns the next point from the iterator.
func (itr *integerMergeIterator) Next() *IntegerPoint {
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil
}
itr.curr = heap.Pop(itr.heap).(*integerMergeHeapItem)
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
// Read the next point from the current iterator.
p := itr.curr.itr.Next()
// If there are no more points then remove iterator from heap and find next.
if p == nil {
itr.curr = nil
continue
}
// If it's outside our window then push iterator back on the heap and find new iterator.
if (itr.heap.opt.Ascending && p.Time >= itr.window.endTime) || (!itr.heap.opt.Ascending && p.Time < itr.window.startTime) {
itr.curr.itr.unread(p)
heap.Push(itr.heap, itr.curr)
itr.curr = nil
continue
}
return p
}
}
// integerMergeHeap represents a heap of integerMergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type integerMergeHeap struct {
opt IteratorOptions
items []*integerMergeHeapItem
}
func (h integerMergeHeap) Len() int { return len(h.items) }
func (h integerMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h integerMergeHeap) Less(i, j int) bool {
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
if h.opt.Ascending {
2016-01-22 23:46:56 +00:00
return xt < yt
2016-01-18 22:48:49 +00:00
}
2016-01-22 23:46:56 +00:00
return xt > yt
2016-01-18 22:48:49 +00:00
}
func (h *integerMergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*integerMergeHeapItem))
}
func (h *integerMergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
}
type integerMergeHeapItem struct {
itr *bufIntegerIterator
}
// integerSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type integerSortedMergeIterator struct {
inputs []IntegerIterator
opt IteratorOptions
heap integerSortedMergeHeap
}
// newIntegerSortedMergeIterator returns an instance of integerSortedMergeIterator.
func newIntegerSortedMergeIterator(inputs []IntegerIterator, opt IteratorOptions) Iterator {
itr := &integerSortedMergeIterator{
inputs: inputs,
heap: make(integerSortedMergeHeap, 0, len(inputs)),
opt: opt,
}
// Initialize heap.
for _, input := range inputs {
// Read next point.
p := input.Next()
if p == nil {
continue
}
// Append to the heap.
itr.heap = append(itr.heap, &integerSortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
}
heap.Init(&itr.heap)
return itr
}
// Close closes the underlying iterators.
func (itr *integerSortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
// Next returns the next points from the iterator.
func (itr *integerSortedMergeIterator) Next() *IntegerPoint { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *integerSortedMergeIterator) pop() *IntegerPoint {
if len(itr.heap) == 0 {
return nil
}
// Read the next item from the heap.
item := heap.Pop(&itr.heap).(*integerSortedMergeHeapItem)
// Copy the point for return.
p := item.point.Clone()
// Read the next item from the cursor. Push back to heap if one exists.
if item.point = item.itr.Next(); item.point != nil {
heap.Push(&itr.heap, item)
}
return p
}
// integerSortedMergeHeap represents a heap of integerSortedMergeHeapItems.
type integerSortedMergeHeap []*integerSortedMergeHeapItem
func (h integerSortedMergeHeap) Len() int { return len(h) }
func (h integerSortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h integerSortedMergeHeap) Less(i, j int) bool {
x, y := h[i].point, h[j].point
if h[i].ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() < y.Tags.ID()
}
return x.Time < y.Time
}
if x.Name != y.Name {
return x.Name > y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() > y.Tags.ID()
}
return x.Time > y.Time
}
func (h *integerSortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*integerSortedMergeHeapItem))
}
func (h *integerSortedMergeHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
type integerSortedMergeHeapItem struct {
point *IntegerPoint
itr IntegerIterator
ascending bool
}
// integerLimitIterator represents an iterator that limits points per group.
type integerLimitIterator struct {
input IntegerIterator
opt IteratorOptions
n int
prev struct {
name string
tags Tags
}
}
// newIntegerLimitIterator returns a new instance of integerLimitIterator.
func newIntegerLimitIterator(input IntegerIterator, opt IteratorOptions) *integerLimitIterator {
return &integerLimitIterator{
input: input,
opt: opt,
}
}
// Close closes the underlying iterators.
func (itr *integerLimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *integerLimitIterator) Next() *IntegerPoint {
for {
p := itr.input.Next()
if p == nil {
return nil
}
// Reset window and counter if a new window is encountered.
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
itr.prev.name = p.Name
itr.prev.tags = p.Tags
itr.n = 0
}
// Increment counter.
itr.n++
// Read next point if not beyond the offset.
if itr.n <= itr.opt.Offset {
continue
}
// Read next point if we're beyond the limit.
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
continue
}
return p
}
}
// integerAuxIterator represents a integer implementation of AuxIterator.
type integerAuxIterator struct {
input *bufIntegerIterator
output chan *IntegerPoint
fields auxIteratorFields
}
func newIntegerAuxIterator(input IntegerIterator, opt IteratorOptions) *integerAuxIterator {
itr := &integerAuxIterator{
input: newBufIntegerIterator(input),
output: make(chan *IntegerPoint, 1),
fields: newAuxIteratorFields(opt),
}
// Initialize auxilary fields.
if p := itr.input.Next(); p != nil {
itr.output <- p
itr.fields.init(p)
}
go itr.stream()
return itr
}
func (itr *integerAuxIterator) Close() error { return itr.input.Close() }
func (itr *integerAuxIterator) Next() *IntegerPoint { return <-itr.output }
func (itr *integerAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
func (itr *integerAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
expr := opt.Expr
if expr == nil {
panic("unable to create an iterator with no expression from an aux iterator")
}
switch expr := expr.(type) {
case *VarRef:
return itr.fields.iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
}
func (itr *integerAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
panic("not implemented")
}
func (itr *integerAuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
break
}
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
}
close(itr.output)
itr.fields.close()
}
// integerChanIterator represents a new instance of integerChanIterator.
type integerChanIterator struct {
c chan *IntegerPoint
once sync.Once
}
func (itr *integerChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
return nil
}
func (itr *integerChanIterator) Next() *IntegerPoint { return <-itr.c }
// integerReduceIterator executes a reducer for every interval and buffers the result.
type integerReduceIterator struct {
input *bufIntegerIterator
fn integerReduceFunc
opt IteratorOptions
points []*IntegerPoint
}
// Close closes the iterator and all child iterators.
func (itr *integerReduceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerReduceIterator) Next() *IntegerPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *integerReduceIterator) reduce() []*IntegerPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Create points by tags.
m := make(map[string]*IntegerPoint)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
// Pass previous and current points to reducer.
prev := m[tags.ID()]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
}
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &IntegerPoint{Name: curr.Name, Tags: tags}
m[tags.ID()] = prev
}
prev.Time = t
prev.Value = v
prev.Aux = aux
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*IntegerPoint, len(m))
for i, k := range keys {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
2016-01-18 22:48:49 +00:00
return a
}
// integerReduceFunc is the function called by a IntegerPoint reducer.
type integerReduceFunc func(prev, curr *IntegerPoint, opt *reduceOptions) (t int64, v int64, aux []interface{})
// integerReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type integerReduceSliceIterator struct {
input *bufIntegerIterator
fn integerReduceSliceFunc
opt IteratorOptions
points []IntegerPoint
}
// Close closes the iterator and all child iterators.
func (itr *integerReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerReduceSliceIterator) Next() *IntegerPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *integerReduceSliceIterator) reduce() []IntegerPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []IntegerPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]IntegerPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]IntegerPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// integerReduceSliceFunc is the function called by a IntegerPoint slice reducer.
type integerReduceSliceFunc func(a []IntegerPoint, opt *reduceOptions) []IntegerPoint
// integerReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type integerTransformIterator struct {
input IntegerIterator
fn integerTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *integerTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerTransformIterator) Next() *IntegerPoint {
p := itr.input.Next()
if p != nil {
p = itr.fn(p)
}
return p
}
// integerTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type integerTransformFunc func(p *IntegerPoint) *IntegerPoint
// integerReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type integerBoolTransformIterator struct {
input IntegerIterator
fn integerBoolTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *integerBoolTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerBoolTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
return itr.fn(p)
}
return nil
}
// integerBoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type integerBoolTransformFunc func(p *IntegerPoint) *BooleanPoint
2015-11-04 21:06:06 +00:00
// StringIterator represents a stream of string points.
type StringIterator interface {
Iterator
Next() *StringPoint
}
// newStringIterators converts a slice of Iterator to a slice of StringIterator.
// Panic if any iterator in itrs is not a StringIterator.
func newStringIterators(itrs []Iterator) []StringIterator {
a := make([]StringIterator, len(itrs))
for i, itr := range itrs {
a[i] = itr.(StringIterator)
}
return a
}
// bufStringIterator represents a buffered StringIterator.
type bufStringIterator struct {
itr StringIterator
buf *StringPoint
}
// newBufStringIterator returns a buffered StringIterator.
func newBufStringIterator(itr StringIterator) *bufStringIterator {
return &bufStringIterator{
itr: itr,
}
}
// Close closes the underlying iterator.
func (itr *bufStringIterator) Close() error { return itr.itr.Close() }
2015-11-04 21:06:06 +00:00
// peek returns the next point without removing it from the iterator.
func (itr *bufStringIterator) peek() *StringPoint {
p := itr.Next()
itr.unread(p)
return p
}
// peekTime returns the time of the next point.
2015-11-04 21:06:06 +00:00
// Returns zero time if no more points available.
2015-11-04 21:06:06 +00:00
func (itr *bufStringIterator) peekTime() int64 {
p := itr.peek()
if p == nil {
2015-11-04 21:06:06 +00:00
return ZeroTime
}
2015-11-04 21:06:06 +00:00
return p.Time
2015-11-04 21:06:06 +00:00
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufStringIterator) Next() *StringPoint {
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
return buf
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *bufStringIterator) NextInWindow(startTime, endTime int64) *StringPoint {
v := itr.Next()
if v == nil {
return nil
} else if v.Time < startTime || v.Time >= endTime {
itr.unread(v)
return nil
}
return v
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *bufStringIterator) unread(v *StringPoint) { itr.buf = v }
2015-11-04 21:06:06 +00:00
// stringMergeIterator represents an iterator that combines multiple string iterators.
type stringMergeIterator struct {
inputs []StringIterator
heap *stringMergeHeap
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// Current iterator and window.
curr *stringMergeHeapItem
window struct {
startTime int64
endTime int64
2015-11-04 21:06:06 +00:00
}
}
2015-11-04 21:06:06 +00:00
// newStringMergeIterator returns a new instance of stringMergeIterator.
func newStringMergeIterator(inputs []StringIterator, opt IteratorOptions) *stringMergeIterator {
itr := &stringMergeIterator{
inputs: inputs,
heap: &stringMergeHeap{
items: make([]*stringMergeHeapItem, 0, len(inputs)),
opt: opt,
},
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBufStringIterator(input)
if bufInput.peek() == nil {
continue
}
// Append to the heap.
itr.heap.items = append(itr.heap.items, &stringMergeHeapItem{itr: bufInput})
}
heap.Init(itr.heap)
return itr
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Close closes the underlying iterators.
func (itr *stringMergeIterator) Close() error {
for _, input := range itr.inputs {
2016-01-22 23:46:56 +00:00
input.Close()
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
return nil
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Next returns the next point from the iterator.
func (itr *stringMergeIterator) Next() *StringPoint {
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil
}
itr.curr = heap.Pop(itr.heap).(*stringMergeHeapItem)
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
// Read the next point from the current iterator.
p := itr.curr.itr.Next()
// If there are no more points then remove iterator from heap and find next.
2015-11-04 21:06:06 +00:00
if p == nil {
2015-11-04 21:06:06 +00:00
itr.curr = nil
2015-11-04 21:06:06 +00:00
continue
}
2015-11-04 21:06:06 +00:00
// If it's outside our window then push iterator back on the heap and find new iterator.
if (itr.heap.opt.Ascending && p.Time >= itr.window.endTime) || (!itr.heap.opt.Ascending && p.Time < itr.window.startTime) {
itr.curr.itr.unread(p)
heap.Push(itr.heap, itr.curr)
itr.curr = nil
continue
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
return p
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// stringMergeHeap represents a heap of stringMergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type stringMergeHeap struct {
opt IteratorOptions
items []*stringMergeHeapItem
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
func (h stringMergeHeap) Len() int { return len(h.items) }
func (h stringMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h stringMergeHeap) Less(i, j int) bool {
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
if h.opt.Ascending {
2016-01-22 23:46:56 +00:00
return xt < yt
2015-11-04 21:06:06 +00:00
}
2016-01-22 23:46:56 +00:00
return xt > yt
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
func (h *stringMergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*stringMergeHeapItem))
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
func (h *stringMergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
type stringMergeHeapItem struct {
itr *bufStringIterator
}
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// stringSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type stringSortedMergeIterator struct {
inputs []StringIterator
opt IteratorOptions
heap stringSortedMergeHeap
2015-11-04 21:06:06 +00:00
}
2015-12-24 04:42:10 +00:00
// newStringSortedMergeIterator returns an instance of stringSortedMergeIterator.
func newStringSortedMergeIterator(inputs []StringIterator, opt IteratorOptions) Iterator {
itr := &stringSortedMergeIterator{
2015-11-04 21:06:06 +00:00
inputs: inputs,
heap: make(stringSortedMergeHeap, 0, len(inputs)),
2015-12-24 04:42:10 +00:00
opt: opt,
}
// Initialize heap.
for _, input := range inputs {
// Read next point.
p := input.Next()
if p == nil {
continue
}
// Append to the heap.
2015-11-04 21:06:06 +00:00
itr.heap = append(itr.heap, &stringSortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
2015-12-24 04:42:10 +00:00
}
heap.Init(&itr.heap)
return itr
}
// Close closes the underlying iterators.
2015-11-04 21:06:06 +00:00
func (itr *stringSortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
2015-12-24 04:42:10 +00:00
// Next returns the next points from the iterator.
func (itr *stringSortedMergeIterator) Next() *StringPoint { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *stringSortedMergeIterator) pop() *StringPoint {
if len(itr.heap) == 0 {
return nil
}
// Read the next item from the heap.
2015-11-04 21:06:06 +00:00
item := heap.Pop(&itr.heap).(*stringSortedMergeHeapItem)
2015-12-24 04:42:10 +00:00
// Copy the point for return.
p := item.point.Clone()
// Read the next item from the cursor. Push back to heap if one exists.
if item.point = item.itr.Next(); item.point != nil {
heap.Push(&itr.heap, item)
}
return p
}
2015-11-04 21:06:06 +00:00
// stringSortedMergeHeap represents a heap of stringSortedMergeHeapItems.
type stringSortedMergeHeap []*stringSortedMergeHeapItem
2015-12-24 04:42:10 +00:00
2015-11-04 21:06:06 +00:00
func (h stringSortedMergeHeap) Len() int { return len(h) }
func (h stringSortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h stringSortedMergeHeap) Less(i, j int) bool {
2015-12-24 04:42:10 +00:00
x, y := h[i].point, h[j].point
if h[i].ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() < y.Tags.ID()
}
return x.Time < y.Time
}
if x.Name != y.Name {
return x.Name > y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() > y.Tags.ID()
}
return x.Time > y.Time
}
2015-11-04 21:06:06 +00:00
func (h *stringSortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*stringSortedMergeHeapItem))
2015-12-24 04:42:10 +00:00
}
2015-11-04 21:06:06 +00:00
func (h *stringSortedMergeHeap) Pop() interface{} {
2015-12-24 04:42:10 +00:00
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
2015-11-04 21:06:06 +00:00
type stringSortedMergeHeapItem struct {
2015-12-24 04:42:10 +00:00
point *StringPoint
itr StringIterator
ascending bool
}
2015-12-24 18:46:31 +00:00
// stringLimitIterator represents an iterator that limits points per group.
type stringLimitIterator struct {
input StringIterator
opt IteratorOptions
n int
prev struct {
name string
tags Tags
}
}
// newStringLimitIterator returns a new instance of stringLimitIterator.
func newStringLimitIterator(input StringIterator, opt IteratorOptions) *stringLimitIterator {
return &stringLimitIterator{
input: input,
opt: opt,
}
}
// Close closes the underlying iterators.
func (itr *stringLimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *stringLimitIterator) Next() *StringPoint {
for {
p := itr.input.Next()
if p == nil {
return nil
}
// Reset window and counter if a new window is encountered.
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
itr.prev.name = p.Name
itr.prev.tags = p.Tags
itr.n = 0
}
// Increment counter.
itr.n++
// Read next point if not beyond the offset.
if itr.n <= itr.opt.Offset {
continue
}
// Read next point if we're beyond the limit.
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
continue
}
return p
}
}
2015-11-04 21:06:06 +00:00
// stringAuxIterator represents a string implementation of AuxIterator.
type stringAuxIterator struct {
input *bufStringIterator
output chan *StringPoint
fields auxIteratorFields
}
func newStringAuxIterator(input StringIterator, opt IteratorOptions) *stringAuxIterator {
itr := &stringAuxIterator{
input: newBufStringIterator(input),
output: make(chan *StringPoint, 1),
fields: newAuxIteratorFields(opt),
}
// Initialize auxilary fields.
if p := itr.input.Next(); p != nil {
itr.output <- p
itr.fields.init(p)
}
go itr.stream()
return itr
}
func (itr *stringAuxIterator) Close() error { return itr.input.Close() }
func (itr *stringAuxIterator) Next() *StringPoint { return <-itr.output }
func (itr *stringAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
2015-11-04 21:06:06 +00:00
func (itr *stringAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
expr := opt.Expr
if expr == nil {
panic("unable to create an iterator with no expression from an aux iterator")
}
switch expr := expr.(type) {
case *VarRef:
return itr.fields.iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
}
func (itr *stringAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
panic("not implemented")
}
2015-11-04 21:06:06 +00:00
func (itr *stringAuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
break
}
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
}
close(itr.output)
itr.fields.close()
}
// stringChanIterator represents a new instance of stringChanIterator.
type stringChanIterator struct {
c chan *StringPoint
once sync.Once
}
func (itr *stringChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
return nil
}
func (itr *stringChanIterator) Next() *StringPoint { return <-itr.c }
// stringReduceIterator executes a reducer for every interval and buffers the result.
type stringReduceIterator struct {
input *bufStringIterator
fn stringReduceFunc
opt IteratorOptions
points []*StringPoint
}
// Close closes the iterator and all child iterators.
func (itr *stringReduceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *stringReduceIterator) Next() *StringPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *stringReduceIterator) reduce() []*StringPoint {
// Calculate next window.
2015-11-04 21:06:06 +00:00
startTime, endTime := itr.opt.Window(itr.input.peekTime())
2015-11-04 21:06:06 +00:00
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Create points by tags.
m := make(map[string]*StringPoint)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
// Pass previous and current points to reducer.
prev := m[tags.ID()]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
}
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &StringPoint{Name: curr.Name, Tags: tags}
m[tags.ID()] = prev
}
prev.Time = t
prev.Value = v
prev.Aux = aux
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*StringPoint, len(m))
for i, k := range keys {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
2015-11-04 21:06:06 +00:00
return a
}
// stringReduceFunc is the function called by a StringPoint reducer.
type stringReduceFunc func(prev, curr *StringPoint, opt *reduceOptions) (t int64, v string, aux []interface{})
// stringReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type stringReduceSliceIterator struct {
input *bufStringIterator
fn stringReduceSliceFunc
opt IteratorOptions
points []StringPoint
}
// Close closes the iterator and all child iterators.
func (itr *stringReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *stringReduceSliceIterator) Next() *StringPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *stringReduceSliceIterator) reduce() []StringPoint {
// Calculate next window.
2015-11-04 21:06:06 +00:00
startTime, endTime := itr.opt.Window(itr.input.peekTime())
2015-11-04 21:06:06 +00:00
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []StringPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]StringPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]StringPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// stringReduceSliceFunc is the function called by a StringPoint slice reducer.
type stringReduceSliceFunc func(a []StringPoint, opt *reduceOptions) []StringPoint
2015-11-04 21:06:06 +00:00
// stringReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type stringTransformIterator struct {
input StringIterator
fn stringTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *stringTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *stringTransformIterator) Next() *StringPoint {
p := itr.input.Next()
if p != nil {
p = itr.fn(p)
}
return p
}
// stringTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type stringTransformFunc func(p *StringPoint) *StringPoint
// stringReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type stringBoolTransformIterator struct {
input StringIterator
fn stringBoolTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *stringBoolTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *stringBoolTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
return itr.fn(p)
}
return nil
}
// stringBoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type stringBoolTransformFunc func(p *StringPoint) *BooleanPoint
2015-11-04 21:06:06 +00:00
// BooleanIterator represents a stream of boolean points.
type BooleanIterator interface {
Iterator
Next() *BooleanPoint
}
// newBooleanIterators converts a slice of Iterator to a slice of BooleanIterator.
// Panic if any iterator in itrs is not a BooleanIterator.
func newBooleanIterators(itrs []Iterator) []BooleanIterator {
a := make([]BooleanIterator, len(itrs))
for i, itr := range itrs {
a[i] = itr.(BooleanIterator)
}
return a
}
// bufBooleanIterator represents a buffered BooleanIterator.
type bufBooleanIterator struct {
itr BooleanIterator
buf *BooleanPoint
}
// newBufBooleanIterator returns a buffered BooleanIterator.
func newBufBooleanIterator(itr BooleanIterator) *bufBooleanIterator {
return &bufBooleanIterator{
itr: itr,
}
}
// Close closes the underlying iterator.
func (itr *bufBooleanIterator) Close() error { return itr.itr.Close() }
2015-11-04 21:06:06 +00:00
// peek returns the next point without removing it from the iterator.
func (itr *bufBooleanIterator) peek() *BooleanPoint {
p := itr.Next()
itr.unread(p)
return p
}
// peekTime returns the time of the next point.
2015-11-04 21:06:06 +00:00
// Returns zero time if no more points available.
2015-11-04 21:06:06 +00:00
func (itr *bufBooleanIterator) peekTime() int64 {
p := itr.peek()
if p == nil {
2015-11-04 21:06:06 +00:00
return ZeroTime
}
2015-11-04 21:06:06 +00:00
return p.Time
2015-11-04 21:06:06 +00:00
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufBooleanIterator) Next() *BooleanPoint {
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
return buf
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *bufBooleanIterator) NextInWindow(startTime, endTime int64) *BooleanPoint {
v := itr.Next()
if v == nil {
return nil
} else if v.Time < startTime || v.Time >= endTime {
itr.unread(v)
return nil
}
return v
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *bufBooleanIterator) unread(v *BooleanPoint) { itr.buf = v }
2015-11-04 21:06:06 +00:00
// booleanMergeIterator represents an iterator that combines multiple boolean iterators.
type booleanMergeIterator struct {
inputs []BooleanIterator
heap *booleanMergeHeap
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// Current iterator and window.
curr *booleanMergeHeapItem
window struct {
startTime int64
endTime int64
2015-11-04 21:06:06 +00:00
}
}
2015-11-04 21:06:06 +00:00
// newBooleanMergeIterator returns a new instance of booleanMergeIterator.
func newBooleanMergeIterator(inputs []BooleanIterator, opt IteratorOptions) *booleanMergeIterator {
itr := &booleanMergeIterator{
inputs: inputs,
heap: &booleanMergeHeap{
items: make([]*booleanMergeHeapItem, 0, len(inputs)),
opt: opt,
},
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBufBooleanIterator(input)
if bufInput.peek() == nil {
continue
}
// Append to the heap.
itr.heap.items = append(itr.heap.items, &booleanMergeHeapItem{itr: bufInput})
}
heap.Init(itr.heap)
return itr
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Close closes the underlying iterators.
func (itr *booleanMergeIterator) Close() error {
for _, input := range itr.inputs {
2016-01-22 23:46:56 +00:00
input.Close()
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
return nil
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Next returns the next point from the iterator.
func (itr *booleanMergeIterator) Next() *BooleanPoint {
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil
}
itr.curr = heap.Pop(itr.heap).(*booleanMergeHeapItem)
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
// Read the next point from the current iterator.
p := itr.curr.itr.Next()
// If there are no more points then remove iterator from heap and find next.
2015-11-04 21:06:06 +00:00
if p == nil {
2015-11-04 21:06:06 +00:00
itr.curr = nil
2015-11-04 21:06:06 +00:00
continue
}
2015-11-04 21:06:06 +00:00
// If it's outside our window then push iterator back on the heap and find new iterator.
if (itr.heap.opt.Ascending && p.Time >= itr.window.endTime) || (!itr.heap.opt.Ascending && p.Time < itr.window.startTime) {
itr.curr.itr.unread(p)
heap.Push(itr.heap, itr.curr)
itr.curr = nil
continue
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
return p
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// booleanMergeHeap represents a heap of booleanMergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type booleanMergeHeap struct {
opt IteratorOptions
items []*booleanMergeHeapItem
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
func (h booleanMergeHeap) Len() int { return len(h.items) }
func (h booleanMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h booleanMergeHeap) Less(i, j int) bool {
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
if h.opt.Ascending {
2016-01-22 23:46:56 +00:00
return xt < yt
2015-11-04 21:06:06 +00:00
}
2016-01-22 23:46:56 +00:00
return xt > yt
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
func (h *booleanMergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*booleanMergeHeapItem))
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
func (h *booleanMergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
type booleanMergeHeapItem struct {
itr *bufBooleanIterator
}
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// booleanSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type booleanSortedMergeIterator struct {
inputs []BooleanIterator
opt IteratorOptions
heap booleanSortedMergeHeap
2015-11-04 21:06:06 +00:00
}
2015-12-24 04:42:10 +00:00
// newBooleanSortedMergeIterator returns an instance of booleanSortedMergeIterator.
func newBooleanSortedMergeIterator(inputs []BooleanIterator, opt IteratorOptions) Iterator {
itr := &booleanSortedMergeIterator{
2015-11-04 21:06:06 +00:00
inputs: inputs,
heap: make(booleanSortedMergeHeap, 0, len(inputs)),
2015-12-24 04:42:10 +00:00
opt: opt,
}
// Initialize heap.
for _, input := range inputs {
// Read next point.
p := input.Next()
if p == nil {
continue
}
// Append to the heap.
2015-11-04 21:06:06 +00:00
itr.heap = append(itr.heap, &booleanSortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
2015-12-24 04:42:10 +00:00
}
heap.Init(&itr.heap)
return itr
}
// Close closes the underlying iterators.
2015-11-04 21:06:06 +00:00
func (itr *booleanSortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
2015-12-24 04:42:10 +00:00
// Next returns the next points from the iterator.
func (itr *booleanSortedMergeIterator) Next() *BooleanPoint { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *booleanSortedMergeIterator) pop() *BooleanPoint {
if len(itr.heap) == 0 {
return nil
}
// Read the next item from the heap.
2015-11-04 21:06:06 +00:00
item := heap.Pop(&itr.heap).(*booleanSortedMergeHeapItem)
2015-12-24 04:42:10 +00:00
// Copy the point for return.
p := item.point.Clone()
// Read the next item from the cursor. Push back to heap if one exists.
if item.point = item.itr.Next(); item.point != nil {
heap.Push(&itr.heap, item)
}
return p
}
2015-11-04 21:06:06 +00:00
// booleanSortedMergeHeap represents a heap of booleanSortedMergeHeapItems.
type booleanSortedMergeHeap []*booleanSortedMergeHeapItem
2015-12-24 04:42:10 +00:00
2015-11-04 21:06:06 +00:00
func (h booleanSortedMergeHeap) Len() int { return len(h) }
func (h booleanSortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h booleanSortedMergeHeap) Less(i, j int) bool {
2015-12-24 04:42:10 +00:00
x, y := h[i].point, h[j].point
if h[i].ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() < y.Tags.ID()
}
return x.Time < y.Time
}
if x.Name != y.Name {
return x.Name > y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() > y.Tags.ID()
}
return x.Time > y.Time
}
2015-11-04 21:06:06 +00:00
func (h *booleanSortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*booleanSortedMergeHeapItem))
2015-12-24 04:42:10 +00:00
}
2015-11-04 21:06:06 +00:00
func (h *booleanSortedMergeHeap) Pop() interface{} {
2015-12-24 04:42:10 +00:00
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
2015-11-04 21:06:06 +00:00
type booleanSortedMergeHeapItem struct {
2015-12-24 04:42:10 +00:00
point *BooleanPoint
itr BooleanIterator
ascending bool
}
2015-12-24 18:46:31 +00:00
// booleanLimitIterator represents an iterator that limits points per group.
type booleanLimitIterator struct {
input BooleanIterator
opt IteratorOptions
n int
prev struct {
name string
tags Tags
}
}
// newBooleanLimitIterator returns a new instance of booleanLimitIterator.
func newBooleanLimitIterator(input BooleanIterator, opt IteratorOptions) *booleanLimitIterator {
return &booleanLimitIterator{
input: input,
opt: opt,
}
}
// Close closes the underlying iterators.
func (itr *booleanLimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *booleanLimitIterator) Next() *BooleanPoint {
for {
p := itr.input.Next()
if p == nil {
return nil
}
// Reset window and counter if a new window is encountered.
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
itr.prev.name = p.Name
itr.prev.tags = p.Tags
itr.n = 0
}
// Increment counter.
itr.n++
// Read next point if not beyond the offset.
if itr.n <= itr.opt.Offset {
continue
}
// Read next point if we're beyond the limit.
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
continue
}
return p
}
}
2015-11-04 21:06:06 +00:00
// booleanAuxIterator represents a boolean implementation of AuxIterator.
type booleanAuxIterator struct {
input *bufBooleanIterator
output chan *BooleanPoint
fields auxIteratorFields
}
func newBooleanAuxIterator(input BooleanIterator, opt IteratorOptions) *booleanAuxIterator {
itr := &booleanAuxIterator{
input: newBufBooleanIterator(input),
output: make(chan *BooleanPoint, 1),
fields: newAuxIteratorFields(opt),
}
// Initialize auxilary fields.
if p := itr.input.Next(); p != nil {
itr.output <- p
itr.fields.init(p)
}
go itr.stream()
return itr
}
func (itr *booleanAuxIterator) Close() error { return itr.input.Close() }
func (itr *booleanAuxIterator) Next() *BooleanPoint { return <-itr.output }
func (itr *booleanAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
2015-11-04 21:06:06 +00:00
func (itr *booleanAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
expr := opt.Expr
if expr == nil {
panic("unable to create an iterator with no expression from an aux iterator")
}
switch expr := expr.(type) {
case *VarRef:
return itr.fields.iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
}
func (itr *booleanAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
panic("not implemented")
}
2015-11-04 21:06:06 +00:00
func (itr *booleanAuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
break
}
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
}
close(itr.output)
itr.fields.close()
}
// booleanChanIterator represents a new instance of booleanChanIterator.
type booleanChanIterator struct {
c chan *BooleanPoint
once sync.Once
}
func (itr *booleanChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
return nil
}
func (itr *booleanChanIterator) Next() *BooleanPoint { return <-itr.c }
// booleanReduceIterator executes a reducer for every interval and buffers the result.
type booleanReduceIterator struct {
input *bufBooleanIterator
fn booleanReduceFunc
opt IteratorOptions
points []*BooleanPoint
}
// Close closes the iterator and all child iterators.
func (itr *booleanReduceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *booleanReduceIterator) Next() *BooleanPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *booleanReduceIterator) reduce() []*BooleanPoint {
// Calculate next window.
2015-11-04 21:06:06 +00:00
startTime, endTime := itr.opt.Window(itr.input.peekTime())
2015-11-04 21:06:06 +00:00
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Create points by tags.
m := make(map[string]*BooleanPoint)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
// Pass previous and current points to reducer.
prev := m[tags.ID()]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
}
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &BooleanPoint{Name: curr.Name, Tags: tags}
m[tags.ID()] = prev
}
prev.Time = t
prev.Value = v
prev.Aux = aux
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*BooleanPoint, len(m))
for i, k := range keys {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
2015-11-04 21:06:06 +00:00
return a
}
// booleanReduceFunc is the function called by a BooleanPoint reducer.
type booleanReduceFunc func(prev, curr *BooleanPoint, opt *reduceOptions) (t int64, v bool, aux []interface{})
// booleanReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type booleanReduceSliceIterator struct {
input *bufBooleanIterator
fn booleanReduceSliceFunc
opt IteratorOptions
points []BooleanPoint
}
// Close closes the iterator and all child iterators.
func (itr *booleanReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *booleanReduceSliceIterator) Next() *BooleanPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *booleanReduceSliceIterator) reduce() []BooleanPoint {
// Calculate next window.
2015-11-04 21:06:06 +00:00
startTime, endTime := itr.opt.Window(itr.input.peekTime())
2015-11-04 21:06:06 +00:00
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []BooleanPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]BooleanPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]BooleanPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// booleanReduceSliceFunc is the function called by a BooleanPoint slice reducer.
type booleanReduceSliceFunc func(a []BooleanPoint, opt *reduceOptions) []BooleanPoint
2015-11-04 21:06:06 +00:00
// booleanReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type booleanTransformIterator struct {
input BooleanIterator
fn booleanTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *booleanTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *booleanTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
p = itr.fn(p)
}
return p
}
// booleanTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type booleanTransformFunc func(p *BooleanPoint) *BooleanPoint
// booleanReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type booleanBoolTransformIterator struct {
input BooleanIterator
fn booleanBoolTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *booleanBoolTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *booleanBoolTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
return itr.fn(p)
}
return nil
}
// booleanBoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type booleanBoolTransformFunc func(p *BooleanPoint) *BooleanPoint