influxdb/query/iterator.gen.go.tmpl

1581 lines
42 KiB
Cheetah

package query
import (
"context"
"container/heap"
"io"
"sort"
"sync"
"time"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxql"
)
// DefaultStatsInterval is the default value for IteratorEncoder.StatsInterval.
const DefaultStatsInterval = time.Second
{{with $types := .}}{{range $k := $types}}
// {{$k.Name}}Iterator represents a stream of {{$k.name}} points.
type {{$k.Name}}Iterator interface {
Iterator
Next() (*{{$k.Name}}Point, error)
}
// new{{$k.Name}}Iterators converts a slice of Iterator to a slice of {{$k.Name}}Iterator.
// Drop and closes any iterator in itrs that is not a {{$k.Name}}Iterator and cannot
// be cast to a {{$k.Name}}Iterator.
func new{{$k.Name}}Iterators(itrs []Iterator) []{{$k.Name}}Iterator {
a := make([]{{$k.Name}}Iterator, 0, len(itrs))
for _, itr := range itrs {
switch itr := itr.(type) {
case {{$k.Name}}Iterator:
a = append(a, itr)
default:
itr.Close()
}
}
return a
}
// buf{{$k.Name}}Iterator represents a buffered {{$k.Name}}Iterator.
type buf{{$k.Name}}Iterator struct {
itr {{$k.Name}}Iterator
buf *{{$k.Name}}Point
}
// newBuf{{$k.Name}}Iterator returns a buffered {{$k.Name}}Iterator.
func newBuf{{$k.Name}}Iterator(itr {{$k.Name}}Iterator) *buf{{$k.Name}}Iterator {
return &buf{{$k.Name}}Iterator{itr: itr}
}
// Stats returns statistics from the input iterator.
func (itr *buf{{$k.Name}}Iterator) Stats() IteratorStats { return itr.itr.Stats() }
// Close closes the underlying iterator.
func (itr *buf{{$k.Name}}Iterator) Close() error { return itr.itr.Close() }
// peek returns the next point without removing it from the iterator.
func (itr *buf{{$k.Name}}Iterator) peek() (*{{$k.Name}}Point, error) {
p, err := itr.Next()
if err != nil {
return nil, err
}
itr.unread(p)
return p, nil
}
// peekTime returns the time of the next point.
// Returns zero time if no more points available.
func (itr *buf{{$k.Name}}Iterator) peekTime() (int64, error) {
p, err := itr.peek()
if p == nil || err != nil {
return ZeroTime, err
}
return p.Time, nil
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *buf{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) {
buf := itr.buf
if buf != nil {
itr.buf = nil
return buf, nil
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) (*{{$k.Name}}Point, error) {
v, err := itr.Next()
if v == nil || err != nil {
return nil, err
} else if t := v.Time; t >= endTime || t < startTime {
itr.unread(v)
return nil, nil
}
return v, nil
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *buf{{$k.Name}}Iterator) unread(v *{{$k.Name}}Point) { itr.buf = v }
// {{$k.name}}MergeIterator represents an iterator that combines multiple {{$k.name}} iterators.
type {{$k.name}}MergeIterator struct {
inputs []{{$k.Name}}Iterator
heap *{{$k.name}}MergeHeap
init bool
closed bool
mu sync.RWMutex
// Current iterator and window.
curr *{{$k.name}}MergeHeapItem
window struct {
name string
tags string
startTime int64
endTime int64
}
}
// new{{$k.Name}}MergeIterator returns a new instance of {{$k.name}}MergeIterator.
func new{{$k.Name}}MergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}MergeIterator {
itr := &{{$k.name}}MergeIterator{
inputs: inputs,
heap: &{{$k.name}}MergeHeap{
items: make([]*{{$k.name}}MergeHeapItem, 0, len(inputs)),
opt: opt,
},
}
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBuf{{$k.Name}}Iterator(input)
// Append to the heap.
itr.heap.items = append(itr.heap.items, &{{$k.name}}MergeHeapItem{itr: bufInput})
}
return itr
}
// Stats returns an aggregation of stats from the underlying iterators.
func (itr *{{$k.name}}MergeIterator) Stats() IteratorStats {
var stats IteratorStats
for _, input := range itr.inputs {
stats.Add(input.Stats())
}
return stats
}
// Close closes the underlying iterators.
func (itr *{{$k.name}}MergeIterator) Close() error {
itr.mu.Lock()
defer itr.mu.Unlock()
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
itr.closed = true
return nil
}
// Next returns the next point from the iterator.
func (itr *{{$k.name}}MergeIterator) Next() (*{{$k.Name}}Point, error) {
itr.mu.RLock()
defer itr.mu.RUnlock()
if itr.closed {
return nil, nil
}
// Initialize the heap. This needs to be done lazily on the first call to this iterator
// so that iterator initialization done through the Select() call returns quickly.
// Queries can only be interrupted after the Select() call completes so any operations
// done during iterator creation cannot be interrupted, which is why we do it here
// instead so an interrupt can happen while initializing the heap.
if !itr.init {
items := itr.heap.items
itr.heap.items = make([]*{{$k.name}}MergeHeapItem, 0, len(items))
for _, item := range items {
if p, err := item.itr.peek(); err != nil {
return nil, err
} else if p == nil {
continue
}
itr.heap.items = append(itr.heap.items, item)
}
heap.Init(itr.heap)
itr.init = true
}
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil, nil
}
itr.curr = heap.Pop(itr.heap).(*{{$k.name}}MergeHeapItem)
// Read point and set current window.
p, err := itr.curr.itr.Next()
if err != nil {
return nil, err
}
tags := p.Tags.Subset(itr.heap.opt.Dimensions)
itr.window.name, itr.window.tags = p.Name, tags.ID()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p, nil
}
// Read the next point from the current iterator.
p, err := itr.curr.itr.Next()
if err != nil {
return nil, err
}
// If there are no more points then remove iterator from heap and find next.
if p == nil {
itr.curr = nil
continue
}
// Check if the point is inside of our current window.
inWindow := true
if window := itr.window; window.name != p.Name {
inWindow = false
} else if tags := p.Tags.Subset(itr.heap.opt.Dimensions); window.tags != tags.ID() {
inWindow = false
} else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime {
inWindow = false
} else if !opt.Ascending && p.Time < window.startTime {
inWindow = false
}
// If it's outside our window then push iterator back on the heap and find new iterator.
if !inWindow {
itr.curr.itr.unread(p)
heap.Push(itr.heap, itr.curr)
itr.curr = nil
continue
}
return p, nil
}
}
// {{$k.name}}MergeHeap represents a heap of {{$k.name}}MergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type {{$k.name}}MergeHeap struct {
opt IteratorOptions
items []*{{$k.name}}MergeHeapItem
}
func (h *{{$k.name}}MergeHeap) Len() int { return len(h.items) }
func (h *{{$k.name}}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *{{$k.name}}MergeHeap) Less(i, j int) bool {
x, err := h.items[i].itr.peek()
if err != nil {
return true
}
y, err := h.items[j].itr.peek()
if err != nil {
return false
}
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); xTags.ID() != yTags.ID() {
return xTags.ID() < yTags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); xTags.ID() != yTags.ID() {
return xTags.ID() > yTags.ID()
}
}
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
if h.opt.Ascending {
return xt < yt
}
return xt > yt
}
func (h *{{$k.name}}MergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*{{$k.name}}MergeHeapItem))
}
func (h *{{$k.name}}MergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
}
type {{$k.name}}MergeHeapItem struct {
itr *buf{{$k.Name}}Iterator
}
// {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type {{$k.name}}SortedMergeIterator struct {
inputs []{{$k.Name}}Iterator
heap *{{$k.name}}SortedMergeHeap
init bool
}
// new{{$k.Name}}SortedMergeIterator returns an instance of {{$k.name}}SortedMergeIterator.
func new{{$k.Name}}SortedMergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) Iterator {
itr := &{{$k.name}}SortedMergeIterator{
inputs: inputs,
heap: &{{$k.name}}SortedMergeHeap{
items: make([]*{{$k.name}}SortedMergeHeapItem, 0, len(inputs)),
opt: opt,
},
}
// Initialize heap items.
for _, input := range inputs {
// Append to the heap.
itr.heap.items = append(itr.heap.items, &{{$k.name}}SortedMergeHeapItem{itr: input})
}
return itr
}
// Stats returns an aggregation of stats from the underlying iterators.
func (itr *{{$k.name}}SortedMergeIterator) Stats() IteratorStats {
var stats IteratorStats
for _, input := range itr.inputs {
stats.Add(input.Stats())
}
return stats
}
// Close closes the underlying iterators.
func (itr *{{$k.name}}SortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
// Next returns the next points from the iterator.
func (itr *{{$k.name}}SortedMergeIterator) Next() (*{{$k.Name}}Point, error) { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) {
// Initialize the heap. See the MergeIterator to see why this has to be done lazily.
if !itr.init {
items := itr.heap.items
itr.heap.items = make([]*{{$k.name}}SortedMergeHeapItem, 0, len(items))
for _, item := range items {
var err error
if item.point, err = item.itr.Next(); err != nil {
return nil, err
} else if item.point == nil {
continue
}
itr.heap.items = append(itr.heap.items, item)
}
heap.Init(itr.heap)
itr.init = true
}
if len(itr.heap.items) == 0 {
return nil, nil
}
// Read the next item from the heap.
item := heap.Pop(itr.heap).(*{{$k.name}}SortedMergeHeapItem)
if item.err != nil {
return nil, item.err
} else if item.point == nil {
return nil, nil
}
// Copy the point for return.
p := item.point.Clone()
// Read the next item from the cursor. Push back to heap if one exists.
if item.point, item.err = item.itr.Next(); item.point != nil {
heap.Push(itr.heap, item)
}
return p, nil
}
// {{$k.name}}SortedMergeHeap represents a heap of {{$k.name}}SortedMergeHeapItems.
// Items are sorted with the following priority:
// - By their measurement name;
// - By their tag keys/values;
// - By time; or
// - By their Aux field values.
//
type {{$k.name}}SortedMergeHeap struct {
opt IteratorOptions
items []*{{$k.name}}SortedMergeHeapItem
}
func (h *{{$k.name}}SortedMergeHeap) Len() int { return len(h.items) }
func (h *{{$k.name}}SortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *{{$k.name}}SortedMergeHeap) Less(i, j int) bool {
x, y := h.items[i].point, h.items[j].point
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() < yTags.ID()
}
if x.Time != y.Time{
return x.Time < y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 < v2
}
}
return false // Times and/or Aux fields are equal.
}
if x.Name != y.Name {
return x.Name > y.Name
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() > yTags.ID()
}
if x.Time != y.Time{
return x.Time > y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 > v2
}
}
return false // Times and/or Aux fields are equal.
}
func (h *{{$k.name}}SortedMergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*{{$k.name}}SortedMergeHeapItem))
}
func (h *{{$k.name}}SortedMergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
}
type {{$k.name}}SortedMergeHeapItem struct {
point *{{$k.Name}}Point
err error
itr {{$k.Name}}Iterator
}
// {{$k.name}}IteratorScanner scans the results of a {{$k.Name}}Iterator into a map.
type {{$k.name}}IteratorScanner struct {
input *buf{{$k.Name}}Iterator
err error
keys []influxql.VarRef
defaultValue interface{}
}
// new{{$k.Name}}IteratorScanner creates a new IteratorScanner.
func new{{$k.Name}}IteratorScanner(input {{$k.Name}}Iterator, keys []influxql.VarRef, defaultValue interface{}) *{{$k.name}}IteratorScanner {
return &{{$k.name}}IteratorScanner{
input: newBuf{{$k.Name}}Iterator(input),
keys: keys,
defaultValue: defaultValue,
}
}
func (s *{{$k.name}}IteratorScanner) Peek() (int64, string, Tags) {
if s.err != nil {
return ZeroTime, "", Tags{}
}
p, err := s.input.peek()
if err != nil {
s.err = err
return ZeroTime, "", Tags{}
} else if p == nil {
return ZeroTime, "", Tags{}
}
return p.Time, p.Name, p.Tags
}
func (s *{{$k.name}}IteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) {
if s.err != nil {
return
}
p, err := s.input.Next()
if err != nil {
s.err = err
return
} else if p == nil {
s.useDefaults(m)
return
} else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) {
s.useDefaults(m)
s.input.unread(p)
return
}
if k := s.keys[0]; k.Val != "" {
if p.Nil {
if s.defaultValue != SkipDefault {
m[k.Val] = castToType(s.defaultValue, k.Type)
}
} else {
m[k.Val] = p.Value
}
}
for i, v := range p.Aux {
k := s.keys[i+1]
switch v.(type) {
case float64, int64, uint64, string, bool:
m[k.Val] = v
default:
// Insert the fill value if one was specified.
if s.defaultValue != SkipDefault {
m[k.Val] = castToType(s.defaultValue, k.Type)
}
}
}
}
func (s *{{$k.name}}IteratorScanner) useDefaults(m map[string]interface{}) {
if s.defaultValue == SkipDefault {
return
}
for _, k := range s.keys {
if k.Val == "" {
continue
}
m[k.Val] = castToType(s.defaultValue, k.Type)
}
}
func (s *{{$k.name}}IteratorScanner) Stats() IteratorStats { return s.input.Stats() }
func (s *{{$k.name}}IteratorScanner) Err() error { return s.err }
func (s *{{$k.name}}IteratorScanner) Close() error { return s.input.Close() }
// {{$k.name}}ParallelIterator represents an iterator that pulls data in a separate goroutine.
type {{$k.name}}ParallelIterator struct {
input {{$k.Name}}Iterator
ch chan {{$k.name}}PointError
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// new{{$k.Name}}ParallelIterator returns a new instance of {{$k.name}}ParallelIterator.
func new{{$k.Name}}ParallelIterator(input {{$k.Name}}Iterator) *{{$k.name}}ParallelIterator {
itr := &{{$k.name}}ParallelIterator{
input: input,
ch: make(chan {{$k.name}}PointError, 256),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
// Stats returns stats from the underlying iterator.
func (itr *{{$k.name}}ParallelIterator) Stats() IteratorStats { return itr.input.Stats() }
// Close closes the underlying iterators.
func (itr *{{$k.name}}ParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
itr.wg.Wait()
return itr.input.Close()
}
// Next returns the next point from the iterator.
func (itr *{{$k.name}}ParallelIterator) Next() (*{{$k.Name}}Point, error) {
v, ok := <-itr.ch
if !ok {
return nil, io.EOF
}
return v.point, v.err
}
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *{{$k.name}}ParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.
p, err := itr.input.Next()
if p != nil {
p = p.Clone()
}
select {
case <-itr.closing:
return
case itr.ch <- {{$k.name}}PointError{point: p, err: err}:
}
}
}
type {{$k.name}}PointError struct {
point *{{$k.Name}}Point
err error
}
// {{$k.name}}LimitIterator represents an iterator that limits points per group.
type {{$k.name}}LimitIterator struct {
input {{$k.Name}}Iterator
opt IteratorOptions
n int
prev struct {
name string
tags Tags
}
}
// new{{$k.Name}}LimitIterator returns a new instance of {{$k.name}}LimitIterator.
func new{{$k.Name}}LimitIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}LimitIterator {
return &{{$k.name}}LimitIterator{
input: input,
opt: opt,
}
}
// Stats returns stats from the underlying iterator.
func (itr *{{$k.name}}LimitIterator) Stats() IteratorStats { return itr.input.Stats() }
// Close closes the underlying iterators.
func (itr *{{$k.name}}LimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *{{$k.name}}LimitIterator) Next() (*{{$k.Name}}Point, error) {
for {
p, err := itr.input.Next()
if p == nil || err != nil {
return nil, err
}
// Reset window and counter if a new window is encountered.
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
itr.prev.name = p.Name
itr.prev.tags = p.Tags
itr.n = 0
}
// Increment counter.
itr.n++
// Read next point if not beyond the offset.
if itr.n <= itr.opt.Offset {
continue
}
// Read next point if we're beyond the limit.
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
continue
}
return p, nil
}
}
type {{$k.name}}FillIterator struct {
input *buf{{$k.Name}}Iterator
prev {{$k.Name}}Point
startTime int64
endTime int64
auxFields []interface{}
init bool
opt IteratorOptions
window struct {
name string
tags Tags
time int64
offset int64
}
}
func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr influxql.Expr, opt IteratorOptions) *{{$k.name}}FillIterator {
if opt.Fill == influxql.NullFill {
if expr, ok := expr.(*influxql.Call); ok && expr.Name == "count" {
opt.Fill = influxql.NumberFill
opt.FillValue = {{$k.Zero}}
}
}
var startTime, endTime int64
if opt.Ascending {
startTime, _ = opt.Window(opt.StartTime)
endTime, _ = opt.Window(opt.EndTime)
} else {
startTime, _ = opt.Window(opt.EndTime)
endTime, _ = opt.Window(opt.StartTime)
}
var auxFields []interface{}
if len(opt.Aux) > 0 {
auxFields = make([]interface{}, len(opt.Aux))
}
return &{{$k.name}}FillIterator{
input: newBuf{{$k.Name}}Iterator(input),
prev: {{$k.Name}}Point{Nil: true},
startTime: startTime,
endTime: endTime,
auxFields: auxFields,
opt: opt,
}
}
func (itr *{{$k.name}}FillIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *{{$k.name}}FillIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
if !itr.init {
p, err := itr.input.peek()
if p == nil || err != nil {
return nil, err
}
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.startTime == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
itr.init = true
}
p, err := itr.input.Next()
if err != nil {
return nil, err
}
// Check if the next point is outside of our window or is nil.
if p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
// If we are inside of an interval, unread the point and continue below to
// constructing a new point.
if itr.opt.Ascending && itr.window.time <= itr.endTime {
itr.input.unread(p)
p = nil
goto CONSTRUCT
} else if !itr.opt.Ascending && itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime {
itr.input.unread(p)
p = nil
goto CONSTRUCT
}
// We are *not* in a current interval. If there is no next point,
// we are at the end of all intervals.
if p == nil {
return nil, nil
}
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
if itr.window.time == influxql.MinTime {
itr.window.time, _ = itr.opt.Window(p.Time)
}
if itr.opt.Location != nil {
_, itr.window.offset = itr.opt.Zone(itr.window.time)
}
itr.prev = {{$k.Name}}Point{Nil: true}
}
// Check if the point is our next expected point.
CONSTRUCT:
if p == nil || (itr.opt.Ascending && p.Time > itr.window.time) || (!itr.opt.Ascending && p.Time < itr.window.time) {
if p != nil {
itr.input.unread(p)
}
p = &{{$k.Name}}Point{
Name: itr.window.name,
Tags: itr.window.tags,
Time: itr.window.time,
Aux: itr.auxFields,
}
switch itr.opt.Fill {
case influxql.LinearFill:
{{- if or (eq $k.Name "Float") (eq $k.Name "Integer") (eq $k.Name "Unsigned")}}
if !itr.prev.Nil {
next, err := itr.input.peek()
if err != nil {
return nil, err
} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
interval := int64(itr.opt.Interval.Duration)
start := itr.window.time / interval
p.Value = linear{{$k.Name}}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
} else {
p.Nil = true
}
} else {
p.Nil = true
}
{{else}}
fallthrough
{{- end}}
case influxql.NullFill:
p.Nil = true
case influxql.NumberFill:
p.Value, _ = castTo{{$k.Name}}(itr.opt.FillValue)
case influxql.PreviousFill:
if !itr.prev.Nil {
p.Value = itr.prev.Value
p.Nil = itr.prev.Nil
} else {
p.Nil = true
}
}
} else {
itr.prev = *p
}
// Advance the expected time. Do not advance to a new window here
// as there may be lingering points with the same timestamp in the previous
// window.
if itr.opt.Ascending {
itr.window.time += int64(itr.opt.Interval.Duration)
} else {
itr.window.time -= int64(itr.opt.Interval.Duration)
}
// Check to see if we have passed over an offset change and adjust the time
// to account for this new offset.
if itr.opt.Location != nil {
if _, offset := itr.opt.Zone(itr.window.time - 1); offset != itr.window.offset {
diff := itr.window.offset - offset
if abs(diff) < int64(itr.opt.Interval.Duration) {
itr.window.time += diff
}
itr.window.offset = offset
}
}
return p, nil
}
// {{$k.name}}IntervalIterator represents a {{$k.name}} implementation of IntervalIterator.
type {{$k.name}}IntervalIterator struct {
input {{$k.Name}}Iterator
opt IteratorOptions
}
func new{{$k.Name}}IntervalIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}IntervalIterator {
return &{{$k.name}}IntervalIterator{input: input, opt: opt}
}
func (itr *{{$k.name}}IntervalIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *{{$k.name}}IntervalIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}IntervalIterator) Next() (*{{$k.Name}}Point, error) {
p, err := itr.input.Next()
if p == nil || err != nil {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == influxql.MinTime {
p.Time = 0
}
return p, nil
}
// {{$k.name}}InterruptIterator represents a {{$k.name}} implementation of InterruptIterator.
type {{$k.name}}InterruptIterator struct {
input {{$k.Name}}Iterator
closing <-chan struct{}
count int
}
func new{{$k.Name}}InterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}InterruptIterator {
return &{{$k.name}}InterruptIterator{input: input, closing: closing}
}
func (itr *{{$k.name}}InterruptIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *{{$k.name}}InterruptIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}InterruptIterator) Next() (*{{$k.Name}}Point, error) {
// Only check if the channel is closed every N points. This
// intentionally checks on both 0 and N so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count & 0xFF == 0xFF {
select {
case <-itr.closing:
return nil, itr.Close()
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}
// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}
// {{$k.name}}CloseInterruptIterator represents a {{$k.name}} implementation of CloseInterruptIterator.
type {{$k.name}}CloseInterruptIterator struct {
input {{$k.Name}}Iterator
closing <-chan struct{}
done chan struct{}
once sync.Once
}
func new{{$k.Name}}CloseInterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}CloseInterruptIterator {
itr := &{{$k.name}}CloseInterruptIterator{
input: input,
closing: closing,
done: make(chan struct{}),
}
go itr.monitor()
return itr
}
func (itr *{{$k.name}}CloseInterruptIterator) monitor() {
select {
case <-itr.closing:
itr.Close()
case <-itr.done:
}
}
func (itr *{{$k.name}}CloseInterruptIterator) Stats() IteratorStats {
return itr.input.Stats()
}
func (itr *{{$k.name}}CloseInterruptIterator) Close() error {
itr.once.Do(func() {
close(itr.done)
itr.input.Close()
})
return nil
}
func (itr *{{$k.name}}CloseInterruptIterator) Next() (*{{$k.Name}}Point, error) {
p, err := itr.input.Next()
if err != nil {
// Check if the iterator was closed.
select {
case <-itr.done:
return nil, nil
default:
return nil, err
}
}
return p, nil
}
{{range $v := $types}}
// {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result.
type {{$k.name}}Reduce{{$v.Name}}Iterator struct {
input *buf{{$k.Name}}Iterator
create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
dims []string
opt IteratorOptions
points []{{$v.Name}}Point
keepTags bool
}
func new{{$k.Name}}Reduce{{$v.Name}}Iterator(input {{$k.Name}}Iterator, opt IteratorOptions, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)) *{{$k.name}}Reduce{{$v.Name}}Iterator {
return &{{$k.name}}Reduce{{$v.Name}}Iterator{
input: newBuf{{$k.Name}}Iterator(input),
create: createFn,
dims: opt.GetDimensions(),
opt: opt,
}
}
// Stats returns stats from the input iterator.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }
// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() (*{{$v.Name}}Point, error) {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
var err error
itr.points, err = itr.reduce()
if len(itr.points) == 0 {
return nil, err
}
}
// Pop next point off the stack.
p := &itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p, nil
}
// {{$k.name}}Reduce{{$v.Name}}Point stores the reduced data for a name/tag combination.
type {{$k.name}}Reduce{{$v.Name}}Point struct {
Name string
Tags Tags
Aggregator {{$k.Name}}PointAggregator
Emitter {{$v.Name}}PointEmitter
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, error) {
// Calculate next window.
var (
startTime, endTime int64
window struct {
name string
tags string
}
)
for {
p, err := itr.input.Next()
if err != nil || p == nil {
return nil, err
} else if p.Nil {
continue
}
// Unread the point so it can be processed.
itr.input.unread(p)
startTime, endTime = itr.opt.Window(p.Time)
window.name, window.tags = p.Name, p.Tags.Subset(itr.opt.Dimensions).ID()
break
}
// Create points by tags.
m := make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point)
for {
// Read next point.
curr, err := itr.input.NextInWindow(startTime, endTime)
if err != nil {
return nil, err
} else if curr == nil {
break
} else if curr.Nil {
continue
} else if curr.Name != window.name {
itr.input.unread(curr)
break
}
// Ensure this point is within the same final window.
if curr.Name != window.name {
itr.input.unread(curr)
break
} else if tags := curr.Tags.Subset(itr.opt.Dimensions); tags.ID() != window.tags {
itr.input.unread(curr)
break
}
// Retrieve the tags on this point for this level of the query.
// This may be different than the bucket dimensions.
tags := curr.Tags.Subset(itr.dims)
id := tags.ID()
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
if rp == nil {
aggregator, emitter := itr.create()
rp = &{{$k.name}}Reduce{{$v.Name}}Point{
Name: curr.Name,
Tags: tags,
Aggregator: aggregator,
Emitter: emitter,
}
m[id] = rp
}
rp.Aggregator.Aggregate{{$k.Name}}(curr)
}
// Reverse sort points by name & tag if our output is supposed to be ordered.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
if len(keys) > 1 && itr.opt.Ordered {
sort.Sort(reverseStringSlice(keys))
}
// Assume the points are already sorted until proven otherwise.
sortedByTime := true
// Emit the points for each name & tag combination.
a := make([]{{$v.Name}}Point, 0, len(m))
for _, k := range keys {
rp := m[k]
points := rp.Emitter.Emit()
for i := len(points)-1; i >= 0; i-- {
points[i].Name = rp.Name
if !itr.keepTags {
points[i].Tags = rp.Tags
}
// Set the points time to the interval time if the reducer didn't provide one.
if points[i].Time == ZeroTime {
points[i].Time = startTime
} else {
sortedByTime = false
}
a = append(a, points[i])
}
}
// Points may be out of order. Perform a stable sort by time if requested.
if !sortedByTime && itr.opt.Ordered {
sort.Stable(sort.Reverse({{$v.name}}PointsByTime(a)))
}
return a, nil
}
// {{$k.name}}Stream{{$v.Name}}Iterator streams inputs into the iterator and emits points gradually.
type {{$k.name}}Stream{{$v.Name}}Iterator struct {
input *buf{{$k.Name}}Iterator
create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
dims []string
opt IteratorOptions
m map[string]*{{$k.name}}Reduce{{$v.Name}}Point
points []{{$v.Name}}Point
}
// new{{$k.Name}}Stream{{$v.Name}}Iterator returns a new instance of {{$k.name}}Stream{{$v.Name}}Iterator.
func new{{$k.Name}}Stream{{$v.Name}}Iterator(input {{$k.Name}}Iterator, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter), opt IteratorOptions) *{{$k.name}}Stream{{$v.Name}}Iterator {
return &{{$k.name}}Stream{{$v.Name}}Iterator{
input: newBuf{{$k.Name}}Iterator(input),
create: createFn,
dims: opt.GetDimensions(),
opt: opt,
m: make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point),
}
}
// Stats returns stats from the input iterator.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }
// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Close() error { return itr.input.Close() }
// Next returns the next value for the stream iterator.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Next() (*{{$v.Name}}Point, error) {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
var err error
itr.points, err = itr.reduce()
if len(itr.points) == 0 {
return nil, err
}
}
// Pop next point off the stack.
p := &itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p, nil
}
// reduce creates and manages aggregators for every point from the input.
// After aggregating a point, it always tries to emit a value using the emitter.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, error) {
// We have already read all of the input points.
if itr.m == nil {
return nil, nil
}
for {
// Read next point.
curr, err := itr.input.Next()
if err != nil {
return nil, err
} else if curr == nil {
// Close all of the aggregators to flush any remaining points to emit.
var points []{{$v.Name}}Point
for _, rp := range itr.m {
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
if err := aggregator.Close(); err != nil {
return nil, err
}
pts := rp.Emitter.Emit()
if len(pts) == 0 {
continue
}
for i := range pts {
pts[i].Name = rp.Name
pts[i].Tags = rp.Tags
}
points = append(points, pts...)
}
}
// Eliminate the aggregators and emitters.
itr.m = nil
return points, nil
} else if curr.Nil {
continue
}
tags := curr.Tags.Subset(itr.dims)
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := itr.m[id]
if rp == nil {
aggregator, emitter := itr.create()
rp = &{{$k.name}}Reduce{{.Name}}Point{
Name: curr.Name,
Tags: tags,
Aggregator: aggregator,
Emitter: emitter,
}
itr.m[id] = rp
}
rp.Aggregator.Aggregate{{$k.Name}}(curr)
// Attempt to emit points from the aggregator.
points := rp.Emitter.Emit()
if len(points) == 0 {
continue
}
for i := range points {
points[i].Name = rp.Name
points[i].Tags = rp.Tags
}
return points, nil
}
}
{{end}}
// {{$k.name}}DedupeIterator only outputs unique points.
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type {{$k.name}}DedupeIterator struct {
input {{$k.Name}}Iterator
m map[string]struct{} // lookup of points already sent
}
type {{$k.name}}IteratorMapper struct {
cur Cursor
row Row
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point {{$k.Name}}Point
}
func new{{$k.Name}}IteratorMapper(cur Cursor, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *{{$k.name}}IteratorMapper {
return &{{$k.name}}IteratorMapper{
cur: cur,
driver: driver,
fields: fields,
point: {{$k.Name}}Point{
Aux: make([]interface{}, len(fields)),
},
}
}
func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) {
if !itr.cur.Scan(&itr.row) {
if err := itr.cur.Err(); err != nil {
return nil, err
}
return nil, nil
}
itr.point.Time = itr.row.Time
itr.point.Name = itr.row.Series.Name
itr.point.Tags = itr.row.Series.Tags
if itr.driver != nil {
if v := itr.driver.Value(&itr.row); v != nil {
if v, ok := castTo{{$k.Name}}(v); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = {{$k.Nil}}
itr.point.Nil = true
}
} else {
itr.point.Value = {{$k.Nil}}
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(&itr.row)
}
return &itr.point, nil
}
func (itr *{{$k.name}}IteratorMapper) Stats() IteratorStats {
return itr.cur.Stats()
}
func (itr *{{$k.name}}IteratorMapper) Close() error {
return itr.cur.Close()
}
type {{$k.name}}FilterIterator struct {
input {{$k.Name}}Iterator
cond influxql.Expr
opt IteratorOptions
m map[string]interface{}
}
func new{{$k.Name}}FilterIterator(input {{$k.Name}}Iterator, cond influxql.Expr, opt IteratorOptions) {{$k.Name}}Iterator {
// Strip out time conditions from the WHERE clause.
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
n := influxql.RewriteFunc(influxql.CloneExpr(cond), func(n influxql.Node) influxql.Node {
switch n := n.(type) {
case *influxql.BinaryExpr:
if n.LHS.String() == "time" {
return &influxql.BooleanLiteral{Val: true}
}
}
return n
})
cond, _ = n.(influxql.Expr)
if cond == nil {
return input
} else if n, ok := cond.(*influxql.BooleanLiteral); ok && n.Val {
return input
}
return &{{$k.name}}FilterIterator{
input: input,
cond: cond,
opt: opt,
m: make(map[string]interface{}),
}
}
func (itr *{{$k.name}}FilterIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *{{$k.name}}FilterIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) {
for {
p, err := itr.input.Next()
if err != nil || p == nil {
return nil, err
}
for i, ref := range itr.opt.Aux {
itr.m[ref.Val] = p.Aux[i]
}
for k, v := range p.Tags.KeyValues() {
itr.m[k] = v
}
if !influxql.EvalBool(itr.cond, itr.m) {
continue
}
return p, nil
}
}
type {{$k.name}}TagSubsetIterator struct {
input {{$k.Name}}Iterator
point {{$k.Name}}Point
lastTags Tags
dimensions []string
}
func new{{$k.Name}}TagSubsetIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}TagSubsetIterator {
return &{{$k.name}}TagSubsetIterator{
input: input,
dimensions: opt.GetDimensions(),
}
}
func (itr *{{$k.name}}TagSubsetIterator) Next() (*{{$k.Name}}Point, error) {
p, err := itr.input.Next()
if err != nil {
return nil, err
} else if p == nil {
return nil, nil
}
itr.point.Name = p.Name
if !p.Tags.Equal(itr.lastTags) {
itr.point.Tags = p.Tags.Subset(itr.dimensions)
itr.lastTags = p.Tags
}
itr.point.Time = p.Time
itr.point.Value = p.Value
itr.point.Aux = p.Aux
itr.point.Aggregated = p.Aggregated
itr.point.Nil = p.Nil
return &itr.point, nil
}
func (itr *{{$k.name}}TagSubsetIterator) Stats() IteratorStats {
return itr.input.Stats()
}
func (itr *{{$k.name}}TagSubsetIterator) Close() error {
return itr.input.Close()
}
// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator.
func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
return &{{$k.name}}DedupeIterator{
input: input,
m: make(map[string]struct{}),
}
}
// Stats returns stats from the input iterator.
func (itr *{{$k.name}}DedupeIterator) Stats() IteratorStats { return itr.input.Stats() }
// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}DedupeIterator) Close() error { return itr.input.Close() }
// Next returns the next unique point from the input iterator.
func (itr *{{$k.name}}DedupeIterator) Next() (*{{$k.Name}}Point, error) {
for {
// Read next point.
p, err := itr.input.Next()
if p == nil || err != nil {
return nil, err
}
// Serialize to bytes to store in lookup.
buf, err := proto.Marshal(encode{{$k.Name}}Point(p))
if err != nil {
return nil, err
}
// If the point has already been output then move to the next point.
if _, ok := itr.m[string(buf)]; ok {
continue
}
// Otherwise mark it as emitted and return point.
itr.m[string(buf)] = struct{}{}
return p, nil
}
}
// {{$k.name}}ReaderIterator represents an iterator that streams from a reader.
type {{$k.name}}ReaderIterator struct {
r io.Reader
dec *{{$k.Name}}PointDecoder
}
// new{{$k.Name}}ReaderIterator returns a new instance of {{$k.name}}ReaderIterator.
func new{{$k.Name}}ReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *{{$k.name}}ReaderIterator {
dec := New{{$k.Name}}PointDecoder(ctx, r)
dec.stats = stats
return &{{$k.name}}ReaderIterator{
r: r,
dec: dec,
}
}
// Stats returns stats about points processed.
func (itr *{{$k.name}}ReaderIterator) Stats() IteratorStats { return itr.dec.stats }
// Close closes the underlying reader, if applicable.
func (itr *{{$k.name}}ReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *{{$k.name}}ReaderIterator) Next() (*{{$k.Name}}Point, error) {
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &{{$k.Name}}Point{}
if err := itr.dec.Decode{{$k.Name}}Point(p); err == io.EOF {
return nil, nil
} else if err != nil {
return nil, err
}
return p, nil
}
{{end}}
{{range .}}
// encode{{.Name}}Iterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error {
ticker := time.NewTicker(enc.StatsInterval)
defer ticker.Stop()
// Emit initial stats.
if err := enc.encodeStats(itr.Stats()); err != nil {
return err
}
// Continually stream points from the iterator into the encoder.
penc := New{{.Name}}PointEncoder(enc.w)
for {
// Emit stats periodically.
select {
case <-ticker.C:
if err := enc.encodeStats(itr.Stats()); err != nil {
return err
}
default:
}
// Retrieve the next point from the iterator.
p, err := itr.Next()
if err != nil {
return err
} else if p == nil {
break
}
// Write the point to the point encoder.
if err := penc.Encode{{.Name}}Point(p); err != nil {
return err
}
}
// Emit final stats.
if err := enc.encodeStats(itr.Stats()); err != nil {
return err
}
return nil
}
{{end}}
{{end}}