2017-08-15 19:24:22 +00:00
package query
2015-11-04 21:06:06 +00:00
import (
2017-10-11 14:08:31 +00:00
"context"
2015-12-24 04:42:10 +00:00
"container/heap"
2016-02-19 20:38:02 +00:00
"io"
2015-11-04 21:06:06 +00:00
"sort"
"sync"
2016-03-17 15:55:37 +00:00
"time"
2017-10-25 20:08:55 +00:00
"sync"
2016-02-05 17:23:35 +00:00
"github.com/gogo/protobuf/proto"
2017-11-03 15:57:17 +00:00
"github.com/influxdata/influxql"
2015-11-04 21:06:06 +00:00
)
2016-03-17 15:55:37 +00:00
// DefaultStatsInterval is the default value for IteratorEncoder.StatsInterval.
2017-06-12 18:10:40 +00:00
const DefaultStatsInterval = time.Second
2016-03-17 15:55:37 +00:00
2016-03-02 23:42:00 +00:00
{{with $ types := .}}{{range $k := $ types }}
2015-11-04 21:06:06 +00:00
2016-03-02 23:42:00 +00:00
// {{ $ k . Name }}Iterator represents a stream of {{ $ k . name }} points.
type {{ $ k . Name }}Iterator interface {
2015-11-04 21:06:06 +00:00
Iterator
2016-04-17 20:00:59 +00:00
Next() (*{{ $ k . Name }}Point, error)
2015-11-04 21:06:06 +00:00
}
2016-03-02 23:42:00 +00:00
// new{{ $ k . Name }}Iterators converts a slice of Iterator to a slice of {{ $ k . Name }}Iterator.
// Drop and closes any iterator in itrs that is not a {{ $ k . Name }}Iterator and cannot
// be cast to a {{ $ k . Name }}Iterator.
func new{{ $ k . Name }}Iterators(itrs []Iterator) []{{ $ k . Name }}Iterator {
a := make([]{{ $ k . Name }}Iterator, 0, len(itrs))
2016-02-03 03:12:32 +00:00
for _, itr := range itrs {
switch itr := itr.(type) {
2016-03-02 23:42:00 +00:00
case {{ $ k . Name }}Iterator:
2016-02-03 03:12:32 +00:00
a = append(a, itr)
default:
itr.Close()
}
2015-11-04 21:06:06 +00:00
}
return a
}
2016-03-02 23:42:00 +00:00
// buf{{ $ k . Name }}Iterator represents a buffered {{ $ k . Name }}Iterator.
type buf{{ $ k . Name }}Iterator struct {
itr {{ $ k . Name }}Iterator
buf *{{ $ k . Name }}Point
2015-11-04 21:06:06 +00:00
}
2016-03-02 23:42:00 +00:00
// newBuf{{ $ k . Name }}Iterator returns a buffered {{ $ k . Name }}Iterator.
func newBuf{{ $ k . Name }}Iterator(itr {{ $ k . Name }}Iterator) *buf{{ $ k . Name }}Iterator {
return &buf{{ $ k . Name }}Iterator{itr: itr}
2015-11-04 21:06:06 +00:00
}
2016-03-17 15:55:37 +00:00
// Stats returns statistics from the input iterator.
func (itr *buf{{ $ k . Name }}Iterator) Stats() IteratorStats { return itr.itr.Stats() }
2015-11-04 21:06:06 +00:00
// Close closes the underlying iterator.
2016-03-02 23:42:00 +00:00
func (itr *buf{{ $ k . Name }}Iterator) Close() error { return itr.itr.Close() }
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// peek returns the next point without removing it from the iterator.
2016-04-17 20:00:59 +00:00
func (itr *buf{{ $ k . Name }}Iterator) peek() (*{{ $ k . Name }}Point, error) {
p, err := itr.Next()
if err != nil {
return nil, err
}
2015-11-04 21:06:06 +00:00
itr.unread(p)
2016-04-17 20:00:59 +00:00
return p, nil
2015-11-04 21:06:06 +00:00
}
// peekTime returns the time of the next point.
2015-11-04 21:06:06 +00:00
// Returns zero time if no more points available.
2016-04-17 20:00:59 +00:00
func (itr *buf{{ $ k . Name }}Iterator) peekTime() (int64, error) {
p, err := itr.peek()
if p == nil || err != nil {
return ZeroTime, err
2015-11-04 21:06:06 +00:00
}
2016-04-17 20:00:59 +00:00
return p.Time, nil
2015-11-04 21:06:06 +00:00
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
2016-04-17 20:00:59 +00:00
func (itr *buf{{ $ k . Name }}Iterator) Next() (*{{ $ k . Name }}Point, error) {
2016-04-25 15:12:55 +00:00
buf := itr.buf
if buf != nil {
2015-11-04 21:06:06 +00:00
itr.buf = nil
2016-04-17 20:00:59 +00:00
return buf, nil
2015-11-04 21:06:06 +00:00
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
2016-04-17 20:00:59 +00:00
func (itr *buf{{ $ k . Name }}Iterator) NextInWindow(startTime, endTime int64) (*{{ $ k . Name }}Point, error) {
v, err := itr.Next()
if v == nil || err != nil {
return nil, err
2016-04-25 15:12:55 +00:00
} else if t := v.Time; t >= endTime || t < startTime {
2015-11-04 21:06:06 +00:00
itr.unread(v)
2016-04-17 20:00:59 +00:00
return nil, nil
2015-11-04 21:06:06 +00:00
}
2016-04-17 20:00:59 +00:00
return v, nil
2015-11-04 21:06:06 +00:00
}
// unread sets v to the buffer. It is read on the next call to Next().
2016-03-02 23:42:00 +00:00
func (itr *buf{{ $ k . Name }}Iterator) unread(v *{{ $ k . Name }}Point) { itr.buf = v }
2015-11-04 21:06:06 +00:00
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}MergeIterator represents an iterator that combines multiple {{ $ k . name }} iterators.
type {{ $ k . name }}MergeIterator struct {
inputs []{{ $ k . Name }}Iterator
heap *{{ $ k . name }}MergeHeap
2016-03-28 14:38:58 +00:00
init bool
2015-11-04 21:06:06 +00:00
2017-11-21 21:18:35 +00:00
closed bool
mu sync.RWMutex
2016-02-04 21:13:43 +00:00
// Current iterator and window.
2016-03-02 23:42:00 +00:00
curr *{{ $ k . name }}MergeHeapItem
2015-11-04 21:06:06 +00:00
window struct {
2016-02-03 22:09:56 +00:00
name string
2016-02-04 21:13:43 +00:00
tags string
2015-11-04 21:06:06 +00:00
startTime int64
2016-02-03 22:09:56 +00:00
endTime int64
2015-11-04 21:06:06 +00:00
}
}
2016-03-02 23:42:00 +00:00
// new{{ $ k . Name }}MergeIterator returns a new instance of {{ $ k . name }}MergeIterator.
func new{{ $ k . Name }}MergeIterator(inputs []{{ $ k . Name }}Iterator, opt IteratorOptions) *{{ $ k . name }}MergeIterator {
itr := &{{ $ k . name }}MergeIterator{
2015-11-04 21:06:06 +00:00
inputs: inputs,
2016-03-02 23:42:00 +00:00
heap: &{{ $ k . name }}MergeHeap{
items: make([]*{{ $ k . name }}MergeHeapItem, 0, len(inputs)),
2016-11-23 20:32:42 +00:00
opt: opt,
2015-11-04 21:06:06 +00:00
},
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
2016-03-02 23:42:00 +00:00
bufInput := newBuf{{ $ k . Name }}Iterator(input)
2015-11-04 21:06:06 +00:00
2015-11-04 21:06:06 +00:00
// Append to the heap.
2016-03-02 23:42:00 +00:00
itr.heap.items = append(itr.heap.items, &{{ $ k . name }}MergeHeapItem{itr: bufInput})
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
return itr
}
2015-11-04 21:06:06 +00:00
2016-03-17 15:55:37 +00:00
// Stats returns an aggregation of stats from the underlying iterators.
func (itr *{{ $ k . name }}MergeIterator) Stats() IteratorStats {
var stats IteratorStats
for _, input := range itr.inputs {
stats.Add(input.Stats())
}
return stats
}
2015-11-04 21:06:06 +00:00
// Close closes the underlying iterators.
2016-03-02 23:42:00 +00:00
func (itr *{{ $ k . name }}MergeIterator) Close() error {
2017-11-21 21:18:35 +00:00
itr.mu.Lock()
defer itr.mu.Unlock()
2015-11-04 21:06:06 +00:00
for _, input := range itr.inputs {
2016-01-22 20:38:59 +00:00
input.Close()
2015-11-04 21:06:06 +00:00
}
2016-08-03 04:51:16 +00:00
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
2017-11-21 21:18:35 +00:00
itr.closed = true
2015-11-04 21:06:06 +00:00
return nil
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
// Next returns the next point from the iterator.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}MergeIterator) Next() (*{{ $ k . Name }}Point, error) {
2017-11-21 21:18:35 +00:00
itr.mu.RLock()
defer itr.mu.RUnlock()
if itr.closed {
return nil, nil
}
2016-03-28 14:38:58 +00:00
// Initialize the heap. This needs to be done lazily on the first call to this iterator
// so that iterator initialization done through the Select() call returns quickly.
// Queries can only be interrupted after the Select() call completes so any operations
// done during iterator creation cannot be interrupted, which is why we do it here
// instead so an interrupt can happen while initializing the heap.
if !itr.init {
items := itr.heap.items
itr.heap.items = make([]*{{ $ k . name }}MergeHeapItem, 0, len(items))
for _, item := range items {
2016-04-17 20:00:59 +00:00
if p, err := item.itr.peek(); err != nil {
return nil, err
} else if p == nil {
2016-03-28 14:38:58 +00:00
continue
}
itr.heap.items = append(itr.heap.items, item)
}
heap.Init(itr.heap)
2016-03-31 15:56:23 +00:00
itr.init = true
2016-03-28 14:38:58 +00:00
}
2015-11-04 21:06:06 +00:00
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
2016-04-17 20:00:59 +00:00
return nil, nil
2015-11-04 21:06:06 +00:00
}
2016-03-02 23:42:00 +00:00
itr.curr = heap.Pop(itr.heap).(*{{ $ k . name }}MergeHeapItem)
2015-11-04 21:06:06 +00:00
// Read point and set current window.
2016-04-17 20:00:59 +00:00
p, err := itr.curr.itr.Next()
if err != nil {
return nil, err
}
2016-11-23 20:32:42 +00:00
tags := p.Tags.Subset(itr.heap.opt.Dimensions)
itr.window.name, itr.window.tags = p.Name, tags.ID()
2015-11-04 21:06:06 +00:00
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
2016-04-17 20:00:59 +00:00
return p, nil
2015-11-04 21:06:06 +00:00
}
// Read the next point from the current iterator.
2016-04-17 20:00:59 +00:00
p, err := itr.curr.itr.Next()
if err != nil {
return nil, err
}
2016-01-22 23:46:56 +00:00
2015-11-04 21:06:06 +00:00
// If there are no more points then remove iterator from heap and find next.
2015-11-04 21:06:06 +00:00
if p == nil {
2015-11-04 21:06:06 +00:00
itr.curr = nil
2015-11-04 21:06:06 +00:00
continue
}
2016-02-03 22:09:56 +00:00
// Check if the point is inside of our current window.
inWindow := true
2016-04-25 15:12:55 +00:00
if window := itr.window; window.name != p.Name {
2016-02-03 22:09:56 +00:00
inWindow = false
2016-11-23 20:32:42 +00:00
} else if tags := p.Tags.Subset(itr.heap.opt.Dimensions); window.tags != tags.ID() {
2016-02-04 21:13:43 +00:00
inWindow = false
2016-04-25 15:12:55 +00:00
} else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime {
2016-02-03 22:09:56 +00:00
inWindow = false
2016-04-25 15:12:55 +00:00
} else if !opt.Ascending && p.Time < window.startTime {
2016-02-03 22:09:56 +00:00
inWindow = false
}
2015-11-04 21:06:06 +00:00
// If it's outside our window then push iterator back on the heap and find new iterator.
2016-02-03 22:09:56 +00:00
if !inWindow {
2015-11-04 21:06:06 +00:00
itr.curr.itr.unread(p)
heap.Push(itr.heap, itr.curr)
itr.curr = nil
continue
2015-11-04 21:06:06 +00:00
}
2016-04-17 20:00:59 +00:00
return p, nil
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}MergeHeap represents a heap of {{ $ k . name }}MergeHeapItems.
2015-11-04 21:06:06 +00:00
// Items are sorted by their next window and then by name/tags.
2016-03-02 23:42:00 +00:00
type {{ $ k . name }}MergeHeap struct {
2015-11-04 21:06:06 +00:00
opt IteratorOptions
2016-03-02 23:42:00 +00:00
items []*{{ $ k . name }}MergeHeapItem
2015-11-04 21:06:06 +00:00
}
2016-11-23 20:32:42 +00:00
func (h *{{ $ k . name }}MergeHeap) Len() int { return len(h.items) }
func (h *{{ $ k . name }}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *{{ $ k . name }}MergeHeap) Less(i, j int) bool {
2016-04-17 20:00:59 +00:00
x, err := h.items[i].itr.peek()
if err != nil {
return true
}
y, err := h.items[j].itr.peek()
if err != nil {
return false
}
2016-02-03 22:09:56 +00:00
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
2016-11-23 20:32:42 +00:00
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); xTags.ID() != yTags.ID() {
return xTags.ID() < yTags.ID()
2016-02-03 22:09:56 +00:00
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
2016-11-23 20:32:42 +00:00
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); xTags.ID() != yTags.ID() {
return xTags.ID() > yTags.ID()
2016-02-03 22:09:56 +00:00
}
}
2015-11-04 21:06:06 +00:00
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
if h.opt.Ascending {
2016-01-22 23:46:56 +00:00
return xt < yt
2015-11-04 21:06:06 +00:00
}
2016-01-22 23:46:56 +00:00
return xt > yt
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
2016-03-02 23:42:00 +00:00
func (h *{{ $ k . name }}MergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*{{ $ k . name }}MergeHeapItem))
2015-11-04 21:06:06 +00:00
}
2016-03-02 23:42:00 +00:00
func (h *{{ $ k . name }}MergeHeap) Pop() interface{} {
2015-11-04 21:06:06 +00:00
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
2015-11-04 21:06:06 +00:00
}
2016-03-02 23:42:00 +00:00
type {{ $ k . name }}MergeHeapItem struct {
itr *buf{{ $ k . Name }}Iterator
2015-11-04 21:06:06 +00:00
}
2015-11-04 21:06:06 +00:00
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type {{ $ k . name }}SortedMergeIterator struct {
inputs []{{ $ k . Name }}Iterator
2016-11-23 20:32:42 +00:00
heap *{{ $ k . name }}SortedMergeHeap
2016-04-17 20:00:59 +00:00
init bool
2015-11-04 21:06:06 +00:00
}
2016-03-02 23:42:00 +00:00
// new{{ $ k . Name }}SortedMergeIterator returns an instance of {{ $ k . name }}SortedMergeIterator.
func new{{ $ k . Name }}SortedMergeIterator(inputs []{{ $ k . Name }}Iterator, opt IteratorOptions) Iterator {
itr := &{{ $ k . name }}SortedMergeIterator{
2015-11-04 21:06:06 +00:00
inputs: inputs,
2016-11-23 20:32:42 +00:00
heap: &{{ $ k . name }}SortedMergeHeap{
items: make([]*{{ $ k . name }}SortedMergeHeapItem, 0, len(inputs)),
opt: opt,
},
2015-12-24 04:42:10 +00:00
}
2016-04-17 20:00:59 +00:00
// Initialize heap items.
2015-12-24 04:42:10 +00:00
for _, input := range inputs {
// Append to the heap.
2016-11-23 20:32:42 +00:00
itr.heap.items = append(itr.heap.items, &{{ $ k . name }}SortedMergeHeapItem{itr: input})
2015-12-24 04:42:10 +00:00
}
return itr
}
2016-03-17 15:55:37 +00:00
// Stats returns an aggregation of stats from the underlying iterators.
func (itr *{{ $ k . name }}SortedMergeIterator) Stats() IteratorStats {
var stats IteratorStats
for _, input := range itr.inputs {
stats.Add(input.Stats())
}
return stats
}
2015-12-24 04:42:10 +00:00
// Close closes the underlying iterators.
2016-03-02 23:42:00 +00:00
func (itr *{{ $ k . name }}SortedMergeIterator) Close() error {
2015-11-04 21:06:06 +00:00
for _, input := range itr.inputs {
input.Close()
}
return nil
}
2015-12-24 04:42:10 +00:00
// Next returns the next points from the iterator.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}SortedMergeIterator) Next() (*{{ $ k . Name }}Point, error) { return itr.pop() }
2015-12-24 04:42:10 +00:00
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}SortedMergeIterator) pop() (*{{ $ k . Name }}Point, error) {
// Initialize the heap. See the MergeIterator to see why this has to be done lazily.
if !itr.init {
2016-11-23 20:32:42 +00:00
items := itr.heap.items
itr.heap.items = make([]*{{ $ k . name }}SortedMergeHeapItem, 0, len(items))
2016-04-17 20:00:59 +00:00
for _, item := range items {
var err error
if item.point, err = item.itr.Next(); err != nil {
return nil, err
} else if item.point == nil {
continue
}
2016-11-23 20:32:42 +00:00
itr.heap.items = append(itr.heap.items, item)
2016-04-17 20:00:59 +00:00
}
2016-11-23 20:32:42 +00:00
heap.Init(itr.heap)
2016-04-17 20:00:59 +00:00
itr.init = true
}
2016-11-23 20:32:42 +00:00
if len(itr.heap.items) == 0 {
2016-04-17 20:00:59 +00:00
return nil, nil
2015-12-24 04:42:10 +00:00
}
// Read the next item from the heap.
2016-11-23 20:32:42 +00:00
item := heap.Pop(itr.heap).(*{{ $ k . name }}SortedMergeHeapItem)
2016-04-17 20:00:59 +00:00
if item.err != nil {
return nil, item.err
2016-05-02 15:17:08 +00:00
} else if item.point == nil {
return nil, nil
2016-04-17 20:00:59 +00:00
}
2015-12-24 04:42:10 +00:00
// Copy the point for return.
p := item.point.Clone()
// Read the next item from the cursor. Push back to heap if one exists.
2016-04-17 20:00:59 +00:00
if item.point, item.err = item.itr.Next(); item.point != nil {
2016-11-23 20:32:42 +00:00
heap.Push(itr.heap, item)
2015-12-24 04:42:10 +00:00
}
2016-04-17 20:00:59 +00:00
return p, nil
2015-12-24 04:42:10 +00:00
}
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}SortedMergeHeap represents a heap of {{ $ k . name }}SortedMergeHeapItems.
2017-08-14 18:07:49 +00:00
// Items are sorted with the following priority:
// - By their measurement name;
// - By their tag keys/values;
// - By time; or
// - By their Aux field values.
//
2016-11-23 20:32:42 +00:00
type {{ $ k . name }}SortedMergeHeap struct {
opt IteratorOptions
items []*{{ $ k . name }}SortedMergeHeapItem
}
2015-12-24 04:42:10 +00:00
2016-11-23 20:32:42 +00:00
func (h *{{ $ k . name }}SortedMergeHeap) Len() int { return len(h.items) }
func (h *{{ $ k . name }}SortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *{{ $ k . name }}SortedMergeHeap) Less(i, j int) bool {
x, y := h.items[i].point, h.items[j].point
2015-12-24 04:42:10 +00:00
2016-11-23 20:32:42 +00:00
if h.opt.Ascending {
2015-12-24 04:42:10 +00:00
if x.Name != y.Name {
return x.Name < y.Name
2016-11-23 20:32:42 +00:00
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() < yTags.ID()
2015-12-24 04:42:10 +00:00
}
2017-08-14 18:07:49 +00:00
if x.Time != y.Time{
return x.Time < y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 < v2
}
}
return false // Times and/or Aux fields are equal.
2015-12-24 04:42:10 +00:00
}
if x.Name != y.Name {
return x.Name > y.Name
2017-08-14 18:07:49 +00:00
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
2016-11-23 20:32:42 +00:00
return xTags.ID() > yTags.ID()
2015-12-24 04:42:10 +00:00
}
2017-08-14 18:07:49 +00:00
if x.Time != y.Time{
return x.Time > y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 > v2
}
}
return false // Times and/or Aux fields are equal.
2015-12-24 04:42:10 +00:00
}
2016-03-02 23:42:00 +00:00
func (h *{{ $ k . name }}SortedMergeHeap) Push(x interface{}) {
2016-11-23 20:32:42 +00:00
h.items = append(h.items, x.(*{{ $ k . name }}SortedMergeHeapItem))
2015-12-24 04:42:10 +00:00
}
2016-03-02 23:42:00 +00:00
func (h *{{ $ k . name }}SortedMergeHeap) Pop() interface{} {
2016-11-23 20:32:42 +00:00
old := h.items
2015-12-24 04:42:10 +00:00
n := len(old)
item := old[n-1]
2016-11-23 20:32:42 +00:00
h.items = old[0 : n-1]
2015-12-24 04:42:10 +00:00
return item
}
2016-03-02 23:42:00 +00:00
type {{ $ k . name }}SortedMergeHeapItem struct {
point *{{ $ k . Name }}Point
2016-04-17 20:00:59 +00:00
err error
2016-03-02 23:42:00 +00:00
itr {{ $ k . Name }}Iterator
2015-12-24 04:42:10 +00:00
}
2016-05-06 16:01:40 +00:00
// {{ $ k . name }}ParallelIterator represents an iterator that pulls data in a separate goroutine.
type {{ $ k . name }}ParallelIterator struct {
input {{ $ k . Name }}Iterator
ch chan {{ $ k . name }}PointError
2016-08-03 04:51:16 +00:00
2016-05-06 16:01:40 +00:00
once sync.Once
closing chan struct{}
2016-08-03 04:51:16 +00:00
wg sync.WaitGroup
2016-05-06 16:01:40 +00:00
}
// new{{ $ k . Name }}ParallelIterator returns a new instance of {{ $ k . name }}ParallelIterator.
func new{{ $ k . Name }}ParallelIterator(input {{ $ k . Name }}Iterator) *{{ $ k . name }}ParallelIterator {
itr := &{{ $ k . name }}ParallelIterator{
input: input,
2017-02-10 18:07:49 +00:00
ch: make(chan {{ $ k . name }}PointError, 256),
2016-08-03 04:51:16 +00:00
closing: make(chan struct{}),
2016-05-06 16:01:40 +00:00
}
2016-08-03 04:51:16 +00:00
itr.wg.Add(1)
2016-05-06 16:01:40 +00:00
go itr.monitor()
return itr
}
// Stats returns stats from the underlying iterator.
func (itr *{{ $ k . name }}ParallelIterator) Stats() IteratorStats { return itr.input.Stats() }
// Close closes the underlying iterators.
func (itr *{{ $ k . name }}ParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
2016-08-03 04:51:16 +00:00
itr.wg.Wait()
return itr.input.Close()
2016-05-06 16:01:40 +00:00
}
// Next returns the next point from the iterator.
func (itr *{{ $ k . name }}ParallelIterator) Next() (*{{ $ k . Name }}Point, error) {
v, ok := <-itr.ch
if !ok {
return nil, io.EOF
}
return v.point, v.err
}
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *{{ $ k . name }}ParallelIterator) monitor() {
defer close(itr.ch)
2016-08-03 04:51:16 +00:00
defer itr.wg.Done()
2016-05-06 16:01:40 +00:00
for {
// Read next point.
p, err := itr.input.Next()
2017-02-10 18:07:49 +00:00
if p != nil {
p = p.Clone()
}
2016-06-01 17:30:50 +00:00
2016-05-06 16:01:40 +00:00
select {
case <-itr.closing:
return
case itr.ch <- {{ $ k . name }}PointError{point: p, err: err}:
}
}
}
type {{ $ k . name }}PointError struct {
point *{{ $ k . Name }}Point
err error
}
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}LimitIterator represents an iterator that limits points per group.
type {{ $ k . name }}LimitIterator struct {
input {{ $ k . Name }}Iterator
2015-12-24 18:46:31 +00:00
opt IteratorOptions
n int
prev struct {
name string
tags Tags
}
}
2016-03-02 23:42:00 +00:00
// new{{ $ k . Name }}LimitIterator returns a new instance of {{ $ k . name }}LimitIterator.
func new{{ $ k . Name }}LimitIterator(input {{ $ k . Name }}Iterator, opt IteratorOptions) *{{ $ k . name }}LimitIterator {
return &{{ $ k . name }}LimitIterator{
2015-12-24 18:46:31 +00:00
input: input,
opt: opt,
}
}
2016-03-17 15:55:37 +00:00
// Stats returns stats from the underlying iterator.
func (itr *{{ $ k . name }}LimitIterator) Stats() IteratorStats { return itr.input.Stats() }
2015-12-24 18:46:31 +00:00
// Close closes the underlying iterators.
2016-03-02 23:42:00 +00:00
func (itr *{{ $ k . name }}LimitIterator) Close() error { return itr.input.Close() }
2015-12-24 18:46:31 +00:00
// Next returns the next point from the iterator.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}LimitIterator) Next() (*{{ $ k . Name }}Point, error) {
2015-12-24 18:46:31 +00:00
for {
2016-04-17 20:00:59 +00:00
p, err := itr.input.Next()
if p == nil || err != nil {
return nil, err
2015-12-24 18:46:31 +00:00
}
// Reset window and counter if a new window is encountered.
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
itr.prev.name = p.Name
itr.prev.tags = p.Tags
itr.n = 0
}
// Increment counter.
itr.n++
// Read next point if not beyond the offset.
if itr.n <= itr.opt.Offset {
continue
}
// Read next point if we're beyond the limit.
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
continue
}
2016-04-17 20:00:59 +00:00
return p, nil
2015-12-24 18:46:31 +00:00
}
}
2016-03-02 23:42:00 +00:00
type {{ $ k . name }}FillIterator struct {
2016-03-07 18:25:45 +00:00
input *buf{{ $ k . Name }}Iterator
2016-06-27 15:36:37 +00:00
prev {{ $ k . Name }}Point
2016-03-07 18:25:45 +00:00
startTime int64
endTime int64
auxFields []interface{}
2016-04-17 20:00:59 +00:00
init bool
2016-03-07 18:25:45 +00:00
opt IteratorOptions
2016-02-09 01:39:44 +00:00
window struct {
2016-12-23 23:14:53 +00:00
name string
tags Tags
time int64
offset int64
2016-02-09 01:39:44 +00:00
}
2016-02-05 14:49:42 +00:00
}
2017-08-15 19:24:22 +00:00
func new{{ $ k . Name }}FillIterator(input {{ $ k . Name }}Iterator, expr influxql.Expr, opt IteratorOptions) *{{ $ k . name }}FillIterator {
if opt.Fill == influxql.NullFill {
if expr, ok := expr.(*influxql.Call); ok && expr.Name == "count" {
opt.Fill = influxql.NumberFill
2016-03-02 23:42:00 +00:00
opt.FillValue = {{ $ k . Zero }}
2016-02-05 14:49:42 +00:00
}
}
var startTime, endTime int64
if opt.Ascending {
startTime, _ = opt.Window(opt.StartTime)
2016-02-19 21:05:36 +00:00
endTime, _ = opt.Window(opt.EndTime)
2016-02-05 14:49:42 +00:00
} else {
2016-02-19 21:05:36 +00:00
startTime, _ = opt.Window(opt.EndTime)
2016-02-05 14:49:42 +00:00
endTime, _ = opt.Window(opt.StartTime)
}
2016-02-08 15:02:08 +00:00
var auxFields []interface{}
2016-02-09 01:39:44 +00:00
if len(opt.Aux) > 0 {
auxFields = make([]interface{}, len(opt.Aux))
2016-02-08 15:02:08 +00:00
}
2016-04-17 20:00:59 +00:00
return &{{ $ k . name }}FillIterator{
2016-03-07 18:25:45 +00:00
input: newBuf{{ $ k . Name }}Iterator(input),
2016-06-27 15:36:37 +00:00
prev: {{ $ k . Name }}Point{Nil: true},
2016-03-07 18:25:45 +00:00
startTime: startTime,
endTime: endTime,
auxFields: auxFields,
opt: opt,
2016-02-05 14:49:42 +00:00
}
}
2016-03-17 15:55:37 +00:00
func (itr *{{ $ k . name }}FillIterator) Stats() IteratorStats { return itr.input.Stats() }
2016-03-02 23:42:00 +00:00
func (itr *{{ $ k . name }}FillIterator) Close() error { return itr.input.Close() }
2016-02-05 14:49:42 +00:00
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}FillIterator) Next() (*{{ $ k . Name }}Point, error) {
if !itr.init {
p, err := itr.input.peek()
if p == nil || err != nil {
return nil, err
}
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
2017-09-22 21:07:34 +00:00
if itr.startTime == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
2016-12-23 23:14:53 +00:00
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
2016-04-17 20:00:59 +00:00
itr.init = true
}
p, err := itr.input.Next()
if err != nil {
return nil, err
}
2016-02-09 01:39:44 +00:00
// Check if the next point is outside of our window or is nil.
for p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
// If we are inside of an interval, unread the point and continue below to
// constructing a new point.
if itr.opt.Ascending {
2016-02-19 21:05:36 +00:00
if itr.window.time <= itr.endTime {
2016-02-09 01:39:44 +00:00
itr.input.unread(p)
p = nil
break
}
} else {
2017-09-22 21:07:34 +00:00
if itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime {
2016-02-09 01:39:44 +00:00
itr.input.unread(p)
p = nil
break
}
}
// We are *not* in a current interval. If there is no next point,
// we are at the end of all intervals.
if p == nil {
2016-04-17 20:00:59 +00:00
return nil, nil
2016-02-09 01:39:44 +00:00
}
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
2017-09-22 21:07:34 +00:00
if itr.window.time == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
2016-12-23 23:14:53 +00:00
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
2016-06-27 15:36:37 +00:00
itr.prev = {{ $ k . Name }}Point{Nil: true}
2016-02-09 01:39:44 +00:00
break
2016-02-05 14:49:42 +00:00
}
2016-02-09 01:39:44 +00:00
// Check if the point is our next expected point.
2016-02-19 21:05:36 +00:00
if p == nil || (itr.opt.Ascending && p.Time > itr.window.time) || (!itr.opt.Ascending && p.Time < itr.window.time) {
2016-02-09 01:39:44 +00:00
if p != nil {
itr.input.unread(p)
}
2016-03-02 23:42:00 +00:00
p = &{{ $ k . Name }}Point{
2016-02-09 01:39:44 +00:00
Name: itr.window.name,
Tags: itr.window.tags,
Time: itr.window.time,
2016-02-08 15:02:08 +00:00
Aux: itr.auxFields,
2016-02-05 14:49:42 +00:00
}
switch itr.opt.Fill {
2017-08-15 19:24:22 +00:00
case influxql.LinearFill:
2017-09-18 16:28:37 +00:00
{{- if or (eq $ k . Name "Float") (eq $ k . Name "Integer") (eq $ k . Name "Unsigned")}}
2016-09-28 22:05:56 +00:00
if !itr.prev.Nil {
next, err := itr.input.peek()
if err != nil {
return nil, err
2017-03-16 20:51:23 +00:00
} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
2016-09-28 22:05:56 +00:00
interval := int64(itr.opt.Interval.Duration)
start := itr.window.time / interval
p.Value = linear{{ $ k . Name }}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
} else {
p.Nil = true
}
} else {
p.Nil = true
}
{{else}}
fallthrough
{{- end}}
2017-08-15 19:24:22 +00:00
case influxql.NullFill:
2016-02-05 14:49:42 +00:00
p.Nil = true
2017-08-15 19:24:22 +00:00
case influxql.NumberFill:
2016-03-02 23:42:00 +00:00
p.Value = castTo{{ $ k . Name }}(itr.opt.FillValue)
2017-08-15 19:24:22 +00:00
case influxql.PreviousFill:
2016-06-27 15:36:37 +00:00
if !itr.prev.Nil {
2016-02-05 14:49:42 +00:00
p.Value = itr.prev.Value
p.Nil = itr.prev.Nil
} else {
p.Nil = true
}
}
} else {
2016-06-27 15:36:37 +00:00
itr.prev = *p
2016-02-05 14:49:42 +00:00
}
2016-02-09 01:39:44 +00:00
// Advance the expected time. Do not advance to a new window here
// as there may be lingering points with the same timestamp in the previous
// window.
2016-02-05 14:49:42 +00:00
if itr.opt.Ascending {
2016-12-23 23:14:53 +00:00
itr.window.time += int64(itr.opt.Interval.Duration)
2016-02-05 14:49:42 +00:00
} else {
2016-12-23 23:14:53 +00:00
itr.window.time -= int64(itr.opt.Interval.Duration)
}
// Check to see if we have passed over an offset change and adjust the time
// to account for this new offset.
if itr.opt.Location != nil {
2017-07-19 06:23:23 +00:00
if _, offset := itr.opt.Zone(itr.window.time - 1); offset != itr.window.offset {
2016-12-23 23:14:53 +00:00
diff := itr.window.offset - offset
if abs(diff) < int64(itr.opt.Interval.Duration) {
itr.window.time += diff
}
itr.window.offset = offset
}
2016-02-05 14:49:42 +00:00
}
2016-04-17 20:00:59 +00:00
return p, nil
2016-02-05 14:49:42 +00:00
}
2016-03-04 01:53:45 +00:00
// {{ $ k . name }}IntervalIterator represents a {{ $ k . name }} implementation of IntervalIterator.
type {{ $ k . name }}IntervalIterator struct {
input {{ $ k . Name }}Iterator
opt IteratorOptions
}
func new{{ $ k . Name }}IntervalIterator(input {{ $ k . Name }}Iterator, opt IteratorOptions) *{{ $ k . name }}IntervalIterator {
return &{{ $ k . name }}IntervalIterator{input: input, opt: opt}
}
2016-03-17 15:55:37 +00:00
func (itr *{{ $ k . name }}IntervalIterator) Stats() IteratorStats { return itr.input.Stats() }
2016-03-04 01:53:45 +00:00
func (itr *{{ $ k . name }}IntervalIterator) Close() error { return itr.input.Close() }
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}IntervalIterator) Next() (*{{ $ k . Name }}Point, error) {
p, err := itr.input.Next()
if p == nil || err != nil {
return nil, err
2016-03-04 01:53:45 +00:00
}
p.Time, _ = itr.opt.Window(p.Time)
2016-08-25 17:52:39 +00:00
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
2017-08-15 19:24:22 +00:00
if p.Time == influxql.MinTime {
2016-08-25 17:52:39 +00:00
p.Time = 0
}
2016-04-17 20:00:59 +00:00
return p, nil
2016-03-04 01:53:45 +00:00
}
2016-03-09 14:36:18 +00:00
// {{ $ k . name }}InterruptIterator represents a {{ $ k . name }} implementation of InterruptIterator.
type {{ $ k . name }}InterruptIterator struct {
input {{ $ k . Name }}Iterator
closing <-chan struct{}
count int
}
func new{{ $ k . Name }}InterruptIterator(input {{ $ k . Name }}Iterator, closing <-chan struct{}) *{{ $ k . name }}InterruptIterator {
return &{{ $ k . name }}InterruptIterator{input: input, closing: closing}
}
2016-03-21 22:38:18 +00:00
func (itr *{{ $ k . name }}InterruptIterator) Stats() IteratorStats { return itr.input.Stats() }
2016-03-09 14:36:18 +00:00
func (itr *{{ $ k . name }}InterruptIterator) Close() error { return itr.input.Close() }
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}InterruptIterator) Next() (*{{ $ k . Name }}Point, error) {
2016-04-25 15:12:55 +00:00
// Only check if the channel is closed every N points. This
// intentionally checks on both 0 and N so that if the iterator
2016-03-09 14:36:18 +00:00
// has been interrupted before the first point is emitted it will
// not emit any points.
2016-04-25 15:12:55 +00:00
if itr.count & 0xFF == 0xFF {
2016-03-09 14:36:18 +00:00
select {
case <-itr.closing:
2017-03-17 21:56:23 +00:00
return nil, itr.Close()
2016-03-09 14:36:18 +00:00
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}
// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}
2016-06-01 17:30:50 +00:00
// {{ $ k . name }}CloseInterruptIterator represents a {{ $ k . name }} implementation of CloseInterruptIterator.
type {{ $ k . name }}CloseInterruptIterator struct {
input {{ $ k . Name }}Iterator
closing <-chan struct{}
done chan struct{}
once sync.Once
}
func new{{ $ k . Name }}CloseInterruptIterator(input {{ $ k . Name }}Iterator, closing <-chan struct{}) *{{ $ k . name }}CloseInterruptIterator {
itr := &{{ $ k . name }}CloseInterruptIterator{
input: input,
closing: closing,
done: make(chan struct{}),
}
go itr.monitor()
return itr
}
func (itr *{{ $ k . name }}CloseInterruptIterator) monitor() {
select {
case <-itr.closing:
itr.Close()
case <-itr.done:
}
}
func (itr *{{ $ k . name }}CloseInterruptIterator) Stats() IteratorStats {
return itr.input.Stats()
}
func (itr *{{ $ k . name }}CloseInterruptIterator) Close() error {
itr.once.Do(func() {
close(itr.done)
itr.input.Close()
})
return nil
}
func (itr *{{ $ k . name }}CloseInterruptIterator) Next() (*{{ $ k . Name }}Point, error) {
p, err := itr.input.Next()
if err != nil {
// Check if the iterator was closed.
select {
case <-itr.done:
return nil, nil
default:
return nil, err
}
}
return p, nil
}
2016-04-17 20:00:59 +00:00
// aux{{ $ k . Name }}Point represents a combination of a point and an error for the AuxIterator.
type aux{{ $ k . Name }}Point struct {
point *{{ $ k . Name }}Point
err error
}
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}AuxIterator represents a {{ $ k . name }} implementation of AuxIterator.
type {{ $ k . name }}AuxIterator struct {
2016-03-13 16:05:47 +00:00
input *buf{{ $ k . Name }}Iterator
2016-04-17 20:00:59 +00:00
output chan aux{{ $ k . Name }}Point
2016-11-23 20:32:42 +00:00
fields *auxIteratorFields
2016-03-13 16:05:47 +00:00
background bool
2017-10-25 20:08:55 +00:00
closer sync.Once
2015-11-04 21:06:06 +00:00
}
2016-05-16 16:08:28 +00:00
func new{{ $ k . Name }}AuxIterator(input {{ $ k . Name }}Iterator, opt IteratorOptions) *{{ $ k . name }}AuxIterator {
2016-03-02 23:42:00 +00:00
return &{{ $ k . name }}AuxIterator{
input: newBuf{{ $ k . Name }}Iterator(input),
2016-04-17 20:00:59 +00:00
output: make(chan aux{{ $ k . Name }}Point, 1),
2016-05-16 16:08:28 +00:00
fields: newAuxIteratorFields(opt),
2015-11-04 21:06:06 +00:00
}
}
2016-03-13 16:05:47 +00:00
func (itr *{{ $ k . name }}AuxIterator) Background() {
itr.background = true
itr.Start()
2016-04-04 16:53:39 +00:00
go DrainIterator(itr)
2016-03-13 16:05:47 +00:00
}
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}AuxIterator) Start() { go itr.stream() }
func (itr *{{ $ k . name }}AuxIterator) Stats() IteratorStats { return itr.input.Stats() }
2017-10-25 20:08:55 +00:00
func (itr *{{ $ k . name }}AuxIterator) Close() error {
var err error
itr.closer.Do(func() { err = itr.input.Close() })
return err
}
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}AuxIterator) Next() (*{{ $ k . Name }}Point, error) {
p := <-itr.output
return p.point, p.err
}
2017-08-15 19:24:22 +00:00
func (itr *{{ $ k . name }}AuxIterator) Iterator(name string, typ influxql.DataType) Iterator { return itr.fields.iterator(name, typ) }
2015-11-04 21:06:06 +00:00
2016-03-04 18:01:41 +00:00
func (itr *{{.name}}AuxIterator) stream() {
2015-11-04 21:06:06 +00:00
for {
// Read next point.
2016-04-17 20:00:59 +00:00
p, err := itr.input.Next()
if err != nil {
itr.output <- aux{{ $ k . Name }}Point{err: err}
itr.fields.sendError(err)
break
} else if p == nil {
2015-11-04 21:06:06 +00:00
break
}
// Send point to output and to each field iterator.
2016-04-17 20:00:59 +00:00
itr.output <- aux{{ $ k . Name }}Point{point: p}
2016-03-13 16:05:47 +00:00
if ok := itr.fields.send(p); !ok && itr.background {
break
}
2015-11-04 21:06:06 +00:00
}
close(itr.output)
itr.fields.close()
}
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}ChanIterator represents a new instance of {{ $ k . name }}ChanIterator.
type {{ $ k . name }}ChanIterator struct {
2016-04-14 17:08:41 +00:00
buf struct {
i int
filled bool
points [2]{{ $ k . Name }}Point
}
2016-04-17 20:00:59 +00:00
err error
2016-03-10 22:51:54 +00:00
cond *sync.Cond
done bool
2015-11-04 21:06:06 +00:00
}
2016-03-17 15:55:37 +00:00
func (itr *{{ $ k . name }}ChanIterator) Stats() IteratorStats { return IteratorStats{} }
2016-03-02 23:42:00 +00:00
func (itr *{{ $ k . name }}ChanIterator) Close() error {
2016-03-10 22:51:54 +00:00
itr.cond.L.Lock()
// Mark the channel iterator as done and signal all waiting goroutines to start again.
itr.done = true
itr.cond.Broadcast()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
2015-11-04 21:06:06 +00:00
return nil
}
2016-03-10 22:51:54 +00:00
func (itr *{{ $ k . name }}ChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool {
itr.cond.L.Lock()
defer itr.cond.L.Unlock()
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
2016-04-14 17:08:41 +00:00
for !itr.done && itr.buf.filled {
2016-03-10 22:51:54 +00:00
itr.cond.Wait()
}
// Do not set the value and return false to signal that the iterator is closed.
// Do this after the above wait as the above for loop may have exited because
// the iterator was closed.
if itr.done {
return false
}
switch v := value.(type) {
case {{ $ k . Type }}:
2016-04-14 17:08:41 +00:00
itr.buf.points[itr.buf.i] = {{ $ k . Name }}Point{Name: name, Tags: tags, Time: time, Value: v}
2016-03-10 22:51:54 +00:00
{{if eq $ k . Name "Float"}}
case int64:
2016-04-14 17:08:41 +00:00
itr.buf.points[itr.buf.i] = {{ $ k . Name }}Point{Name: name, Tags: tags, Time: time, Value: float64(v)}
2016-03-10 22:51:54 +00:00
{{end}}
default:
2016-04-14 17:08:41 +00:00
itr.buf.points[itr.buf.i] = {{ $ k . Name }}Point{Name: name, Tags: tags, Time: time, Nil: true}
2016-03-10 22:51:54 +00:00
}
2016-04-14 17:08:41 +00:00
itr.buf.filled = true
2016-03-10 22:51:54 +00:00
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
}
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}ChanIterator) setErr(err error) {
itr.cond.L.Lock()
defer itr.cond.L.Unlock()
itr.err = err
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
}
func (itr *{{ $ k . name }}ChanIterator) Next() (*{{ $ k . Name }}Point, error) {
2016-03-10 22:51:54 +00:00
itr.cond.L.Lock()
2016-04-17 20:00:59 +00:00
defer itr.cond.L.Unlock()
// Check for an error and return one if there.
if itr.err != nil {
return nil, itr.err
}
2016-03-10 22:51:54 +00:00
// Wait until either a value is available in the buffer or
// the iterator is closed.
2016-04-14 17:08:41 +00:00
for !itr.done && !itr.buf.filled {
2016-03-10 22:51:54 +00:00
itr.cond.Wait()
}
2016-04-14 17:08:41 +00:00
// Return nil once the channel is done and the buffer is empty.
if itr.done && !itr.buf.filled {
2016-04-17 20:00:59 +00:00
return nil, nil
2016-04-14 17:08:41 +00:00
}
2016-03-10 22:51:54 +00:00
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
2016-04-14 17:08:41 +00:00
p := &itr.buf.points[itr.buf.i]
itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points)
itr.buf.filled = false
2016-03-10 22:51:54 +00:00
itr.cond.Signal()
2016-04-17 20:00:59 +00:00
return p, nil
2016-03-10 22:51:54 +00:00
}
2016-03-02 23:42:00 +00:00
{{range $v := $ types }}
2015-11-04 21:06:06 +00:00
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}Reduce{{ $ v . Name }}Iterator executes a reducer for every interval and buffers the result.
type {{ $ k . name }}Reduce{{ $ v . Name }}Iterator struct {
2016-02-19 21:05:36 +00:00
input *buf{{ $ k . Name }}Iterator
create func() ({{ $ k . Name }}PointAggregator, {{ $ v . Name }}PointEmitter)
2016-11-23 20:32:42 +00:00
dims []string
2016-02-19 21:05:36 +00:00
opt IteratorOptions
points []{{ $ v . Name }}Point
2017-05-17 16:10:51 +00:00
keepTags bool
2015-11-04 21:06:06 +00:00
}
2016-11-23 20:32:42 +00:00
func new{{ $ k . Name }}Reduce{{ $ v . Name }}Iterator(input {{ $ k . Name }}Iterator, opt IteratorOptions, createFn func() ({{ $ k . Name }}PointAggregator, {{ $ v . Name }}PointEmitter)) *{{ $ k . name }}Reduce{{ $ v . Name }}Iterator {
return &{{ $ k . name }}Reduce{{ $ v . Name }}Iterator{
input: newBuf{{ $ k . Name }}Iterator(input),
create: createFn,
dims: opt.GetDimensions(),
opt: opt,
}
}
2016-03-17 15:55:37 +00:00
// Stats returns stats from the input iterator.
func (itr *{{ $ k . name }}Reduce{{ $ v . Name }}Iterator) Stats() IteratorStats { return itr.input.Stats() }
2015-11-04 21:06:06 +00:00
// Close closes the iterator and all child iterators.
2016-03-02 23:42:00 +00:00
func (itr *{{ $ k . name }}Reduce{{ $ v . Name }}Iterator) Close() error { return itr.input.Close() }
2015-11-04 21:06:06 +00:00
// Next returns the minimum value for the next available interval.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}Reduce{{ $ v . Name }}Iterator) Next() (*{{ $ v . Name }}Point, error) {
2015-11-04 21:06:06 +00:00
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
2016-04-17 20:00:59 +00:00
var err error
itr.points, err = itr.reduce()
2015-11-04 21:06:06 +00:00
if len(itr.points) == 0 {
2016-04-17 20:00:59 +00:00
return nil, err
2015-11-04 21:06:06 +00:00
}
}
// Pop next point off the stack.
2016-03-07 18:25:45 +00:00
p := &itr.points[len(itr.points)-1]
2015-11-04 21:06:06 +00:00
itr.points = itr.points[:len(itr.points)-1]
2016-04-17 20:00:59 +00:00
return p, nil
2015-11-04 21:06:06 +00:00
}
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}Reduce{{ $ v . Name }}Point stores the reduced data for a name/tag combination.
type {{ $ k . name }}Reduce{{ $ v . Name }}Point struct {
2016-03-02 20:52:03 +00:00
Name string
Tags Tags
2016-03-02 23:42:00 +00:00
Aggregator {{ $ k . Name }}PointAggregator
Emitter {{ $ v . Name }}PointEmitter
2016-03-02 20:52:03 +00:00
}
2015-11-04 21:06:06 +00:00
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}Reduce{{ $ v . Name }}Iterator) reduce() ([]{{ $ v . Name }}Point, error) {
2015-11-04 21:06:06 +00:00
// Calculate next window.
2017-04-11 15:30:06 +00:00
var (
startTime, endTime int64
window struct {
name string
tags string
}
)
2016-11-23 20:32:42 +00:00
for {
p, err := itr.input.Next()
if err != nil || p == nil {
return nil, err
} else if p.Nil {
continue
}
// Unread the point so it can be processed.
itr.input.unread(p)
startTime, endTime = itr.opt.Window(p.Time)
2017-04-11 15:30:06 +00:00
window.name, window.tags = p.Name, p.Tags.Subset(itr.opt.Dimensions).ID()
2016-11-23 20:32:42 +00:00
break
2016-04-17 20:00:59 +00:00
}
2015-11-04 21:06:06 +00:00
// Create points by tags.
2016-03-24 14:40:55 +00:00
m := make(map[string]*{{ $ k . name }}Reduce{{ $ v . Name }}Point)
2015-11-04 21:06:06 +00:00
for {
// Read next point.
2016-04-17 20:00:59 +00:00
curr, err := itr.input.NextInWindow(startTime, endTime)
if err != nil {
return nil, err
} else if curr == nil {
2015-11-04 21:06:06 +00:00
break
2016-01-25 16:28:03 +00:00
} else if curr.Nil {
continue
2017-04-11 15:30:06 +00:00
} else if curr.Name != window.name {
itr.input.unread(curr)
break
2015-11-04 21:06:06 +00:00
}
2016-03-24 14:40:55 +00:00
2017-04-11 15:30:06 +00:00
// Ensure this point is within the same final window.
if curr.Name != window.name {
itr.input.unread(curr)
break
} else if tags := curr.Tags.Subset(itr.opt.Dimensions); tags.ID() != window.tags {
itr.input.unread(curr)
break
2016-03-16 20:03:53 +00:00
}
2015-11-04 21:06:06 +00:00
2017-04-11 15:30:06 +00:00
// Retrieve the tags on this point for this level of the query.
// This may be different than the bucket dimensions.
tags := curr.Tags.Subset(itr.dims)
id := tags.ID()
2016-03-02 20:52:03 +00:00
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
if rp == nil {
aggregator, emitter := itr.create()
2016-11-23 20:32:42 +00:00
rp = &{{ $ k . name }}Reduce{{ $ v . Name }}Point{
2016-03-02 20:52:03 +00:00
Name: curr.Name,
Tags: tags,
Aggregator: aggregator,
Emitter: emitter,
}
m[id] = rp
2015-11-04 21:06:06 +00:00
}
2016-03-07 18:25:45 +00:00
rp.Aggregator.Aggregate{{ $ k . Name }}(curr)
2015-11-04 21:06:06 +00:00
}
2017-04-11 15:30:06 +00:00
// Reverse sort points by name & tag if our output is supposed to be ordered.
2015-11-04 21:06:06 +00:00
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
2017-04-11 15:30:06 +00:00
if len(keys) > 1 && itr.opt.Ordered {
2016-04-04 16:53:39 +00:00
sort.Sort(reverseStringSlice(keys))
}
2015-11-04 21:06:06 +00:00
2016-11-23 20:32:42 +00:00
// Assume the points are already sorted until proven otherwise.
sortedByTime := true
2017-04-11 15:30:06 +00:00
// Emit the points for each name & tag combination.
2016-03-07 18:25:45 +00:00
a := make([]{{ $ v . Name }}Point, 0, len(m))
for _, k := range keys {
2016-03-02 20:52:03 +00:00
rp := m[k]
2016-03-07 18:25:45 +00:00
points := rp.Emitter.Emit()
for i := len(points)-1; i >= 0; i-- {
points[i].Name = rp.Name
2017-05-17 16:10:51 +00:00
if !itr.keepTags {
points[i].Tags = rp.Tags
}
2016-03-07 18:25:45 +00:00
// Set the points time to the interval time if the reducer didn't provide one.
if points[i].Time == ZeroTime {
points[i].Time = startTime
2016-11-23 20:32:42 +00:00
} else {
sortedByTime = false
2016-03-07 18:25:45 +00:00
}
a = append(a, points[i])
2016-03-04 01:53:45 +00:00
}
2016-01-23 03:04:10 +00:00
}
2016-11-23 20:32:42 +00:00
// Points may be out of order. Perform a stable sort by time if requested.
if !sortedByTime && itr.opt.Ordered {
sort.Stable(sort.Reverse({{ $ v . name }}PointsByTime(a)))
}
2016-04-17 20:00:59 +00:00
return a, nil
2015-11-04 21:06:06 +00:00
}
2016-03-14 17:26:38 +00:00
2016-02-19 21:05:36 +00:00
// {{ $ k . name }}Stream{{ $ v . Name }}Iterator streams inputs into the iterator and emits points gradually.
2016-03-24 14:40:55 +00:00
type {{ $ k . name }}Stream{{ $ v . Name }}Iterator struct {
input *buf{{ $ k . Name }}Iterator
create func() ({{ $ k . Name }}PointAggregator, {{ $ v . Name }}PointEmitter)
2016-11-23 20:32:42 +00:00
dims []string
2016-03-24 14:40:55 +00:00
opt IteratorOptions
m map[string]*{{ $ k . name }}Reduce{{ $ v . Name }}Point
points []{{ $ v . Name }}Point
}
2016-02-19 21:05:36 +00:00
// new{{ $ k . Name }}Stream{{ $ v . Name }}Iterator returns a new instance of {{ $ k . name }}Stream{{ $ v . Name }}Iterator.
2016-03-24 14:40:55 +00:00
func new{{ $ k . Name }}Stream{{ $ v . Name }}Iterator(input {{ $ k . Name }}Iterator, createFn func() ({{ $ k . Name }}PointAggregator, {{ $ v . Name }}PointEmitter), opt IteratorOptions) *{{ $ k . name }}Stream{{ $ v . Name }}Iterator {
return &{{ $ k . name }}Stream{{ $ v . Name }}Iterator{
input: newBuf{{ $ k . Name }}Iterator(input),
create: createFn,
2016-11-23 20:32:42 +00:00
dims: opt.GetDimensions(),
2016-03-24 14:40:55 +00:00
opt: opt,
m: make(map[string]*{{ $ k . name }}Reduce{{ $ v . Name }}Point),
}
}
// Stats returns stats from the input iterator.
func (itr *{{ $ k . name }}Stream{{ $ v . Name }}Iterator) Stats() IteratorStats { return itr.input.Stats() }
// Close closes the iterator and all child iterators.
func (itr *{{ $ k . name }}Stream{{ $ v . Name }}Iterator) Close() error { return itr.input.Close() }
// Next returns the next value for the stream iterator.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}Stream{{ $ v . Name }}Iterator) Next() (*{{ $ v . Name }}Point, error) {
2016-03-24 14:40:55 +00:00
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
2016-04-17 20:00:59 +00:00
var err error
itr.points, err = itr.reduce()
2016-03-24 14:40:55 +00:00
if len(itr.points) == 0 {
2016-04-17 20:00:59 +00:00
return nil, err
2016-03-24 14:40:55 +00:00
}
}
// Pop next point off the stack.
p := &itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
2016-04-17 20:00:59 +00:00
return p, nil
2016-03-24 14:40:55 +00:00
}
// reduce creates and manages aggregators for every point from the input.
// After aggregating a point, it always tries to emit a value using the emitter.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}Stream{{ $ v . Name }}Iterator) reduce() ([]{{ $ v . Name }}Point, error) {
2016-03-24 14:40:55 +00:00
for {
// Read next point.
2016-04-17 20:00:59 +00:00
curr, err := itr.input.Next()
2017-03-23 20:18:03 +00:00
if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []{{ $ v . Name }}Point
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if err != nil {
2016-04-17 20:00:59 +00:00
return nil, err
2016-03-24 14:40:55 +00:00
} else if curr.Nil {
continue
}
2016-11-23 20:32:42 +00:00
tags := curr.Tags.Subset(itr.dims)
2016-03-24 14:40:55 +00:00
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := itr.m[id]
if rp == nil {
aggregator, emitter := itr.create()
rp = &{{ $ k . name }}Reduce{{.Name}}Point{
Name: curr.Name,
Tags: tags,
Aggregator: aggregator,
Emitter: emitter,
}
itr.m[id] = rp
}
rp.Aggregator.Aggregate{{ $ k . Name }}(curr)
// Attempt to emit points from the aggregator.
points := rp.Emitter.Emit()
if len(points) == 0 {
continue
}
for i := range points {
points[i].Name = rp.Name
points[i].Tags = rp.Tags
}
2016-04-17 20:00:59 +00:00
return points, nil
2016-03-24 14:40:55 +00:00
}
}
2016-03-14 17:26:38 +00:00
// {{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprIterator executes a function to modify an existing point
// for every output of the input iterator.
type {{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprIterator struct {
2016-10-18 18:30:59 +00:00
left *buf{{ $ k . Name }}Iterator
right *buf{{ $ k . Name }}Iterator
fn {{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprFunc
points []{{ $ k . Name }}Point // must be size 2
storePrev bool
}
func new{{ $ k . Name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprIterator(left, right {{ $ k . Name }}Iterator, opt IteratorOptions, fn func(a, b {{ $ k . Type }}) {{ $ v . Type }}) *{{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprIterator {
var points []{{ $ k . Name }}Point
switch opt.Fill {
2017-08-15 19:24:22 +00:00
case influxql.NullFill, influxql.PreviousFill:
2016-10-18 18:30:59 +00:00
points = []{{ $ k . Name }}Point{ {Nil: true}, {Nil: true} }
2017-08-15 19:24:22 +00:00
case influxql.NumberFill:
2016-10-18 18:30:59 +00:00
value := castTo{{ $ k . Name }}(opt.FillValue)
points = []{{ $ k . Name }}Point{ {Value: value}, {Value: value} }
}
return &{{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprIterator{
left: newBuf{{ $ k . Name }}Iterator(left),
right: newBuf{{ $ k . Name }}Iterator(right),
points: points,
fn: fn,
2017-08-15 19:24:22 +00:00
storePrev: opt.Fill == influxql.PreviousFill,
2016-10-18 18:30:59 +00:00
}
2016-03-14 17:26:38 +00:00
}
2016-03-17 15:55:37 +00:00
func (itr *{{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprIterator) Stats() IteratorStats {
stats := itr.left.Stats()
stats.Add(itr.right.Stats())
return stats
}
2016-03-14 17:26:38 +00:00
func (itr *{{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprIterator) Close() error {
itr.left.Close()
itr.right.Close()
return nil
}
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprIterator) Next() (*{{ $ v . Name }}Point, error) {
2016-10-18 18:30:59 +00:00
for {
2017-02-02 19:30:09 +00:00
a, b, err := itr.next()
if err != nil || (a == nil && b == nil) {
2016-10-18 18:30:59 +00:00
return nil, err
}
2017-02-23 21:40:23 +00:00
// If any of these are nil and we are using fill(none), skip these points.
if (a == nil || a.Nil || b == nil || b.Nil) && itr.points == nil {
continue
}
2017-01-31 19:59:26 +00:00
2017-02-23 21:40:23 +00:00
// If one of the two points is nil, we need to fill it with a fake nil
// point that has the same name, tags, and time as the other point.
// There should never be a time when both of these are nil.
if a == nil {
p := *b
2016-10-18 18:30:59 +00:00
a = &p
2017-02-23 21:40:23 +00:00
a.Value = {{ $ k . Nil }}
a.Nil = true
} else if b == nil {
2016-10-18 18:30:59 +00:00
p := *a
b = &p
2017-02-23 21:40:23 +00:00
b.Value = {{ $ k . Nil }}
b.Nil = true
}
// If a value is nil, use the fill values if the fill value is non-nil.
if a.Nil && !itr.points[0].Nil {
a.Value = itr.points[0].Value
a.Nil = false
}
if b.Nil && !itr.points[1].Nil {
b.Value = itr.points[1].Value
b.Nil = false
2016-10-18 18:30:59 +00:00
}
if itr.storePrev {
itr.points[0], itr.points[1] = *a, *b
}
{{if eq $ k . Name $ v . Name }}
if a.Nil {
return a, nil
} else if b.Nil {
return b, nil
}
a.Value = itr.fn(a.Value, b.Value)
return a, nil
{{else}}
p := &{{ $ v . Name }}Point{
Name: a.Name,
Tags: a.Tags,
Time: a.Time,
Nil: a.Nil || b.Nil,
Aggregated: a.Aggregated,
}
if !p.Nil {
p.Value = itr.fn(a.Value, b.Value)
}
return p, nil
{{end}}
2016-04-17 20:00:59 +00:00
}
2016-03-14 17:26:38 +00:00
}
2017-02-02 19:30:09 +00:00
// next returns the next points within each iterator. If the iterators are
// uneven, it organizes them so only matching points are returned.
func (itr *{{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprIterator) next() (a, b *{{ $ k . Name }}Point, err error) {
// Retrieve the next value for both the left and right.
a, err = itr.left.Next()
if err != nil {
return nil, nil, err
}
b, err = itr.right.Next()
if err != nil {
return nil, nil, err
}
// If we have a point from both, make sure that they match each other.
if a != nil && b != nil {
if a.Name > b.Name {
itr.left.unread(a)
return nil, b, nil
} else if a.Name < b.Name {
itr.right.unread(b)
return a, nil, nil
}
if ltags, rtags := a.Tags.ID(), b.Tags.ID(); ltags > rtags {
itr.left.unread(a)
return nil, b, nil
} else if ltags < rtags {
itr.right.unread(b)
return a, nil, nil
}
if a.Time > b.Time {
itr.left.unread(a)
return nil, b, nil
} else if a.Time < b.Time {
itr.right.unread(b)
return a, nil, nil
}
}
return a, b, nil
}
2016-03-14 17:26:38 +00:00
// {{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprFunc creates or modifies a point by combining two
// points. The point passed in may be modified and returned rather than
// allocating a new point if possible. One of the points may be nil, but at
// least one of the points will be non-nil.
2016-10-18 18:30:59 +00:00
type {{ $ k . name }}{{if ne $ k . Name $ v . Name }}{{ $ v . Name }}{{end}}ExprFunc func(a, b {{ $ k . Type }}) {{ $ v . Type }}
2016-03-02 23:42:00 +00:00
{{end}}
2015-11-04 21:06:06 +00:00
2016-03-07 18:25:45 +00:00
// {{ $ k . name }}TransformIterator executes a function to modify an existing point for every
2015-11-04 21:06:06 +00:00
// output of the input iterator.
2016-03-02 23:42:00 +00:00
type {{ $ k . name }}TransformIterator struct {
input {{ $ k . Name }}Iterator
fn {{ $ k . name }}TransformFunc
2015-11-04 21:06:06 +00:00
}
2016-03-17 15:55:37 +00:00
// Stats returns stats from the input iterator.
func (itr *{{ $ k . name }}TransformIterator) Stats() IteratorStats { return itr.input.Stats() }
2015-11-04 21:06:06 +00:00
// Close closes the iterator and all child iterators.
2016-03-02 23:42:00 +00:00
func (itr *{{ $ k . name }}TransformIterator) Close() error { return itr.input.Close() }
2015-11-04 21:06:06 +00:00
// Next returns the minimum value for the next available interval.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}TransformIterator) Next() (*{{ $ k . Name }}Point, error) {
p, err := itr.input.Next()
if err != nil {
return nil, err
} else if p != nil {
2015-11-04 21:06:06 +00:00
p = itr.fn(p)
}
2016-04-17 20:00:59 +00:00
return p, nil
2015-11-04 21:06:06 +00:00
}
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}TransformFunc creates or modifies a point.
2015-11-04 21:06:06 +00:00
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
2016-03-02 23:42:00 +00:00
type {{ $ k . name }}TransformFunc func(p *{{ $ k . Name }}Point) *{{ $ k . Name }}Point
2015-11-04 21:06:06 +00:00
2016-04-06 17:27:08 +00:00
// {{ $ k . name }}BoolTransformIterator executes a function to modify an existing point for every
2015-11-04 21:06:06 +00:00
// output of the input iterator.
2016-03-02 23:42:00 +00:00
type {{ $ k . name }}BoolTransformIterator struct {
input {{ $ k . Name }}Iterator
fn {{ $ k . name }}BoolTransformFunc
2015-11-04 21:06:06 +00:00
}
2016-03-17 15:55:37 +00:00
// Stats returns stats from the input iterator.
func (itr *{{ $ k . name }}BoolTransformIterator) Stats() IteratorStats { return itr.input.Stats() }
2015-11-04 21:06:06 +00:00
// Close closes the iterator and all child iterators.
2016-03-02 23:42:00 +00:00
func (itr *{{ $ k . name }}BoolTransformIterator) Close() error { return itr.input.Close() }
2015-11-04 21:06:06 +00:00
// Next returns the minimum value for the next available interval.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}BoolTransformIterator) Next() (*BooleanPoint, error) {
p, err := itr.input.Next()
if err != nil {
return nil, err
} else if p != nil {
return itr.fn(p), nil
2015-11-04 21:06:06 +00:00
}
2016-04-17 20:00:59 +00:00
return nil, nil
2015-11-04 21:06:06 +00:00
}
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}BoolTransformFunc creates or modifies a point.
2015-11-04 21:06:06 +00:00
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
2016-03-02 23:42:00 +00:00
type {{ $ k . name }}BoolTransformFunc func(p *{{ $ k . Name }}Point) *BooleanPoint
2015-11-04 21:06:06 +00:00
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}DedupeIterator only outputs unique points.
2016-02-05 17:23:35 +00:00
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
2016-03-02 23:42:00 +00:00
type {{ $ k . name }}DedupeIterator struct {
2016-03-07 18:25:45 +00:00
input {{ $ k . Name }}Iterator
m map[string]struct{} // lookup of points already sent
2016-02-05 17:23:35 +00:00
}
2017-02-28 21:40:43 +00:00
type {{ $ k . name }}IteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point {{ $ k . Name }}Point
}
func new{{ $ k . Name }}IteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *{{ $ k . name }}IteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &{{ $ k . name }}IteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: {{ $ k . Name }}Point{
Aux: make([]interface{}, len(fields)),
},
}
}
func (itr *{{ $ k . name }}IteratorMapper) Next() (*{{ $ k . Name }}Point, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags
itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.({{ $ k . Type }}); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = {{ $ k . Nil }}
itr.point.Nil = true
}
} else {
itr.point.Value = {{ $ k . Nil }}
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}
func (itr *{{ $ k . name }}IteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}
func (itr *{{ $ k . name }}IteratorMapper) Close() error {
return itr.e.Close()
}
2017-01-20 17:04:35 +00:00
type {{ $ k . name }}FilterIterator struct {
input {{ $ k . Name }}Iterator
2017-08-15 19:24:22 +00:00
cond influxql.Expr
2017-01-20 17:04:35 +00:00
opt IteratorOptions
m map[string]interface{}
}
2017-08-15 19:24:22 +00:00
func new{{ $ k . Name }}FilterIterator(input {{ $ k . Name }}Iterator, cond influxql.Expr, opt IteratorOptions) {{ $ k . Name }}Iterator {
2017-01-20 17:04:35 +00:00
// Strip out time conditions from the WHERE clause.
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
2017-08-15 19:24:22 +00:00
n := influxql.RewriteFunc(influxql.CloneExpr(cond), func(n influxql.Node) influxql.Node {
2017-01-20 17:04:35 +00:00
switch n := n.(type) {
2017-08-15 19:24:22 +00:00
case *influxql.BinaryExpr:
2017-01-20 17:04:35 +00:00
if n.LHS.String() == "time" {
2017-08-15 19:24:22 +00:00
return &influxql.BooleanLiteral{Val: true}
2017-01-20 17:04:35 +00:00
}
}
return n
})
2017-08-15 19:24:22 +00:00
cond, _ = n.(influxql.Expr)
2017-01-20 17:04:35 +00:00
if cond == nil {
return input
2017-08-15 19:24:22 +00:00
} else if n, ok := cond.(*influxql.BooleanLiteral); ok && n.Val {
2017-01-20 17:04:35 +00:00
return input
}
return &{{ $ k . name }}FilterIterator{
input: input,
cond: cond,
opt: opt,
m: make(map[string]interface{}),
}
}
func (itr *{{ $ k . name }}FilterIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *{{ $ k . name }}FilterIterator) Close() error { return itr.input.Close() }
func (itr *{{ $ k . name }}FilterIterator) Next() (*{{ $ k . Name }}Point, error) {
for {
p, err := itr.input.Next()
if err != nil || p == nil {
return nil, err
}
for i, ref := range itr.opt.Aux {
itr.m[ref.Val] = p.Aux[i]
}
for k, v := range p.Tags.KeyValues() {
itr.m[k] = v
}
2017-08-15 19:24:22 +00:00
if !influxql.EvalBool(itr.cond, itr.m) {
2017-01-20 17:04:35 +00:00
continue
}
return p, nil
}
}
2016-03-02 23:42:00 +00:00
// new{{ $ k . Name }}DedupeIterator returns a new instance of {{ $ k . name }}DedupeIterator.
func new{{ $ k . Name }}DedupeIterator(input {{ $ k . Name }}Iterator) *{{ $ k . name }}DedupeIterator {
return &{{ $ k . name }}DedupeIterator{
2016-02-05 17:23:35 +00:00
input: input,
2016-03-07 18:25:45 +00:00
m: make(map[string]struct{}),
2016-02-05 17:23:35 +00:00
}
}
2016-03-17 15:55:37 +00:00
// Stats returns stats from the input iterator.
func (itr *{{ $ k . name }}DedupeIterator) Stats() IteratorStats { return itr.input.Stats() }
2016-02-05 17:23:35 +00:00
// Close closes the iterator and all child iterators.
2016-03-02 23:42:00 +00:00
func (itr *{{ $ k . name }}DedupeIterator) Close() error { return itr.input.Close() }
2016-02-05 17:23:35 +00:00
// Next returns the next unique point from the input iterator.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}DedupeIterator) Next() (*{{ $ k . Name }}Point, error) {
2016-02-05 17:23:35 +00:00
for {
// Read next point.
2016-04-17 20:00:59 +00:00
p, err := itr.input.Next()
if p == nil || err != nil {
return nil, err
2016-02-05 17:23:35 +00:00
}
// Serialize to bytes to store in lookup.
2016-03-02 23:42:00 +00:00
buf, err := proto.Marshal(encode{{ $ k . Name }}Point(p))
2016-02-05 17:23:35 +00:00
if err != nil {
2016-04-17 20:00:59 +00:00
return nil, err
2016-02-05 17:23:35 +00:00
}
// If the point has already been output then move to the next point.
if _, ok := itr.m[string(buf)]; ok {
continue
}
// Otherwise mark it as emitted and return point.
itr.m[string(buf)] = struct{}{}
2016-04-17 20:00:59 +00:00
return p, nil
2016-02-05 17:23:35 +00:00
}
}
2015-11-04 21:06:06 +00:00
2016-03-02 23:42:00 +00:00
// {{ $ k . name }}ReaderIterator represents an iterator that streams from a reader.
type {{ $ k . name }}ReaderIterator struct {
2016-02-19 20:38:02 +00:00
r io.Reader
2016-03-02 23:42:00 +00:00
dec *{{ $ k . Name }}PointDecoder
2016-02-19 20:38:02 +00:00
}
2016-03-02 23:42:00 +00:00
// new{{ $ k . Name }}ReaderIterator returns a new instance of {{ $ k . name }}ReaderIterator.
2017-10-11 14:08:31 +00:00
func new{{ $ k . Name }}ReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *{{ $ k . name }}ReaderIterator {
dec := New{{ $ k . Name }}PointDecoder(ctx, r)
2016-03-17 15:55:37 +00:00
dec.stats = stats
2016-03-02 23:42:00 +00:00
return &{{ $ k . name }}ReaderIterator{
2016-02-19 20:38:02 +00:00
r: r,
2017-10-11 14:08:31 +00:00
dec: dec,
2016-02-19 20:38:02 +00:00
}
}
2016-03-17 15:55:37 +00:00
// Stats returns stats about points processed.
func (itr *{{ $ k . name }}ReaderIterator) Stats() IteratorStats { return itr.dec.stats }
2016-02-19 20:38:02 +00:00
// Close closes the underlying reader, if applicable.
2016-03-02 23:42:00 +00:00
func (itr *{{ $ k . name }}ReaderIterator) Close() error {
2016-02-19 20:38:02 +00:00
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
2016-04-17 20:00:59 +00:00
func (itr *{{ $ k . name }}ReaderIterator) Next() (*{{ $ k . Name }}Point, error) {
2016-02-19 20:38:02 +00:00
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
2016-03-02 23:42:00 +00:00
p := &{{ $ k . Name }}Point{}
if err := itr.dec.Decode{{ $ k . Name }}Point(p); err == io.EOF {
2016-04-17 20:00:59 +00:00
return nil, nil
2016-02-19 20:38:02 +00:00
} else if err != nil {
2016-04-17 20:00:59 +00:00
return nil, err
2016-02-19 20:38:02 +00:00
}
2016-04-17 20:00:59 +00:00
return p, nil
2016-02-19 20:38:02 +00:00
}
{{end}}
{{range .}}
// encode{{.Name}}Iterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error {
2016-03-17 15:55:37 +00:00
ticker := time.NewTicker(enc.StatsInterval)
defer ticker.Stop()
// Emit initial stats.
if err := enc.encodeStats(itr.Stats()); err != nil {
return err
}
// Continually stream points from the iterator into the encoder.
2016-02-19 20:38:02 +00:00
penc := New{{.Name}}PointEncoder(enc.w)
for {
2016-03-17 15:55:37 +00:00
// Emit stats periodically.
select {
case <-ticker.C:
if err := enc.encodeStats(itr.Stats()); err != nil {
return err
}
default:
}
2016-04-05 12:36:47 +00:00
2016-02-19 20:38:02 +00:00
// Retrieve the next point from the iterator.
2016-04-17 20:00:59 +00:00
p, err := itr.Next()
if err != nil {
return err
} else if p == nil {
2016-03-17 15:55:37 +00:00
break
2016-02-19 20:38:02 +00:00
}
// Write the point to the point encoder.
if err := penc.Encode{{.Name}}Point(p); err != nil {
return err
}
}
2016-03-17 15:55:37 +00:00
// Emit final stats.
if err := enc.encodeStats(itr.Stats()); err != nil {
return err
}
return nil
}
{{end}}
{{end}}