influxdb/influxql/iterator.gen.go.tmpl

1178 lines
32 KiB
Cheetah
Raw Normal View History

2015-11-04 21:06:06 +00:00
package influxql
import (
2015-12-24 04:42:10 +00:00
"container/heap"
"errors"
"encoding/binary"
2015-11-04 21:06:06 +00:00
"fmt"
"io"
2015-11-04 21:06:06 +00:00
"sort"
"sync"
2016-02-05 17:23:35 +00:00
"log"
"time"
2016-02-05 17:23:35 +00:00
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/influxql/internal"
2015-11-04 21:06:06 +00:00
)
// DefaultStatsInterval is the default value for IteratorEncoder.StatsInterval:
// the period between stats records emitted while an iterator is being encoded.
const DefaultStatsInterval = 10 * time.Second
{{with $types := .}}{{range $k := $types}}
2015-11-04 21:06:06 +00:00
// {{$k.Name}}Iterator is an Iterator that yields a stream of {{$k.name}} points.
type {{$k.Name}}Iterator interface {
	Iterator

	// Next returns the next point in the stream. The implementations in this
	// file return nil once no further points are available.
	Next() *{{$k.Name}}Point
}
// new{{$k.Name}}Iterators converts a slice of Iterator into a slice of
// {{$k.Name}}Iterator. Any iterator in itrs that is not a {{$k.Name}}Iterator
// and cannot be cast to one is closed and dropped from the result.
func new{{$k.Name}}Iterators(itrs []Iterator) []{{$k.Name}}Iterator {
	out := make([]{{$k.Name}}Iterator, 0, len(itrs))
	for _, candidate := range itrs {
		// Already the right type: keep as is.
		if typed, ok := candidate.(*struct{ {{$k.Name}}Iterator }); ok && typed == nil {
			continue
		}
		if typed, ok := candidate.({{$k.Name}}Iterator); ok {
			out = append(out, typed)
			continue
		}
{{if eq .Name "Float"}}
		// Integer iterators can be read as floats through a casting wrapper.
		if ints, ok := candidate.(IntegerIterator); ok {
			out = append(out, &integerFloatCastIterator{input: ints})
			continue
		}
{{end}}
		// Incompatible iterator: release it and leave it out of the result.
		candidate.Close()
	}
	return out
}
// buf{{$k.Name}}Iterator wraps a {{$k.Name}}Iterator with a single-point buffer
// so that a point which was read can be pushed back and read again.
type buf{{$k.Name}}Iterator struct {
	// itr is the wrapped iterator.
	itr {{$k.Name}}Iterator
	// buf holds the pushed-back point; nil when the buffer is empty.
	buf *{{$k.Name}}Point
}

// newBuf{{$k.Name}}Iterator wraps itr in a buffered {{$k.Name}}Iterator.
func newBuf{{$k.Name}}Iterator(itr {{$k.Name}}Iterator) *buf{{$k.Name}}Iterator {
	b := new(buf{{$k.Name}}Iterator)
	b.itr = itr
	return b
}

// Stats reports the statistics of the wrapped iterator.
func (itr *buf{{$k.Name}}Iterator) Stats() IteratorStats {
	return itr.itr.Stats()
}

// Close closes the wrapped iterator.
func (itr *buf{{$k.Name}}Iterator) Close() error {
	return itr.itr.Close()
}
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// peek returns the next point without removing it from the iterator.
func (itr *buf{{$k.Name}}Iterator) peek() *{{$k.Name}}Point {
2015-11-04 21:06:06 +00:00
p := itr.Next()
itr.unread(p)
return p
}
// peekTime returns the time of the next point.
2015-11-04 21:06:06 +00:00
// Returns zero time if no more points available.
func (itr *buf{{$k.Name}}Iterator) peekTime() int64 {
2015-11-04 21:06:06 +00:00
p := itr.peek()
if p == nil {
2015-11-04 21:06:06 +00:00
return ZeroTime
}
2015-11-04 21:06:06 +00:00
return p.Time
2015-11-04 21:06:06 +00:00
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *buf{{$k.Name}}Iterator) Next() *{{$k.Name}}Point {
2015-11-04 21:06:06 +00:00
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
return buf
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) *{{$k.Name}}Point {
2015-11-04 21:06:06 +00:00
v := itr.Next()
if v == nil {
return nil
} else if v.Time < startTime || v.Time >= endTime {
itr.unread(v)
return nil
}
return v
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *buf{{$k.Name}}Iterator) unread(v *{{$k.Name}}Point) { itr.buf = v }
2015-11-04 21:06:06 +00:00
// {{$k.name}}MergeIterator combines multiple {{$k.name}} iterators into one
// stream, reading from one input at a time for as long as its points stay in
// the current window.
type {{$k.name}}MergeIterator struct {
	// inputs are the iterators being merged; used for Stats and Close.
	inputs []{{$k.Name}}Iterator
	// heap orders the buffered inputs that still have points.
	heap *{{$k.name}}MergeHeap

	// curr is the input currently being read; nil when one must be popped.
	curr *{{$k.name}}MergeHeapItem

	// window describes the series and time range currently being emitted.
	window struct {
		name string
		tags string

		startTime int64
		endTime   int64
	}
}
// new{{$k.Name}}MergeIterator builds a {{$k.name}}MergeIterator over inputs.
// Each input is wrapped in a buffered iterator; inputs that have no points
// are left out of the heap (but are still retained for Stats and Close).
func new{{$k.Name}}MergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}MergeIterator {
	h := &{{$k.name}}MergeHeap{
		opt:   opt,
		items: make([]*{{$k.name}}MergeHeapItem, 0, len(inputs)),
	}

	for i := range inputs {
		buffered := newBuf{{$k.Name}}Iterator(inputs[i])
		if buffered.peek() != nil {
			h.items = append(h.items, &{{$k.name}}MergeHeapItem{itr: buffered})
		}
	}
	heap.Init(h)

	return &{{$k.name}}MergeIterator{
		inputs: inputs,
		heap:   h,
	}
}
2015-11-04 21:06:06 +00:00
// Stats returns the combined statistics of all input iterators.
func (itr *{{$k.name}}MergeIterator) Stats() IteratorStats {
	var total IteratorStats
	for i := range itr.inputs {
		total.Add(itr.inputs[i].Stats())
	}
	return total
}
2015-11-04 21:06:06 +00:00
// Close closes every underlying iterator. All inputs are closed even if one
// of them fails; the first error encountered, if any, is returned.
func (itr *{{$k.name}}MergeIterator) Close() error {
	var firstErr error
	for _, input := range itr.inputs {
		if err := input.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
2015-11-04 21:06:06 +00:00
// Next returns the next merged point, or nil when every input is exhausted.
// Points are read from a single input while they stay inside the current
// name/tags/time window; once a point falls outside it, the input goes back
// on the heap and the next input is selected.
func (itr *{{$k.name}}MergeIterator) Next() *{{$k.Name}}Point {
	for {
		// No active input: take the next one off the heap and start a window.
		if itr.curr == nil {
			if len(itr.heap.items) == 0 {
				return nil
			}
			itr.curr = heap.Pop(itr.heap).(*{{$k.name}}MergeHeapItem)

			first := itr.curr.itr.Next()
			itr.window.name = first.Name
			itr.window.tags = first.Tags.ID()
			itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(first.Time)
			return first
		}

		next := itr.curr.itr.Next()
		if next == nil {
			// Input exhausted: drop it and pick another.
			itr.curr = nil
			continue
		}

		// Decide whether the point left the current window.
		var outside bool
		switch {
		case next.Name != itr.window.name, next.Tags.ID() != itr.window.tags:
			outside = true
		case itr.heap.opt.Ascending:
			outside = next.Time >= itr.window.endTime
		default:
			outside = next.Time < itr.window.startTime
		}
		if !outside {
			return next
		}

		// Push the point back and return the input to the heap.
		itr.curr.itr.unread(next)
		heap.Push(itr.heap, itr.curr)
		itr.curr = nil
	}
}
2015-11-04 21:06:06 +00:00
// {{$k.name}}MergeHeap represents a heap of {{$k.name}}MergeHeapItems.
2015-11-04 21:06:06 +00:00
// Items are sorted by their next window and then by name/tags.
type {{$k.name}}MergeHeap struct {
2015-11-04 21:06:06 +00:00
opt IteratorOptions
items []*{{$k.name}}MergeHeapItem
2015-11-04 21:06:06 +00:00
}
func (h {{$k.name}}MergeHeap) Len() int { return len(h.items) }
func (h {{$k.name}}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h {{$k.name}}MergeHeap) Less(i, j int) bool {
2015-11-04 21:06:06 +00:00
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() < y.Tags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() > y.Tags.ID()
}
}
2015-11-04 21:06:06 +00:00
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
if h.opt.Ascending {
2016-01-22 23:46:56 +00:00
return xt < yt
2015-11-04 21:06:06 +00:00
}
2016-01-22 23:46:56 +00:00
return xt > yt
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Push appends x, which must be a *{{$k.name}}MergeHeapItem, to the heap's
// backing slice. It is intended to be called through container/heap.
func (h *{{$k.name}}MergeHeap) Push(x interface{}) {
	item := x.(*{{$k.name}}MergeHeapItem)
	h.items = append(h.items, item)
}

// Pop removes and returns the last item of the backing slice. It is intended
// to be called through container/heap.
func (h *{{$k.name}}MergeHeap) Pop() interface{} {
	last := len(h.items) - 1
	item := h.items[last]
	h.items = h.items[:last]
	return item
}

// {{$k.name}}MergeHeapItem is a heap entry holding one buffered input iterator.
type {{$k.name}}MergeHeapItem struct {
	itr *buf{{$k.Name}}Iterator
}
2015-11-04 21:06:06 +00:00
// {{$k.name}}SortedMergeIterator sorts and merges multiple iterators into a
// single point-by-point ordered stream.
type {{$k.name}}SortedMergeIterator struct {
	// inputs are the iterators being merged; used for Stats and Close.
	inputs []{{$k.Name}}Iterator
	opt    IteratorOptions
	// heap holds the next pending point of every input that still has data.
	heap {{$k.name}}SortedMergeHeap
}
// new{{$k.Name}}SortedMergeIterator builds a {{$k.name}}SortedMergeIterator
// over inputs. The first point of every input is read eagerly to seed the
// heap; inputs that yield no point are left out of it.
func new{{$k.Name}}SortedMergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) Iterator {
	merged := &{{$k.name}}SortedMergeIterator{
		inputs: inputs,
		opt:    opt,
		heap:   make({{$k.name}}SortedMergeHeap, 0, len(inputs)),
	}

	for i := range inputs {
		if p := inputs[i].Next(); p != nil {
			merged.heap = append(merged.heap, &{{$k.name}}SortedMergeHeapItem{
				point:     p,
				itr:       inputs[i],
				ascending: opt.Ascending,
			})
		}
	}
	heap.Init(&merged.heap)

	return merged
}
// Stats returns the combined statistics of all input iterators.
func (itr *{{$k.name}}SortedMergeIterator) Stats() IteratorStats {
	var total IteratorStats
	for i := range itr.inputs {
		total.Add(itr.inputs[i].Stats())
	}
	return total
}
2015-12-24 04:42:10 +00:00
// Close closes every underlying iterator. All inputs are closed even if one
// of them fails; the first error encountered, if any, is returned.
func (itr *{{$k.name}}SortedMergeIterator) Close() error {
	var firstErr error
	for _, input := range itr.inputs {
		if err := input.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
2015-12-24 04:42:10 +00:00
// Next returns the next point in sorted order, or nil when all inputs are
// exhausted.
func (itr *{{$k.name}}SortedMergeIterator) Next() *{{$k.Name}}Point {
	return itr.pop()
}

// pop removes the top item from the heap and returns a copy of its point.
// The item is then refilled from its own iterator and pushed back onto the
// heap if that iterator produced another point.
func (itr *{{$k.name}}SortedMergeIterator) pop() *{{$k.Name}}Point {
	if len(itr.heap) == 0 {
		return nil
	}

	top := heap.Pop(&itr.heap).(*{{$k.name}}SortedMergeHeapItem)

	// Clone before the item's point is replaced below.
	out := top.point.Clone()

	top.point = top.itr.Next()
	if top.point != nil {
		heap.Push(&itr.heap, top)
	}
	return out
}
// {{$k.name}}SortedMergeHeap is a heap of {{$k.name}}SortedMergeHeapItems.
type {{$k.name}}SortedMergeHeap []*{{$k.name}}SortedMergeHeapItem

// Len returns the number of items in the heap.
func (h {{$k.name}}SortedMergeHeap) Len() int {
	return len(h)
}

// Swap exchanges the items at positions i and j.
func (h {{$k.name}}SortedMergeHeap) Swap(i, j int) {
	tmp := h[i]
	h[i] = h[j]
	h[j] = tmp
}
func (h {{$k.name}}SortedMergeHeap) Less(i, j int) bool {
2015-12-24 04:42:10 +00:00
x, y := h[i].point, h[j].point
if h[i].ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() < y.Tags.ID()
}
return x.Time < y.Time
}
if x.Name != y.Name {
return x.Name > y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() > y.Tags.ID()
}
return x.Time > y.Time
}
// Push appends x, which must be a *{{$k.name}}SortedMergeHeapItem, to the
// heap. It is intended to be called through container/heap.
func (h *{{$k.name}}SortedMergeHeap) Push(x interface{}) {
	item := x.(*{{$k.name}}SortedMergeHeapItem)
	*h = append(*h, item)
}

// Pop removes and returns the last item of the heap's slice. It is intended
// to be called through container/heap.
func (h *{{$k.name}}SortedMergeHeap) Pop() interface{} {
	items := *h
	last := len(items) - 1
	item := items[last]
	*h = items[:last]
	return item
}

// {{$k.name}}SortedMergeHeapItem pairs an input iterator with the point most
// recently read from it.
type {{$k.name}}SortedMergeHeapItem struct {
	// point is the pending point read from itr.
	point *{{$k.Name}}Point
	// itr is the iterator that point came from.
	itr {{$k.Name}}Iterator
	// ascending selects the sort direction used by the heap's Less.
	ascending bool
}
// {{$k.name}}LimitIterator applies the offset and limit from its options to
// each name/tags group of the input.
type {{$k.name}}LimitIterator struct {
	input {{$k.Name}}Iterator
	opt   IteratorOptions
	// n counts the points seen so far in the current group.
	n int

	// prev identifies the group of the most recently read point.
	prev struct {
		name string
		tags Tags
	}
}

// new{{$k.Name}}LimitIterator wraps input in a {{$k.name}}LimitIterator
// configured by opt.
func new{{$k.Name}}LimitIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}LimitIterator {
	limited := new({{$k.name}}LimitIterator)
	limited.input = input
	limited.opt = opt
	return limited
}

// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}LimitIterator) Stats() IteratorStats {
	return itr.input.Stats()
}

// Close closes the input iterator.
func (itr *{{$k.name}}LimitIterator) Close() error {
	return itr.input.Close()
}
2015-12-24 18:46:31 +00:00
// Next returns the next point that falls within the offset/limit range of its
// group, or nil when the input is exhausted. The per-group counter restarts
// whenever the name or tags change.
func (itr *{{$k.name}}LimitIterator) Next() *{{$k.Name}}Point {
	for {
		p := itr.input.Next()
		if p == nil {
			return nil
		}

		// A change of name or tags starts a new group.
		if sameGroup := p.Name == itr.prev.name && p.Tags.Equals(&itr.prev.tags); !sameGroup {
			itr.prev.name, itr.prev.tags = p.Name, p.Tags
			itr.n = 0
		}
		itr.n++

		// Still inside the offset: skip the point.
		if itr.n <= itr.opt.Offset {
			continue
		}

		// Emit the point unless a positive limit has been exceeded.
		if itr.opt.Limit <= 0 || itr.n-itr.opt.Offset <= itr.opt.Limit {
			return p
		}

		// Past the limit. With no interval, no dimensions and a single source
		// no further group can follow, so stop right away.
		if itr.opt.Interval.IsZero() && len(itr.opt.Dimensions) == 0 && len(itr.opt.Sources) == 1 {
			return nil
		}
	}
}
// {{$k.name}}FillIterator emits the input's points and synthesizes points for
// expected window times that the input does not provide, according to the
// fill option.
type {{$k.name}}FillIterator struct {
	input *buf{{$k.Name}}Iterator
	// prev is the last real input point of the current series; used by
	// PreviousFill.
	prev *{{$k.Name}}Point
	// startTime and endTime are derived from the option's time range by the
	// constructor.
	startTime int64
	endTime   int64
	// auxFields is attached as Aux to every synthesized point.
	auxFields []interface{}
	// done is not referenced by the methods visible in this file section.
	done bool
	opt  IteratorOptions

	// window tracks the current series and the next expected time.
	window struct {
		name string
		tags Tags
		time int64
	}
}
// new{{$k.Name}}FillIterator wraps input in a {{$k.name}}FillIterator.
//
// When the fill option is NullFill and expr is a call to "count", the fill is
// switched to a numeric fill with the zero value. The start and end times are
// computed from the option's time range and direction, and the window is
// primed from the first input point (or set to the end time when the input is
// empty).
func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr Expr, opt IteratorOptions) *{{$k.name}}FillIterator {
	if opt.Fill == NullFill {
		call, isCall := expr.(*Call)
		if isCall && call.Name == "count" {
			opt.Fill = NumberFill
			opt.FillValue = {{$k.Zero}}
		}
	}

	var startTime, endTime int64
	switch {
	case opt.Ascending:
		startTime, _ = opt.Window(opt.StartTime)
		_, endTime = opt.Window(opt.EndTime)
	default:
		_, startTime = opt.Window(opt.EndTime)
		endTime, _ = opt.Window(opt.StartTime)
	}

	var auxFields []interface{}
	if n := len(opt.Aux); n > 0 {
		auxFields = make([]interface{}, n)
	}

	fill := &{{$k.name}}FillIterator{
		input:     newBuf{{$k.Name}}Iterator(input),
		startTime: startTime,
		endTime:   endTime,
		auxFields: auxFields,
		opt:       opt,
	}

	if first := fill.input.peek(); first == nil {
		fill.window.time = fill.endTime
	} else {
		fill.window.name, fill.window.tags = first.Name, first.Tags
		fill.window.time = fill.startTime
	}
	return fill
}
// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}FillIterator) Stats() IteratorStats {
	return itr.input.Stats()
}

// Close closes the input iterator.
func (itr *{{$k.name}}FillIterator) Close() error {
	return itr.input.Close()
}
// Next returns the next point of the filled stream. It returns the input's
// point when that point is the one expected at the current window time, and
// otherwise a synthesized point for that time, populated according to
// opt.Fill. It returns nil once the input is exhausted and the current series
// has been filled up to endTime.
func (itr *{{$k.name}}FillIterator) Next() *{{$k.Name}}Point {
p := itr.input.Next()
// Check if the next point is outside of our window or is nil.
// Every path through this loop body ends in break or return, so it runs at
// most once.
for p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
// If we are inside of an interval, unread the point and continue below to
// constructing a new point.
if itr.opt.Ascending {
if itr.window.time < itr.endTime {
itr.input.unread(p)
p = nil
break
}
} else {
if itr.window.time >= itr.endTime {
itr.input.unread(p)
p = nil
break
}
}
// We are *not* in a current interval. If there is no next point,
// we are at the end of all intervals.
if p == nil {
return nil
}
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
itr.prev = nil
break
}
// Check if the point is our next expected point.
// NOTE(review): this test uses p.Time > window.time for both directions,
// while the expected time decreases when Ascending is false — confirm that
// gaps are detected correctly for descending iteration.
if p == nil || p.Time > itr.window.time {
// The real point belongs to a later slot; keep it for a later call.
if p != nil {
itr.input.unread(p)
}
// Synthesize a point for the expected time.
p = &{{$k.Name}}Point{
Name: itr.window.name,
Tags: itr.window.tags,
Time: itr.window.time,
Aux: itr.auxFields,
}
switch itr.opt.Fill {
case NullFill:
p.Nil = true
case NumberFill:
p.Value = castTo{{$k.Name}}(itr.opt.FillValue)
case PreviousFill:
// Repeat the previous real point, or emit nil if there is none yet.
if itr.prev != nil {
p.Value = itr.prev.Value
p.Nil = itr.prev.Nil
} else {
p.Nil = true
}
}
} else {
// Remember the real point for PreviousFill.
itr.prev = p
}
// Advance the expected time. Do not advance to a new window here
// as there may be lingering points with the same timestamp in the previous
// window.
if itr.opt.Ascending {
itr.window.time = p.Time + int64(itr.opt.Interval.Duration)
} else {
itr.window.time = p.Time - int64(itr.opt.Interval.Duration)
}
return p
}
// {{$k.name}}IntervalIterator is the {{$k.name}} implementation of
// IntervalIterator: it rewrites each point's time to the start of its window.
type {{$k.name}}IntervalIterator struct {
	input {{$k.Name}}Iterator
	opt   IteratorOptions
}

// new{{$k.Name}}IntervalIterator wraps input in a {{$k.name}}IntervalIterator
// configured by opt.
func new{{$k.Name}}IntervalIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}IntervalIterator {
	wrapped := new({{$k.name}}IntervalIterator)
	wrapped.input = input
	wrapped.opt = opt
	return wrapped
}

// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}IntervalIterator) Stats() IteratorStats {
	return itr.input.Stats()
}

// Close closes the input iterator.
func (itr *{{$k.name}}IntervalIterator) Close() error {
	return itr.input.Close()
}
// Next returns the next input point with its time replaced by the first value
// returned by opt.Window for that time. It returns nil when the input is
// exhausted. The input point is modified in place.
func (itr *{{$k.name}}IntervalIterator) Next() *{{$k.Name}}Point {
	p := itr.input.Next()
	if p != nil {
		p.Time, _ = itr.opt.Window(p.Time)
	}
	return p
}
// {{$k.name}}InterruptIterator is the {{$k.name}} implementation of
// InterruptIterator: it stops producing points once its closing channel is
// closed.
type {{$k.name}}InterruptIterator struct {
	input {{$k.Name}}Iterator
	// closing signals that iteration should stop.
	closing <-chan struct{}
	// count is the number of points read since the channel was last polled.
	count int
}

// new{{$k.Name}}InterruptIterator wraps input so that it can be interrupted
// through closing.
func new{{$k.Name}}InterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}InterruptIterator {
	wrapped := new({{$k.name}}InterruptIterator)
	wrapped.input = input
	wrapped.closing = closing
	return wrapped
}

// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}InterruptIterator) Stats() IteratorStats {
	return itr.input.Stats()
}

// Close closes the input iterator.
func (itr *{{$k.name}}InterruptIterator) Close() error {
	return itr.input.Close()
}
// Next returns the next point from the input, or nil once the closing channel
// has been observed closed.
//
// The channel is polled only once every 256 points to keep the per-point cost
// low. The poll happens when count is 0 and again when it reaches 256, so an
// iterator interrupted before its first point emits nothing.
func (itr *{{$k.name}}InterruptIterator) Next() *{{$k.Name}}Point {
	// count&0xFF is zero exactly at 0 and 256. The previous mask (0x100) was
	// zero for every count below 256, which polled the channel on each call.
	if itr.count&0xFF == 0 {
		select {
		case <-itr.closing:
			return nil
		default:
			// Reset iterator count to zero and fall through to emit the next point.
			itr.count = 0
		}
	}

	// Increment the counter for every point read.
	itr.count++
	return itr.input.Next()
}
// {{$k.name}}AuxIterator is the {{$k.name}} implementation of AuxIterator. It
// forwards every input point to its own output channel and to the per-field
// auxiliary iterators.
type {{$k.name}}AuxIterator struct {
	input *buf{{$k.Name}}Iterator
	// output carries the points returned by Next.
	output chan *{{$k.Name}}Point
	// fields fans points out to the auxiliary field iterators.
	fields auxIteratorFields
	// background is set by Background; it lets stream stop early when the
	// field iterators no longer accept points.
	background bool
}

// new{{$k.Name}}AuxIterator builds a {{$k.name}}AuxIterator over input with an
// output channel of capacity one.
func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{$k.name}}AuxIterator {
	aux := new({{$k.name}}AuxIterator)
	aux.input = newBuf{{$k.Name}}Iterator(input)
	aux.output = make(chan *{{$k.Name}}Point, 1)
	aux.fields = newAuxIteratorFields(seriesKeys, opt)
	return aux
}
// Background marks the iterator as a background iterator, starts streaming
// and drains the iterator's own output in a separate goroutine.
func (itr *{{$k.name}}AuxIterator) Background() {
	itr.background = true
	itr.Start()
	go drainIterator(itr)
}

// Start begins streaming input points in a new goroutine.
func (itr *{{$k.name}}AuxIterator) Start() {
	go itr.stream()
}

// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}AuxIterator) Stats() IteratorStats {
	return itr.input.Stats()
}

// Close closes the input iterator.
func (itr *{{$k.name}}AuxIterator) Close() error {
	return itr.input.Close()
}

// Next receives the next point from the output channel. It returns nil once
// the channel has been closed by stream.
func (itr *{{$k.name}}AuxIterator) Next() *{{$k.Name}}Point {
	return <-itr.output
}

// Iterator returns the auxiliary field iterator registered under name.
func (itr *{{$k.name}}AuxIterator) Iterator(name string) Iterator {
	return itr.fields.iterator(name)
}
2015-11-04 21:06:06 +00:00
// CreateIterator returns the auxiliary iterator for the variable referenced
// by opt.Expr. It panics when opt.Expr is nil or is not a *VarRef.
func (itr *{{$k.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
	if opt.Expr == nil {
		panic("unable to create an iterator with no expression from an aux iterator")
	}

	ref, isRef := opt.Expr.(*VarRef)
	if !isRef {
		panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", opt.Expr))
	}
	return itr.Iterator(ref.Val), nil
}
// FieldDimensions is not implemented for aux iterators and always returns an
// error.
func (itr *{{$k.name}}AuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
	err = errors.New("not implemented")
	return nil, nil, err
}

// SeriesKeys is not implemented for aux iterators and always returns an error.
func (itr *{{$k.name}}AuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
	err := errors.New("not implemented")
	return nil, err
}

// ExpandSources is not implemented for aux iterators and always returns an
// error.
func (itr *{{$k.name}}AuxIterator) ExpandSources(sources Sources) (Sources, error) {
	err := errors.New("not implemented")
	return nil, err
}
// stream copies every input point to the output channel and to the field
// iterators until the input is exhausted. A background iterator also stops as
// soon as the field iterators report that a point was not accepted. The
// output channel and the field iterators are closed before returning.
func (itr *{{$k.name}}AuxIterator) stream() {
	for p := itr.input.Next(); p != nil; p = itr.input.Next() {
		itr.output <- p

		if delivered := itr.fields.send(p); itr.background && !delivered {
			break
		}
	}

	close(itr.output)
	itr.fields.close()
}
// {{$k.name}}ChanIterator hands {{$k.name}} points from a writer (setBuf) to a
// reader (Next) through a single-point buffer guarded by a condition variable.
type {{$k.name}}ChanIterator struct {
	// buf is the point waiting to be read; nil when empty.
	buf *{{$k.Name}}Point
	// cond guards buf and done and coordinates the reader and the writer.
	cond *sync.Cond
	// done is set by Close.
	done bool
}

// Stats returns empty statistics.
func (itr *{{$k.name}}ChanIterator) Stats() IteratorStats {
	return IteratorStats{}
}

// Close marks the iterator as done and wakes every goroutine waiting on it.
// It always returns nil.
func (itr *{{$k.name}}ChanIterator) Close() error {
	// The unlock is intentionally not deferred, to avoid an unnecessary
	// allocation.
	itr.cond.L.Lock()
	itr.done = true
	itr.cond.Broadcast()
	itr.cond.L.Unlock()
	return nil
}
// setBuf stores a new point built from the arguments in the buffer, blocking
// until the previous point has been read. It returns false, without storing
// anything, if the iterator has been closed. A value of an unsupported type
// produces a point with Nil set.
func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool {
	itr.cond.L.Lock()
	defer itr.cond.L.Unlock()

	// Block while the buffer is still occupied and the iterator is open.
	for itr.buf != nil && !itr.done {
		itr.cond.Wait()
	}

	// The wait may have ended because the iterator was closed; in that case
	// report it instead of writing.
	if itr.done {
		return false
	}

	p := &{{$k.Name}}Point{Name: name, Tags: tags, Time: time}
	switch v := value.(type) {
	case {{$k.Type}}:
		p.Value = v
{{if eq $k.Name "Float"}}
	case int64:
		p.Value = float64(v)
{{end}}
	default:
		p.Nil = true
	}
	itr.buf = p

	// Wake one waiting goroutine so the new value can be read.
	itr.cond.Signal()
	return true
}
// Next blocks until a point is buffered or the iterator is closed, then
// returns the buffered point (nil if there is none) and empties the buffer.
func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point {
	// The unlock is intentionally not deferred, to avoid an unnecessary
	// allocation.
	itr.cond.L.Lock()

	for itr.buf == nil && !itr.done {
		itr.cond.Wait()
	}

	// A buffered point is returned even after Close so that the final value
	// is not lost.
	var p *{{$k.Name}}Point
	p, itr.buf = itr.buf, nil

	// Wake one waiting goroutine now that the buffer is free.
	itr.cond.Signal()
	itr.cond.L.Unlock()
	return p
}
{{range $v := $types}}
2015-11-04 21:06:06 +00:00
// {{$k.name}}Reduce{{$v.Name}}Iterator runs a reducer over every interval of
// the input and buffers the resulting points.
type {{$k.name}}Reduce{{$v.Name}}Iterator struct {
	input *buf{{$k.Name}}Iterator
	// create builds a fresh aggregator/emitter pair for each name/tag group.
	create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
	opt    IteratorOptions
	// points holds the results of the last reduced window, consumed from the
	// end.
	points []{{$v.Name}}Point
}

// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Stats() IteratorStats {
	return itr.input.Stats()
}

// Close closes the input iterator.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Close() error {
	return itr.input.Close()
}
2015-11-04 21:06:06 +00:00
// Next returns the next reduced point. When the buffer is empty the next
// window is reduced first; nil is returned when that yields no points.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() *{{$v.Name}}Point {
	if len(itr.points) == 0 {
		if itr.points = itr.reduce(); len(itr.points) == 0 {
			return nil
		}
	}

	// Take the last buffered point.
	last := len(itr.points) - 1
	p := &itr.points[last]
	itr.points = itr.points[:last]
	return p
}
// {{$k.name}}Reduce{{$v.Name}}Point holds the reduction state for one
// name/tag combination.
type {{$k.name}}Reduce{{$v.Name}}Point struct {
	Name string
	Tags Tags
	// Aggregator receives the input points of the group.
	Aggregator {{$k.Name}}PointAggregator
	// Emitter produces the output points of the group.
	Emitter {{$v.Name}}PointEmitter
}
2015-11-04 21:06:06 +00:00
// reduce aggregates every point of the next window, grouped by name and the
// subset of tags selected by the option's dimensions, and returns the emitted
// points. Nil points are skipped. The result is ordered in reverse (by group
// key, and within a group) because Next consumes it from the end.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []{{$v.Name}}Point {
	// The window is determined by the time of the next available point.
	startTime, endTime := itr.opt.Window(itr.input.peekTime())

	// Feed each point of the window to the aggregator of its group.
	groups := make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point)
	for curr := itr.input.NextInWindow(startTime, endTime); curr != nil; curr = itr.input.NextInWindow(startTime, endTime) {
		if curr.Nil {
			continue
		}

		tags := curr.Tags.Subset(itr.opt.Dimensions)
		id := curr.Name + "\x00" + tags.ID()

		rp, found := groups[id]
		if !found {
			aggregator, emitter := itr.create()
			rp = &{{$k.name}}Reduce{{$v.Name}}Point{
				Name:       curr.Name,
				Tags:       tags,
				Aggregator: aggregator,
				Emitter:    emitter,
			}
			groups[id] = rp
		}
		rp.Aggregator.Aggregate{{$k.Name}}(curr)
	}

	// Visit the groups in reverse key order.
	ids := make([]string, 0, len(groups))
	for id := range groups {
		ids = append(ids, id)
	}
	sort.Sort(sort.Reverse(sort.StringSlice(ids)))

	out := make([]{{$v.Name}}Point, 0, len(groups))
	for _, id := range ids {
		rp := groups[id]
		emitted := rp.Emitter.Emit()
		for i := len(emitted) - 1; i >= 0; i-- {
			pt := &emitted[i]
			pt.Name = rp.Name
			pt.Tags = rp.Tags
			// Default to the window start when the reducer set no time.
			if pt.Time == ZeroTime {
				pt.Time = startTime
			}
			out = append(out, *pt)
		}
	}
	return out
}
// {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator combines the points of two
// input iterators pairwise through a function.
type {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator struct {
	left  *buf{{$k.Name}}Iterator
	right *buf{{$k.Name}}Iterator
	fn    {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc
}

// Stats returns the combined statistics of the left and right iterators.
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Stats() IteratorStats {
	combined := itr.left.Stats()
	combined.Add(itr.right.Stats())
	return combined
}
// Close closes both input iterators. The right iterator is closed even if
// closing the left one fails; the first error encountered, if any, is
// returned.
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Close() error {
	leftErr := itr.left.Close()
	rightErr := itr.right.Close()
	if leftErr != nil {
		return leftErr
	}
	return rightErr
}
// Next reads one point from each input and returns the result of applying fn
// to the pair. It returns nil only when both inputs are exhausted; fn may be
// called with one nil argument.
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Next() *{{$v.Name}}Point {
	a, b := itr.left.Next(), itr.right.Next()
	if a != nil || b != nil {
		return itr.fn(a, b)
	}
	return nil
}
// {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc combines two points into one.
// An implementation may modify and return one of its arguments instead of
// allocating a new point. Either argument may be nil, but never both.
type {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc func(a *{{$k.Name}}Point, b *{{$k.Name}}Point) *{{$v.Name}}Point
{{end}}
2015-11-04 21:06:06 +00:00
// {{$k.name}}TransformIterator executes a function to modify an existing point for every
2015-11-04 21:06:06 +00:00
// output of the input iterator.
type {{$k.name}}TransformIterator struct {
input {{$k.Name}}Iterator
fn {{$k.name}}TransformFunc
2015-11-04 21:06:06 +00:00
}
// Stats returns stats from the input iterator.
func (itr *{{$k.name}}TransformIterator) Stats() IteratorStats { return itr.input.Stats() }
2015-11-04 21:06:06 +00:00
// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}TransformIterator) Close() error { return itr.input.Close() }
2015-11-04 21:06:06 +00:00
// Next returns the result of applying fn to the next input point, or nil when
// the input is exhausted.
func (itr *{{$k.name}}TransformIterator) Next() *{{$k.Name}}Point {
	p := itr.input.Next()
	if p == nil {
		return nil
	}
	return itr.fn(p)
}

// {{$k.name}}TransformFunc creates or modifies a point. An implementation may
// modify and return its argument instead of allocating a new point.
type {{$k.name}}TransformFunc func(p *{{$k.Name}}Point) *{{$k.Name}}Point
2015-11-04 21:06:06 +00:00
// {{$k.name}}ReduceIterator executes a function to modify an existing point for every
2015-11-04 21:06:06 +00:00
// output of the input iterator.
type {{$k.name}}BoolTransformIterator struct {
input {{$k.Name}}Iterator
fn {{$k.name}}BoolTransformFunc
2015-11-04 21:06:06 +00:00
}
// Stats returns stats from the input iterator.
func (itr *{{$k.name}}BoolTransformIterator) Stats() IteratorStats { return itr.input.Stats() }
2015-11-04 21:06:06 +00:00
// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}BoolTransformIterator) Close() error { return itr.input.Close() }
2015-11-04 21:06:06 +00:00
// Next returns the boolean point produced by applying fn to the next input
// point, or nil when the input is exhausted.
func (itr *{{$k.name}}BoolTransformIterator) Next() *BooleanPoint {
	p := itr.input.Next()
	if p == nil {
		return nil
	}
	return itr.fn(p)
}

// {{$k.name}}BoolTransformFunc converts a {{$k.name}} point into a boolean
// point.
type {{$k.name}}BoolTransformFunc func(p *{{$k.Name}}Point) *BooleanPoint
2015-11-04 21:06:06 +00:00
// {{$k.name}}DedupeIterator only outputs unique points.
2016-02-05 17:23:35 +00:00
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type {{$k.name}}DedupeIterator struct {
input {{$k.Name}}Iterator
m map[string]struct{} // lookup of points already sent
2016-02-05 17:23:35 +00:00
}
// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator.
func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
return &{{$k.name}}DedupeIterator{
2016-02-05 17:23:35 +00:00
input: input,
m: make(map[string]struct{}),
2016-02-05 17:23:35 +00:00
}
}
// Stats returns stats from the input iterator.
func (itr *{{$k.name}}DedupeIterator) Stats() IteratorStats { return itr.input.Stats() }
2016-02-05 17:23:35 +00:00
// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}DedupeIterator) Close() error { return itr.input.Close() }
2016-02-05 17:23:35 +00:00
// Next returns the next input point that has not been emitted before, or nil
// when the input is exhausted. Points that cannot be serialized are logged
// and skipped.
func (itr *{{$k.name}}DedupeIterator) Next() *{{$k.Name}}Point {
	for p := itr.input.Next(); p != nil; p = itr.input.Next() {
		// The serialized point is the identity used for deduplication.
		buf, err := proto.Marshal(encode{{$k.Name}}Point(p))
		if err != nil {
			log.Println("error marshaling dedupe point:", err)
			continue
		}

		key := string(buf)
		if _, seen := itr.m[key]; seen {
			continue
		}

		itr.m[key] = struct{}{}
		return p
	}
	return nil
}
2015-11-04 21:06:06 +00:00
// {{$k.name}}ReaderIterator is an iterator that decodes points from a reader.
type {{$k.name}}ReaderIterator struct {
	r   io.Reader
	dec *{{$k.Name}}PointDecoder
	// first, when non-nil, is returned by the first call to Next before
	// anything is decoded.
	first *{{$k.Name}}Point
}

// new{{$k.Name}}ReaderIterator builds a {{$k.name}}ReaderIterator that decodes
// from r. first is emitted before any decoded point, and stats seeds the
// decoder's statistics.
func new{{$k.Name}}ReaderIterator(r io.Reader, first *{{$k.Name}}Point, stats IteratorStats) *{{$k.name}}ReaderIterator {
	decoder := New{{$k.Name}}PointDecoder(r)
	decoder.stats = stats

	reader := new({{$k.name}}ReaderIterator)
	reader.r = r
	reader.dec = decoder
	reader.first = first
	return reader
}

// Stats returns the statistics held by the decoder.
func (itr *{{$k.name}}ReaderIterator) Stats() IteratorStats {
	return itr.dec.stats
}
// Close closes the underlying reader when it implements io.ReadCloser;
// otherwise it does nothing and returns nil.
func (itr *{{$k.name}}ReaderIterator) Close() error {
	closer, ok := itr.r.(io.ReadCloser)
	if !ok {
		return nil
	}
	return closer.Close()
}
// Next returns the pending first point if there is one, otherwise the next
// point decoded from the reader. It returns nil at end of input and also,
// after logging, on any other decoding error.
func (itr *{{$k.name}}ReaderIterator) Next() *{{$k.Name}}Point {
	// The first point is handed out exactly once.
	if first := itr.first; first != nil {
		itr.first = nil
		return first
	}

	// OPTIMIZE(benbjohnson): Reuse point on iterator.
	p := new({{$k.Name}}Point)
	switch err := itr.dec.Decode{{$k.Name}}Point(p); {
	case err == nil:
		return p
	case err == io.EOF:
		return nil
	default:
		log.Printf("error reading iterator point: %s", err)
		return nil
	}
}
{{end}}
// IteratorEncoder writes the points of an iterator, interleaved with periodic
// stats records, to a writer.
type IteratorEncoder struct {
	w io.Writer

	// StatsInterval is the frequency with which stats are emitted.
	StatsInterval time.Duration
}

// NewIteratorEncoder returns an encoder that writes to w and emits stats
// every DefaultStatsInterval.
func NewIteratorEncoder(w io.Writer) *IteratorEncoder {
	enc := new(IteratorEncoder)
	enc.w = w
	enc.StatsInterval = DefaultStatsInterval
	return enc
}
// EncodeIterator encodes and writes all of itr's points to the underlying
// writer, dispatching on the iterator's concrete point type. It panics if itr
// is not a float, integer, string or boolean iterator.
func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error {
	if typed, ok := itr.(FloatIterator); ok {
		return enc.encodeFloatIterator(typed)
	}
	if typed, ok := itr.(IntegerIterator); ok {
		return enc.encodeIntegerIterator(typed)
	}
	if typed, ok := itr.(StringIterator); ok {
		return enc.encodeStringIterator(typed)
	}
	if typed, ok := itr.(BooleanIterator); ok {
		return enc.encodeBooleanIterator(typed)
	}
	panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr))
}
{{range .}}
// encode{{.Name}}Iterator encodes all points from itr to the underlying writer.
//
// A stats record is written before the first point, after the last point, and
// whenever the stats ticker has fired between two points. The first write
// error aborts the encoding and is returned.
func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error {
	// time.NewTicker panics on a non-positive duration, and StatsInterval is
	// an exported field that callers may leave at zero; fall back to the
	// default in that case.
	interval := enc.StatsInterval
	if interval <= 0 {
		interval = DefaultStatsInterval
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Emit initial stats.
	if err := enc.encodeStats(itr.Stats()); err != nil {
		return err
	}

	// Continually stream points from the iterator into the encoder.
	penc := New{{.Name}}PointEncoder(enc.w)
	for {
		// Emit stats periodically, without blocking when the ticker has not
		// fired.
		select {
		case <-ticker.C:
			if err := enc.encodeStats(itr.Stats()); err != nil {
				return err
			}
		default:
		}

		// Retrieve the next point from the iterator.
		p := itr.Next()
		if p == nil {
			break
		}

		// Write the point to the point encoder.
		if err := penc.Encode{{.Name}}Point(p); err != nil {
			return err
		}
	}

	// Emit final stats.
	return enc.encodeStats(itr.Stats())
}
{{end}}
// encodeStats writes stats to the point stream as a point record with empty
// name and tags, preceded by its length as a big-endian uint32.
func (enc *IteratorEncoder) encodeStats(stats IteratorStats) error {
	record := &internal.Point{
		Name: proto.String(""),
		Tags: proto.String(""),
		Time: proto.Int64(0),
		Nil:  proto.Bool(false),

		Stats: encodeIteratorStats(&stats),
	}

	buf, err := proto.Marshal(record)
	if err != nil {
		return err
	}

	// Length prefix first, then the payload.
	if err = binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
		return err
	}
	_, err = enc.w.Write(buf)
	return err
}
{{end}}