package influxql

import (
	"container/heap"
	"errors"
	"encoding/binary"
	"fmt"
	"io"
	"sort"
	"sync"
	"log"
	"time"

	"github.com/gogo/protobuf/proto"
	internal "github.com/influxdata/influxdb/influxql/internal"
)

// DefaultStatsInterval is the default value for IteratorEncoder.StatsInterval.
const DefaultStatsInterval = 10 * time.Second

{{with $types := .}}{{range $k := $types}}

// {{$k.Name}}Iterator represents a stream of {{$k.name}} points.
type {{$k.Name}}Iterator interface {
	Iterator
	Next() *{{$k.Name}}Point
}

// new{{$k.Name}}Iterators converts a slice of Iterator to a slice of {{$k.Name}}Iterator.
// Drops and closes any iterator in itrs that is not a {{$k.Name}}Iterator and cannot
// be cast to a {{$k.Name}}Iterator. For the Float type only, integer iterators are
// kept and wrapped in an integerFloatCastIterator.
func new{{$k.Name}}Iterators(itrs []Iterator) []{{$k.Name}}Iterator {
	a := make([]{{$k.Name}}Iterator, 0, len(itrs))
	for _, itr := range itrs {
		switch itr := itr.(type) {
		case {{$k.Name}}Iterator:
			a = append(a, itr)
{{if eq .Name "Float"}}
		case IntegerIterator:
			a = append(a, &integerFloatCastIterator{input: itr})
{{end}}
		default:
			// Incompatible iterator: release it; the Close error is not inspected.
			itr.Close()
		}
	}
	return a
}

// buf{{$k.Name}}Iterator represents a buffered {{$k.Name}}Iterator.
// It holds at most one point that has been read from itr but not yet consumed.
type buf{{$k.Name}}Iterator struct {
	itr {{$k.Name}}Iterator
	buf *{{$k.Name}}Point
}

// newBuf{{$k.Name}}Iterator returns a buffered {{$k.Name}}Iterator.
func newBuf{{$k.Name}}Iterator(itr {{$k.Name}}Iterator) *buf{{$k.Name}}Iterator {
	return &buf{{$k.Name}}Iterator{itr: itr}
}

// Stats returns statistics from the input iterator.
func (itr *buf{{$k.Name}}Iterator) Stats() IteratorStats { return itr.itr.Stats() }

// Close closes the underlying iterator.
func (itr *buf{{$k.Name}}Iterator) Close() error { return itr.itr.Close() }

// peek returns the next point without removing it from the iterator.
// The point is read via Next and immediately stored back in the buffer,
// so it may be nil when the input is exhausted.
func (itr *buf{{$k.Name}}Iterator) peek() *{{$k.Name}}Point {
	p := itr.Next()
	itr.unread(p)
	return p
}

// peekTime returns the time of the next point.
// Returns zero time if no more points available.
func (itr *buf{{$k.Name}}Iterator) peekTime() int64 { p := itr.peek() if p == nil { return ZeroTime } return p.Time } // Next returns the current buffer, if exists, or calls the underlying iterator. func (itr *buf{{$k.Name}}Iterator) Next() *{{$k.Name}}Point { if itr.buf != nil { buf := itr.buf itr.buf = nil return buf } return itr.itr.Next() } // NextInWindow returns the next value if it is between [startTime, endTime). // If the next value is outside the range then it is moved to the buffer. func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) *{{$k.Name}}Point { v := itr.Next() if v == nil { return nil } else if v.Time < startTime || v.Time >= endTime { itr.unread(v) return nil } return v } // unread sets v to the buffer. It is read on the next call to Next(). func (itr *buf{{$k.Name}}Iterator) unread(v *{{$k.Name}}Point) { itr.buf = v } // {{$k.name}}MergeIterator represents an iterator that combines multiple {{$k.name}} iterators. type {{$k.name}}MergeIterator struct { inputs []{{$k.Name}}Iterator heap *{{$k.name}}MergeHeap init bool // Current iterator and window. curr *{{$k.name}}MergeHeapItem window struct { name string tags string startTime int64 endTime int64 } } // new{{$k.Name}}MergeIterator returns a new instance of {{$k.name}}MergeIterator. func new{{$k.Name}}MergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}MergeIterator { itr := &{{$k.name}}MergeIterator{ inputs: inputs, heap: &{{$k.name}}MergeHeap{ items: make([]*{{$k.name}}MergeHeapItem, 0, len(inputs)), opt: opt, }, } // Initialize heap items. for _, input := range inputs { // Wrap in buffer, ignore any inputs without anymore points. bufInput := newBuf{{$k.Name}}Iterator(input) // Append to the heap. itr.heap.items = append(itr.heap.items, &{{$k.name}}MergeHeapItem{itr: bufInput}) } return itr } // Stats returns an aggregation of stats from the underlying iterators. 
func (itr *{{$k.name}}MergeIterator) Stats() IteratorStats {
	var stats IteratorStats
	for _, input := range itr.inputs {
		stats.Add(input.Stats())
	}
	return stats
}

// Close closes the underlying iterators.
// Errors returned by the individual inputs are not inspected; nil is always returned.
func (itr *{{$k.name}}MergeIterator) Close() error {
	for _, input := range itr.inputs {
		input.Close()
	}
	return nil
}

// Next returns the next point from the iterator.
// It keeps reading from one input while its points stay within the current
// name/tags/time window, and switches to the next input from the heap once
// the current one leaves that window or runs out of points.
func (itr *{{$k.name}}MergeIterator) Next() *{{$k.Name}}Point {
	// Initialize the heap. This needs to be done lazily on the first call to this iterator
	// so that iterator initialization done through the Select() call returns quickly.
	// Queries can only be interrupted after the Select() call completes so any operations
	// done during iterator creation cannot be interrupted, which is why we do it here
	// instead so an interrupt can happen while initializing the heap.
	if !itr.init {
		items := itr.heap.items
		itr.heap.items = make([]*{{$k.name}}MergeHeapItem, 0, len(items))
		for _, item := range items {
			// Inputs that have no points at all never enter the heap.
			if item.itr.peek() == nil {
				continue
			}
			itr.heap.items = append(itr.heap.items, item)
		}
		heap.Init(itr.heap)
		itr.init = true
	}

	for {
		// Retrieve the next iterator if we don't have one.
		if itr.curr == nil {
			if len(itr.heap.items) == 0 {
				return nil
			}
			itr.curr = heap.Pop(itr.heap).(*{{$k.name}}MergeHeapItem)

			// Read point and set current window.
			// Items are only placed on the heap with a buffered point, so p is non-nil here.
			p := itr.curr.itr.Next()
			itr.window.name, itr.window.tags = p.Name, p.Tags.ID()
			itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
			return p
		}

		// Read the next point from the current iterator.
		p := itr.curr.itr.Next()

		// If there are no more points then remove iterator from heap and find next.
		if p == nil {
			itr.curr = nil
			continue
		}

		// Check if the point is inside of our current window.
		inWindow := true
		if itr.window.name != p.Name {
			inWindow = false
		} else if itr.window.tags != p.Tags.ID() {
			inWindow = false
		} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
			inWindow = false
		} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
			inWindow = false
		}

		// If it's outside our window then push iterator back on the heap and find new iterator.
		if !inWindow {
			itr.curr.itr.unread(p)
			heap.Push(itr.heap, itr.curr)
			itr.curr = nil
			continue
		}

		return p
	}
}

// {{$k.name}}MergeHeap represents a heap of {{$k.name}}MergeHeapItems.
// Items are sorted by name/tags and then by their next window; the order is
// reversed when opt.Ascending is false.
type {{$k.name}}MergeHeap struct {
	opt   IteratorOptions
	items []*{{$k.name}}MergeHeapItem
}

// Len returns the number of items on the heap.
func (h {{$k.name}}MergeHeap) Len() int { return len(h.items) }

// Swap exchanges the items at positions i and j.
func (h {{$k.name}}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }

// Less compares the peeked points of items i and j. Both points are
// dereferenced without a nil check, so every item on the heap must have a
// point available.
func (h {{$k.name}}MergeHeap) Less(i, j int) bool {
	x, y := h.items[i].itr.peek(), h.items[j].itr.peek()

	if h.opt.Ascending {
		if x.Name != y.Name {
			return x.Name < y.Name
		} else if x.Tags.ID() != y.Tags.ID() {
			return x.Tags.ID() < y.Tags.ID()
		}
	} else {
		if x.Name != y.Name {
			return x.Name > y.Name
		} else if x.Tags.ID() != y.Tags.ID() {
			return x.Tags.ID() > y.Tags.ID()
		}
	}

	// Same name and tags: order by the start of the window each point falls in.
	xt, _ := h.opt.Window(x.Time)
	yt, _ := h.opt.Window(y.Time)

	if h.opt.Ascending {
		return xt < yt
	}
	return xt > yt
}

// Push appends x, which must be a *{{$k.name}}MergeHeapItem, to the heap's items.
func (h *{{$k.name}}MergeHeap) Push(x interface{}) {
	h.items = append(h.items, x.(*{{$k.name}}MergeHeapItem))
}

// Pop removes and returns the last item of the heap's items.
func (h *{{$k.name}}MergeHeap) Pop() interface{} {
	old := h.items
	n := len(old)
	item := old[n-1]
	h.items = old[0 : n-1]
	return item
}

// {{$k.name}}MergeHeapItem is a heap entry wrapping one buffered input iterator.
type {{$k.name}}MergeHeapItem struct {
	itr *buf{{$k.Name}}Iterator
}

// {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type {{$k.name}}SortedMergeIterator struct {
	inputs []{{$k.Name}}Iterator
	opt    IteratorOptions
	heap   {{$k.name}}SortedMergeHeap
}

// new{{$k.Name}}SortedMergeIterator creates a {{$k.name}}SortedMergeIterator over inputs.
// The first point of every input is read immediately; inputs that yield no
// point are left out of the heap.
func new{{$k.Name}}SortedMergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) Iterator {
	h := make({{$k.name}}SortedMergeHeap, 0, len(inputs))
	for _, input := range inputs {
		if first := input.Next(); first != nil {
			h = append(h, &{{$k.name}}SortedMergeHeapItem{point: first, itr: input, ascending: opt.Ascending})
		}
	}

	itr := &{{$k.name}}SortedMergeIterator{
		inputs: inputs,
		heap:   h,
		opt:    opt,
	}
	heap.Init(&itr.heap)
	return itr
}

// Stats sums the stats reported by every input iterator.
func (itr *{{$k.name}}SortedMergeIterator) Stats() IteratorStats {
	var total IteratorStats
	for i := range itr.inputs {
		total.Add(itr.inputs[i].Stats())
	}
	return total
}

// Close closes every input iterator. Their errors are not inspected and nil
// is always returned.
func (itr *{{$k.name}}SortedMergeIterator) Close() error {
	for i := range itr.inputs {
		itr.inputs[i].Close()
	}
	return nil
}

// Next returns the next point in sorted order, or nil when all inputs are drained.
func (itr *{{$k.name}}SortedMergeIterator) Next() *{{$k.Name}}Point { return itr.pop() }

// pop removes the top item from the heap and returns a copy of its point.
// The item is refilled from its own iterator and pushed back when another
// point is available.
func (itr *{{$k.name}}SortedMergeIterator) pop() *{{$k.Name}}Point {
	if len(itr.heap) == 0 {
		return nil
	}

	top := heap.Pop(&itr.heap).(*{{$k.name}}SortedMergeHeapItem)

	// Clone before the item's point is overwritten below.
	out := top.point.Clone()

	top.point = top.itr.Next()
	if top.point != nil {
		heap.Push(&itr.heap, top)
	}

	return out
}

// {{$k.name}}SortedMergeHeap represents a heap of {{$k.name}}SortedMergeHeapItems.
type {{$k.name}}SortedMergeHeap []*{{$k.name}}SortedMergeHeapItem

// Len returns the number of items on the heap.
func (h {{$k.name}}SortedMergeHeap) Len() int { return len(h) }

// Swap exchanges the items at positions i and j.
func (h {{$k.name}}SortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Less orders items by name, then tags ID, then time. The direction is taken
// from item i's ascending flag.
func (h {{$k.name}}SortedMergeHeap) Less(i, j int) bool {
	x, y := h[i].point, h[j].point

	if h[i].ascending {
		if x.Name != y.Name {
			return x.Name < y.Name
		} else if !x.Tags.Equals(&y.Tags) {
			return x.Tags.ID() < y.Tags.ID()
		}
		return x.Time < y.Time
	}

	if x.Name != y.Name {
		return x.Name > y.Name
	} else if !x.Tags.Equals(&y.Tags) {
		return x.Tags.ID() > y.Tags.ID()
	}
	return x.Time > y.Time
}

// Push appends x, which must be a *{{$k.name}}SortedMergeHeapItem, to the heap.
func (h *{{$k.name}}SortedMergeHeap) Push(x interface{}) {
	*h = append(*h, x.(*{{$k.name}}SortedMergeHeapItem))
}

// Pop removes and returns the last item of the heap slice.
func (h *{{$k.name}}SortedMergeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[0 : n-1]
	return item
}

// {{$k.name}}SortedMergeHeapItem pairs an input iterator with the point most
// recently read from it and the sort direction.
type {{$k.name}}SortedMergeHeapItem struct {
	point     *{{$k.Name}}Point
	itr       {{$k.Name}}Iterator
	ascending bool
}

// {{$k.name}}LimitIterator represents an iterator that limits points per group.
type {{$k.name}}LimitIterator struct {
	input {{$k.Name}}Iterator
	opt   IteratorOptions
	n     int // number of points seen so far in the current name/tags group

	prev struct {
		name string
		tags Tags
	}
}

// new{{$k.Name}}LimitIterator returns a new instance of {{$k.name}}LimitIterator.
func new{{$k.Name}}LimitIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}LimitIterator {
	return &{{$k.name}}LimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns stats from the underlying iterator.
func (itr *{{$k.name}}LimitIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the underlying iterators.
func (itr *{{$k.name}}LimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the iterator.
// Within each name/tags group the first opt.Offset points are skipped and,
// when opt.Limit is positive, at most opt.Limit points are returned.
func (itr *{{$k.name}}LimitIterator) Next() *{{$k.Name}}Point {
	for {
		p := itr.input.Next()
		if p == nil {
			return nil
		}

		// Reset window and counter if a new window is encountered.
		if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
			itr.prev.name = p.Name
			itr.prev.tags = p.Tags
			itr.n = 0
		}

		// Increment counter.
		itr.n++

		// Read next point if not beyond the offset.
		if itr.n <= itr.opt.Offset {
			continue
		}

		// Read next point if we're beyond the limit.
		if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
			// If there's no interval, no groups, and a single source then simply exit.
			if itr.opt.Interval.IsZero() && len(itr.opt.Dimensions) == 0 && len(itr.opt.Sources) == 1 {
				return nil
			}
			continue
		}

		return p
	}
}

// {{$k.name}}FillIterator emits a point for every expected interval time of
// each name/tags series, synthesizing points according to opt.Fill where the
// input has none.
type {{$k.name}}FillIterator struct {
	input     *buf{{$k.Name}}Iterator
	prev      *{{$k.Name}}Point // last real point returned for the current series
	startTime int64
	endTime   int64
	auxFields []interface{}
	done      bool
	opt       IteratorOptions

	// Series currently being filled and the next expected point time.
	window struct {
		name string
		tags Tags
		time int64
	}
}

// new{{$k.Name}}FillIterator returns a {{$k.name}}FillIterator over input.
// A NullFill on a "count" call is replaced by a NumberFill with the type's
// zero value. The first point of input is peeked to seed the current series.
func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr Expr, opt IteratorOptions) *{{$k.name}}FillIterator {
	if opt.Fill == NullFill {
		if expr, ok := expr.(*Call); ok && expr.Name == "count" {
			opt.Fill = NumberFill
			opt.FillValue = {{$k.Zero}}
		}
	}

	// Derive the iteration bounds from the windows containing the query's
	// start and end times; the roles are swapped for descending order.
	var startTime, endTime int64
	if opt.Ascending {
		startTime, _ = opt.Window(opt.StartTime)
		_, endTime = opt.Window(opt.EndTime)
	} else {
		_, startTime = opt.Window(opt.EndTime)
		endTime, _ = opt.Window(opt.StartTime)
	}

	var auxFields []interface{}
	if len(opt.Aux) > 0 {
		auxFields = make([]interface{}, len(opt.Aux))
	}

	itr := &{{$k.name}}FillIterator{
		input:     newBuf{{$k.Name}}Iterator(input),
		startTime: startTime,
		endTime:   endTime,
		auxFields: auxFields,
		opt:       opt,
	}

	p := itr.input.peek()
	if p != nil {
		itr.window.name, itr.window.tags = p.Name, p.Tags
		itr.window.time = itr.startTime
	} else {
		// No input at all: start at the end bound.
		itr.window.time = itr.endTime
	}
	return itr
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}FillIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the input iterator.
func (itr *{{$k.name}}FillIterator) Close() error { return itr.input.Close() }

// Next returns either the next input point or a synthesized fill point for
// the next expected time of the current series. It returns nil once the input
// is exhausted and the current series has been filled up to the end bound.
func (itr *{{$k.name}}FillIterator) Next() *{{$k.Name}}Point {
	p := itr.input.Next()

	// Check if the next point is outside of our window or is nil.
	// Every path through this loop body breaks or returns, so it runs at most once.
	for p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
		// If we are inside of an interval, unread the point and continue below to
		// constructing a new point.
		if itr.opt.Ascending {
			if itr.window.time < itr.endTime {
				itr.input.unread(p)
				p = nil
				break
			}
		} else {
			if itr.window.time >= itr.endTime {
				itr.input.unread(p)
				p = nil
				break
			}
		}

		// We are *not* in a current interval. If there is no next point,
		// we are at the end of all intervals.
		if p == nil {
			return nil
		}

		// Set the new interval.
		itr.window.name, itr.window.tags = p.Name, p.Tags
		itr.window.time = itr.startTime
		itr.prev = nil
		break
	}

	// Check if the point is our next expected point.
	// NOTE(review): this comparison does not depend on itr.opt.Ascending; when
	// iterating in descending order a gap would show up as
	// p.Time < itr.window.time — confirm descending fill behaves as intended.
	if p == nil || p.Time > itr.window.time {
		if p != nil {
			itr.input.unread(p)
		}

		p = &{{$k.Name}}Point{
			Name: itr.window.name,
			Tags: itr.window.tags,
			Time: itr.window.time,
			Aux:  itr.auxFields,
		}

		switch itr.opt.Fill {
		case NullFill:
			p.Nil = true
		case NumberFill:
			p.Value = castTo{{$k.Name}}(itr.opt.FillValue)
		case PreviousFill:
			if itr.prev != nil {
				p.Value = itr.prev.Value
				p.Nil = itr.prev.Nil
			} else {
				p.Nil = true
			}
		}
	} else {
		itr.prev = p
	}

	// Advance the expected time. Do not advance to a new window here
	// as there may be lingering points with the same timestamp in the previous
	// window.
	if itr.opt.Ascending {
		itr.window.time = p.Time + int64(itr.opt.Interval.Duration)
	} else {
		itr.window.time = p.Time - int64(itr.opt.Interval.Duration)
	}
	return p
}

// {{$k.name}}IntervalIterator represents a {{$k.name}} implementation of IntervalIterator.
type {{$k.name}}IntervalIterator struct {
	input {{$k.Name}}Iterator
	opt   IteratorOptions
}

// new{{$k.Name}}IntervalIterator returns a {{$k.name}}IntervalIterator over input.
func new{{$k.Name}}IntervalIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}IntervalIterator {
	return &{{$k.name}}IntervalIterator{input: input, opt: opt}
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}IntervalIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the input iterator.
func (itr *{{$k.name}}IntervalIterator) Close() error { return itr.input.Close() }

// Next returns the next input point with its time replaced by the start of
// the window that contains it. The input point is modified in place.
func (itr *{{$k.name}}IntervalIterator) Next() *{{$k.Name}}Point {
	p := itr.input.Next()
	if p == nil {
		return p
	}
	p.Time, _ = itr.opt.Window(p.Time)
	return p
}

// {{$k.name}}InterruptIterator represents a {{$k.name}} implementation of InterruptIterator.
type {{$k.name}}InterruptIterator struct {
	input   {{$k.Name}}Iterator
	closing <-chan struct{}
	count   int // points read since the closing channel was last polled
}

// new{{$k.Name}}InterruptIterator returns a {{$k.name}}InterruptIterator that stops
// emitting points from input once closing is closed.
func new{{$k.Name}}InterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}InterruptIterator {
	return &{{$k.name}}InterruptIterator{input: input, closing: closing}
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}InterruptIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the input iterator.
func (itr *{{$k.name}}InterruptIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input, or nil once the closing
// channel has been observed as closed.
func (itr *{{$k.name}}InterruptIterator) Next() *{{$k.Name}}Point {
	// Only check if the channel is closed every 256 points. This
	// intentionally checks on both 0 and 256 so that if the iterator
	// has been interrupted before the first point is emitted it will
	// not emit any points.
	//
	// The low eight bits are zero exactly when count is 0 or 256. (Testing
	// bit 0x100 for zero instead would be true for every count below 256
	// and, combined with the reset below, would poll on every call.)
	if itr.count&0xFF == 0 {
		select {
		case <-itr.closing:
			return nil
		default:
			// Reset iterator count to zero and fall through to emit the next point.
			itr.count = 0
		}
	}

	// Increment the counter for every point read.
	itr.count++
	return itr.input.Next()
}

// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator.
type {{$k.name}}AuxIterator struct {
	input      *buf{{$k.Name}}Iterator
	output     chan *{{$k.Name}}Point
	fields     auxIteratorFields
	background bool
}

// new{{$k.Name}}AuxIterator returns a {{$k.name}}AuxIterator over input. Nothing is
// read from input until Start or Background is called.
func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{$k.name}}AuxIterator {
	return &{{$k.name}}AuxIterator{
		input:  newBuf{{$k.Name}}Iterator(input),
		output: make(chan *{{$k.Name}}Point, 1),
		fields: newAuxIteratorFields(seriesKeys, opt),
	}
}

// Background marks the iterator as a background iterator, starts streaming
// and drains the iterator's own output in a separate goroutine.
func (itr *{{$k.name}}AuxIterator) Background() {
	itr.background = true
	itr.Start()
	go DrainIterator(itr)
}

// Start begins streaming points from the input in a new goroutine.
func (itr *{{$k.name}}AuxIterator) Start() { go itr.stream() }

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}AuxIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the input iterator.
func (itr *{{$k.name}}AuxIterator) Close() error { return itr.input.Close() }

// Next receives the next point from the output channel. It returns nil once
// the stream goroutine has closed the channel.
func (itr *{{$k.name}}AuxIterator) Next() *{{$k.Name}}Point { return <-itr.output }

// Iterator returns the auxiliary field iterator registered under name.
func (itr *{{$k.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }

// CreateIterator returns the auxiliary iterator for the variable referenced
// by opt.Expr. It panics if opt.Expr is nil or is not a *VarRef.
func (itr *{{$k.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
	expr := opt.Expr
	if expr == nil {
		panic("unable to create an iterator with no expression from an aux iterator")
	}

	switch expr := expr.(type) {
	case *VarRef:
		return itr.Iterator(expr.Val), nil
	default:
		panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
	}
}

// FieldDimensions is not implemented for aux iterators.
func (itr *{{$k.name}}AuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
	return nil, nil, errors.New("not implemented")
}

// SeriesKeys is not implemented for aux iterators.
func (itr *{{$k.name}}AuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
	return nil, errors.New("not implemented")
}

// ExpandSources is not implemented for aux iterators.
func (itr *{{$k.name}}AuxIterator) ExpandSources(sources Sources) (Sources, error) {
	return nil, errors.New("not implemented")
}

// stream copies points from the input to the output channel and to the field
// iterators until the input is exhausted, or, for a background iterator,
// until sending to the fields reports false. It then closes the output
// channel and the field iterators.
func (itr *{{$k.name}}AuxIterator) stream() {
	for {
		// Read next point.
		p := itr.input.Next()
		if p == nil {
			break
		}

		// Send point to output and to each field iterator.
		itr.output <- p
		if ok := itr.fields.send(p); !ok && itr.background {
			break
		}
	}

	close(itr.output)
	itr.fields.close()
}

// {{$k.name}}ChanIterator is an iterator fed one point at a time through setBuf
// and read through Next, synchronized with a condition variable.
type {{$k.name}}ChanIterator struct {
	buf struct {
		i      int  // index into points of the slot currently in use
		filled bool // true while points[i] holds an unread value
		points [2]{{$k.Name}}Point
	}
	cond *sync.Cond
	done bool
}

// Stats returns an empty IteratorStats.
func (itr *{{$k.name}}ChanIterator) Stats() IteratorStats { return IteratorStats{} }

// Close marks the iterator as done and wakes every goroutine waiting on it.
func (itr *{{$k.name}}ChanIterator) Close() error {
	itr.cond.L.Lock()
	// Mark the channel iterator as done and signal all waiting goroutines to start again.
	itr.done = true
	itr.cond.Broadcast()
	// Do not defer the unlock so we don't create an unnecessary allocation.
	itr.cond.L.Unlock()
	return nil
}

// setBuf stores a point built from name, tags, time and value in the buffer,
// blocking while a previous value is still unread. A value of an unsupported
// type is stored as a nil point. It returns false, without storing anything,
// if the iterator has been closed.
func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool {
	itr.cond.L.Lock()
	defer itr.cond.L.Unlock()

	// Wait for either the iterator to be done (so we don't have to set the value)
	// or for the buffer to have been read and ready for another write.
	for !itr.done && itr.buf.filled {
		itr.cond.Wait()
	}

	// Do not set the value and return false to signal that the iterator is closed.
	// Do this after the above wait as the above for loop may have exited because
	// the iterator was closed.
	if itr.done {
		return false
	}

	switch v := value.(type) {
	case {{$k.Type}}:
		itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: v}
{{if eq $k.Name "Float"}}
	case int64:
		itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: float64(v)}
{{end}}
	default:
		itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Nil: true}
	}
	itr.buf.filled = true

	// Wake a waiting goroutine now that a new value is ready to read.
	itr.cond.Signal()
	return true
}

// Next blocks until a value is buffered or the iterator is closed. It returns
// a pointer to the buffered point, or nil when the iterator is closed and the
// buffer is empty.
func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point {
	itr.cond.L.Lock()

	// Wait until either a value is available in the buffer or
	// the iterator is closed.
	for !itr.done && !itr.buf.filled {
		itr.cond.Wait()
	}

	// Return nil once the channel is done and the buffer is empty.
	if itr.done && !itr.buf.filled {
		itr.cond.L.Unlock()
		return nil
	}

	// Always read from the buffer if it exists, even if the iterator
	// is closed. This prevents the last value from being truncated by
	// the parent iterator.
	// Advancing i means the next setBuf writes to the other slot, so the
	// returned pointer is not overwritten by the very next write.
	p := &itr.buf.points[itr.buf.i]
	itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points)
	itr.buf.filled = false
	itr.cond.Signal()

	// Do not defer the unlock so we don't create an unnecessary allocation.
	itr.cond.L.Unlock()
	return p
}

{{range $v := $types}}

// {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result.
type {{$k.name}}Reduce{{$v.Name}}Iterator struct {
	input  *buf{{$k.Name}}Iterator
	create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
	opt    IteratorOptions
	points []{{$v.Name}}Point
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Close() error { return itr.input.Close() }

// Next returns the next buffered reduced point, reducing the next window
// first when the buffer is empty. It returns nil when a reduction yields no
// points.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() *{{$v.Name}}Point {
	// Calculate next window if we have no more points.
	if len(itr.points) == 0 {
		itr.points = itr.reduce()
		if len(itr.points) == 0 {
			return nil
		}
	}

	// Pop next point off the stack.
	p := &itr.points[len(itr.points)-1]
	itr.points = itr.points[:len(itr.points)-1]
	return p
}

// {{$k.name}}Reduce{{$v.Name}}Point stores the reduced data for a name/tag combination.
type {{$k.name}}Reduce{{$v.Name}}Point struct {
	Name       string
	Tags       Tags
	Aggregator {{$k.Name}}PointAggregator
	Emitter    {{$v.Name}}PointEmitter
}

// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []{{$v.Name}}Point {
	// Calculate next window.
	startTime, endTime := itr.opt.Window(itr.input.peekTime())

	// Create points by tags.
	m := make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point)
	for {
		// Read next point.
		curr := itr.input.NextInWindow(startTime, endTime)
		if curr == nil {
			break
		} else if curr.Nil {
			continue
		}
		tags := curr.Tags.Subset(itr.opt.Dimensions)

		// Key the aggregator by name, plus the tag subset ID when there are tags.
		id := curr.Name
		if len(tags.m) > 0 {
			id += "\x00" + tags.ID()
		}

		// Retrieve the aggregator for this name/tag combination or create one.
		rp := m[id]
		if rp == nil {
			aggregator, emitter := itr.create()
			rp = &{{$k.name}}Reduce{{$v.Name}}Point{
				Name:       curr.Name,
				Tags:       tags,
				Aggregator: aggregator,
				Emitter:    emitter,
			}
			m[id] = rp
		}
		rp.Aggregator.Aggregate{{$k.Name}}(curr)
	}

	// Reverse sort points by name & tag.
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	if len(keys) > 1 {
		sort.Sort(reverseStringSlice(keys))
	}

	// Keys are in reverse order and each emitter's points are appended last
	// to first, so Next, which pops from the end of the slice, yields them in
	// forward order.
	a := make([]{{$v.Name}}Point, 0, len(m))
	for _, k := range keys {
		rp := m[k]
		points := rp.Emitter.Emit()
		for i := len(points) - 1; i >= 0; i-- {
			points[i].Name = rp.Name
			points[i].Tags = rp.Tags
			// Set the points time to the interval time if the reducer didn't provide one.
			if points[i].Time == ZeroTime {
				points[i].Time = startTime
			}
			a = append(a, points[i])
		}
	}

	return a
}

// {{$k.name}}Stream{{$v.Name}}Iterator feeds every input point to a per-name/tags
// aggregator and emits whatever the matching emitter produces after each point.
type {{$k.name}}Stream{{$v.Name}}Iterator struct {
	input  *buf{{$k.Name}}Iterator
	create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
	opt    IteratorOptions
	m      map[string]*{{$k.name}}Reduce{{$v.Name}}Point
	points []{{$v.Name}}Point
}

// new{{$k.Name}}Stream{{$v.Name}}Iterator returns a {{$k.name}}Stream{{$v.Name}}Iterator
// over input that uses createFn to build an aggregator/emitter pair for each
// name/tags combination.
func new{{$k.Name}}Stream{{$v.Name}}Iterator(input {{$k.Name}}Iterator, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter), opt IteratorOptions) *{{$k.name}}Stream{{$v.Name}}Iterator {
	return &{{$k.name}}Stream{{$v.Name}}Iterator{
		input:  newBuf{{$k.Name}}Iterator(input),
		create: createFn,
		opt:    opt,
		m:      make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point),
	}
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Close() error { return itr.input.Close() }

// Next returns the next value for the stream iterator.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Next() *{{$v.Name}}Point {
	// Calculate next window if we have no more points.
	if len(itr.points) == 0 {
		itr.points = itr.reduce()
		if len(itr.points) == 0 {
			return nil
		}
	}

	// Pop next point off the stack.
	p := &itr.points[len(itr.points)-1]
	itr.points = itr.points[:len(itr.points)-1]
	return p
}

// reduce creates and manages aggregators for every point from the input.
// After aggregating a point, it always tries to emit a value using the emitter.
// It returns nil once the input is exhausted.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) reduce() []{{$v.Name}}Point {
	for {
		// Read next point.
		curr := itr.input.Next()
		if curr == nil {
			return nil
		} else if curr.Nil {
			continue
		}
		tags := curr.Tags.Subset(itr.opt.Dimensions)

		id := curr.Name
		if len(tags.m) > 0 {
			id += "\x00" + tags.ID()
		}

		// Retrieve the aggregator for this name/tag combination or create one.
		rp := itr.m[id]
		if rp == nil {
			aggregator, emitter := itr.create()
			rp = &{{$k.name}}Reduce{{$v.Name}}Point{
				Name:       curr.Name,
				Tags:       tags,
				Aggregator: aggregator,
				Emitter:    emitter,
			}
			itr.m[id] = rp
		}
		rp.Aggregator.Aggregate{{$k.Name}}(curr)

		// Attempt to emit points from the aggregator.
		points := rp.Emitter.Emit()
		if len(points) == 0 {
			continue
		}

		for i := range points {
			points[i].Name = rp.Name
			points[i].Tags = rp.Tags
		}
		return points
	}
}

// {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator executes a function to modify an existing point
// for every output of the input iterator.
type {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator struct {
	left  *buf{{$k.Name}}Iterator
	right *buf{{$k.Name}}Iterator
	fn    {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc
}

// Stats returns the combined stats of the left and right iterators.
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Stats() IteratorStats {
	stats := itr.left.Stats()
	stats.Add(itr.right.Stats())
	return stats
}

// Close closes both input iterators. Their errors are not inspected and nil
// is always returned.
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Close() error {
	itr.left.Close()
	itr.right.Close()
	return nil
}

// Next reads one point from each side and combines them with fn. It returns
// nil only when both sides are exhausted; otherwise fn may receive one nil
// argument.
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Next() *{{$v.Name}}Point {
	a := itr.left.Next()
	b := itr.right.Next()
	if a == nil && b == nil {
		return nil
	}
	return itr.fn(a, b)
}

// {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc creates or modifies a point by combining two
// points. The point passed in may be modified and returned rather than
// allocating a new point if possible. One of the points may be nil, but at
// least one of the points will be non-nil.
type {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc func(a *{{$k.Name}}Point, b *{{$k.Name}}Point) *{{$v.Name}}Point
{{end}}

// {{$k.name}}TransformIterator executes a function to modify an existing point for every
// output of the input iterator.
type {{$k.name}}TransformIterator struct {
	input {{$k.Name}}Iterator
	fn    {{$k.name}}TransformFunc
}

// Stats reports the stats of the wrapped input.
func (itr *{{$k.name}}TransformIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the wrapped input and returns its error.
func (itr *{{$k.name}}TransformIterator) Close() error { return itr.input.Close() }

// Next reads a point from the input and returns the result of applying fn to
// it. It returns nil, without calling fn, when the input has no more points.
func (itr *{{$k.name}}TransformIterator) Next() *{{$k.Name}}Point {
	in := itr.input.Next()
	if in == nil {
		return nil
	}
	return itr.fn(in)
}

// {{$k.name}}TransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type {{$k.name}}TransformFunc func(p *{{$k.Name}}Point) *{{$k.Name}}Point

// {{$k.name}}BoolTransformIterator applies a function that turns every point of
// the input iterator into a boolean point.
type {{$k.name}}BoolTransformIterator struct {
	input {{$k.Name}}Iterator
	fn    {{$k.name}}BoolTransformFunc
}

// Stats reports the stats of the wrapped input.
func (itr *{{$k.name}}BoolTransformIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the wrapped input and returns its error.
func (itr *{{$k.name}}BoolTransformIterator) Close() error { return itr.input.Close() }

// Next reads a point from the input and returns the boolean point produced
// by fn. It returns nil, without calling fn, when the input has no more points.
func (itr *{{$k.name}}BoolTransformIterator) Next() *BooleanPoint {
	in := itr.input.Next()
	if in == nil {
		return nil
	}
	return itr.fn(in)
}

// {{$k.name}}BoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type {{$k.name}}BoolTransformFunc func(p *{{$k.Name}}Point) *BooleanPoint

// {{$k.name}}DedupeIterator only outputs unique points.
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type {{$k.name}}DedupeIterator struct {
	input {{$k.Name}}Iterator
	m     map[string]struct{} // set of encoded points that were already returned
}

// new{{$k.Name}}DedupeIterator wraps input in a {{$k.name}}DedupeIterator with an
// empty lookup set.
func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
	itr := &{{$k.name}}DedupeIterator{input: input}
	itr.m = make(map[string]struct{})
	return itr
}

// Stats reports the stats of the wrapped input.
func (itr *{{$k.name}}DedupeIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the wrapped input and returns its error.
func (itr *{{$k.name}}DedupeIterator) Close() error { return itr.input.Close() }

// Next returns the next input point whose encoded form has not been returned
// before, or nil when the input is exhausted. Points that fail to marshal are
// logged and skipped.
func (itr *{{$k.name}}DedupeIterator) Next() *{{$k.Name}}Point {
	for p := itr.input.Next(); p != nil; p = itr.input.Next() {
		// The protobuf encoding of the point is the identity used for comparison.
		buf, err := proto.Marshal(encode{{$k.Name}}Point(p))
		if err != nil {
			log.Println("error marshaling dedupe point:", err)
			continue
		}

		if _, seen := itr.m[string(buf)]; seen {
			continue
		}

		// First occurrence: remember it and hand it out.
		itr.m[string(buf)] = struct{}{}
		return p
	}
	return nil
}

// {{$k.name}}ReaderIterator is an iterator that decodes points from a reader.
type {{$k.name}}ReaderIterator struct {
	r   io.Reader
	dec *{{$k.Name}}PointDecoder
}

// new{{$k.Name}}ReaderIterator creates a {{$k.name}}ReaderIterator reading from r,
// with the decoder's stats initialized to stats.
func new{{$k.Name}}ReaderIterator(r io.Reader, stats IteratorStats) *{{$k.name}}ReaderIterator {
	itr := &{{$k.name}}ReaderIterator{
		r:   r,
		dec: New{{$k.Name}}PointDecoder(r),
	}
	itr.dec.stats = stats
	return itr
}

// Stats returns the stats currently held by the decoder.
func (itr *{{$k.name}}ReaderIterator) Stats() IteratorStats { return itr.dec.stats }

// Close closes the underlying reader, if applicable.
func (itr *{{$k.name}}ReaderIterator) Close() error {
	if r, ok := itr.r.(io.ReadCloser); ok {
		return r.Close()
	}
	return nil
}

// Next returns the next point from the iterator.
// It returns nil at end of stream. Any other decoding error is logged and
// also ends the stream with nil.
func (itr *{{$k.name}}ReaderIterator) Next() *{{$k.Name}}Point {
	// OPTIMIZE(benbjohnson): Reuse point on iterator.

	// Unmarshal next point.
	p := &{{$k.Name}}Point{}
	if err := itr.dec.Decode{{$k.Name}}Point(p); err == io.EOF {
		return nil
	} else if err != nil {
		log.Printf("error reading iterator point: %s", err)
		return nil
	}
	return p
}
{{end}}

// IteratorEncoder is an encoder for encoding an iterator's points to w.
type IteratorEncoder struct {
	w io.Writer

	// Frequency with which stats are emitted.
	// A non-positive value is treated as DefaultStatsInterval.
	StatsInterval time.Duration
}

// NewIteratorEncoder encodes an iterator's points to w.
func NewIteratorEncoder(w io.Writer) *IteratorEncoder {
	return &IteratorEncoder{
		w: w,

		StatsInterval: DefaultStatsInterval,
	}
}

// EncodeIterator encodes and writes all of itr's points to the underlying writer.
// It panics if itr is not a float, integer, string or boolean iterator.
func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error {
	switch itr := itr.(type) {
	case FloatIterator:
		return enc.encodeFloatIterator(itr)
	case IntegerIterator:
		return enc.encodeIntegerIterator(itr)
	case StringIterator:
		return enc.encodeStringIterator(itr)
	case BooleanIterator:
		return enc.encodeBooleanIterator(itr)
	default:
		panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr))
	}
}

{{range .}}
// encode{{.Name}}Iterator encodes all points from itr to the underlying writer.
// Stats are written before the first point, after the last point, and
// whenever the stats ticker has fired between points.
func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error {
	// time.NewTicker panics on a non-positive duration, so fall back to the
	// default when StatsInterval was zeroed or set to a negative value.
	interval := enc.StatsInterval
	if interval <= 0 {
		interval = DefaultStatsInterval
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Emit initial stats.
	if err := enc.encodeStats(itr.Stats()); err != nil {
		return err
	}

	// Continually stream points from the iterator into the encoder.
	penc := New{{.Name}}PointEncoder(enc.w)
	for {
		// Emit stats periodically.
		select {
		case <-ticker.C:
			if err := enc.encodeStats(itr.Stats()); err != nil {
				return err
			}
		default:
		}

		// Retrieve the next point from the iterator.
		p := itr.Next()
		if p == nil {
			break
		}

		// Write the point to the point encoder.
		if err := penc.Encode{{.Name}}Point(p); err != nil {
			return err
		}
	}

	// Emit final stats.
	if err := enc.encodeStats(itr.Stats()); err != nil {
		return err
	}
	return nil
}

{{end}}

// encodeStats writes a stats object into the point stream: a point with empty
// name and tags carrying the encoded stats, preceded by its length as a
// big-endian uint32.
func (enc *IteratorEncoder) encodeStats(stats IteratorStats) error {
	buf, err := proto.Marshal(&internal.Point{
		Name: proto.String(""),
		Tags: proto.String(""),
		Time: proto.Int64(0),
		Nil:  proto.Bool(false),

		Stats: encodeIteratorStats(&stats),
	})
	if err != nil {
		return err
	}

	if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
		return err
	}
	if _, err := enc.w.Write(buf); err != nil {
		return err
	}
	return nil
}

{{end}}