influxdb/influxql/iterator.gen.go

4025 lines
100 KiB
Go

// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: iterator.gen.go.tmpl
package influxql
import (
"container/heap"
"errors"
"fmt"
"io"
"log"
"sort"
"sync"
"github.com/gogo/protobuf/proto"
)
// FloatIterator represents a stream of float points.
type FloatIterator interface {
Iterator
Next() *FloatPoint
}
// newFloatIterators converts a slice of Iterator to a slice of FloatIterator.
// Drop and closes any iterator in itrs that is not a FloatIterator and cannot
// be cast to a FloatIterator.
func newFloatIterators(itrs []Iterator) []FloatIterator {
a := make([]FloatIterator, 0, len(itrs))
for _, itr := range itrs {
switch itr := itr.(type) {
case FloatIterator:
a = append(a, itr)
case IntegerIterator:
a = append(a, &integerFloatCastIterator{input: itr})
default:
itr.Close()
}
}
return a
}
// bufFloatIterator represents a buffered FloatIterator.
type bufFloatIterator struct {
itr FloatIterator
buf *FloatPoint
}
// newBufFloatIterator returns a buffered FloatIterator.
func newBufFloatIterator(itr FloatIterator) *bufFloatIterator {
return &bufFloatIterator{
itr: itr,
}
}
// Close closes the underlying iterator.
func (itr *bufFloatIterator) Close() error { return itr.itr.Close() }
// peek returns the next point without removing it from the iterator.
func (itr *bufFloatIterator) peek() *FloatPoint {
p := itr.Next()
itr.unread(p)
return p
}
// peekTime returns the time of the next point.
// Returns zero time if no more points available.
func (itr *bufFloatIterator) peekTime() int64 {
p := itr.peek()
if p == nil {
return ZeroTime
}
return p.Time
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufFloatIterator) Next() *FloatPoint {
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
return buf
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *bufFloatIterator) NextInWindow(startTime, endTime int64) *FloatPoint {
v := itr.Next()
if v == nil {
return nil
} else if v.Time < startTime || v.Time >= endTime {
itr.unread(v)
return nil
}
return v
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *bufFloatIterator) unread(v *FloatPoint) { itr.buf = v }
// floatMergeIterator represents an iterator that combines multiple float iterators.
type floatMergeIterator struct {
inputs []FloatIterator
heap *floatMergeHeap
// Current iterator and window.
curr *floatMergeHeapItem
window struct {
name string
tags string
startTime int64
endTime int64
}
}
// newFloatMergeIterator returns a new instance of floatMergeIterator.
func newFloatMergeIterator(inputs []FloatIterator, opt IteratorOptions) *floatMergeIterator {
itr := &floatMergeIterator{
inputs: inputs,
heap: &floatMergeHeap{
items: make([]*floatMergeHeapItem, 0, len(inputs)),
opt: opt,
},
}
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBufFloatIterator(input)
if bufInput.peek() == nil {
continue
}
// Append to the heap.
itr.heap.items = append(itr.heap.items, &floatMergeHeapItem{itr: bufInput})
}
heap.Init(itr.heap)
return itr
}
// Close closes the underlying iterators.
func (itr *floatMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *floatMergeIterator) Next() *FloatPoint {
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil
}
itr.curr = heap.Pop(itr.heap).(*floatMergeHeapItem)
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.name, itr.window.tags = p.Name, p.Tags.ID()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
// Read the next point from the current iterator.
p := itr.curr.itr.Next()
// If there are no more points then remove iterator from heap and find next.
if p == nil {
itr.curr = nil
continue
}
// Check if the point is inside of our current window.
inWindow := true
if itr.window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
inWindow = false
}
// If it's outside our window then push iterator back on the heap and find new iterator.
if !inWindow {
itr.curr.itr.unread(p)
heap.Push(itr.heap, itr.curr)
itr.curr = nil
continue
}
return p
}
}
// floatMergeHeap represents a heap of floatMergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type floatMergeHeap struct {
opt IteratorOptions
items []*floatMergeHeapItem
}
func (h floatMergeHeap) Len() int { return len(h.items) }
func (h floatMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h floatMergeHeap) Less(i, j int) bool {
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() < y.Tags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() > y.Tags.ID()
}
}
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
if h.opt.Ascending {
return xt < yt
}
return xt > yt
}
func (h *floatMergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*floatMergeHeapItem))
}
func (h *floatMergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
}
type floatMergeHeapItem struct {
itr *bufFloatIterator
}
// floatSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type floatSortedMergeIterator struct {
inputs []FloatIterator
opt IteratorOptions
heap floatSortedMergeHeap
}
// newFloatSortedMergeIterator returns an instance of floatSortedMergeIterator.
func newFloatSortedMergeIterator(inputs []FloatIterator, opt IteratorOptions) Iterator {
itr := &floatSortedMergeIterator{
inputs: inputs,
heap: make(floatSortedMergeHeap, 0, len(inputs)),
opt: opt,
}
// Initialize heap.
for _, input := range inputs {
// Read next point.
p := input.Next()
if p == nil {
continue
}
// Append to the heap.
itr.heap = append(itr.heap, &floatSortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
}
heap.Init(&itr.heap)
return itr
}
// Close closes the underlying iterators.
func (itr *floatSortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
// Next returns the next points from the iterator.
func (itr *floatSortedMergeIterator) Next() *FloatPoint { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *floatSortedMergeIterator) pop() *FloatPoint {
if len(itr.heap) == 0 {
return nil
}
// Read the next item from the heap.
item := heap.Pop(&itr.heap).(*floatSortedMergeHeapItem)
// Copy the point for return.
p := item.point.Clone()
// Read the next item from the cursor. Push back to heap if one exists.
if item.point = item.itr.Next(); item.point != nil {
heap.Push(&itr.heap, item)
}
return p
}
// floatSortedMergeHeap represents a heap of floatSortedMergeHeapItems.
type floatSortedMergeHeap []*floatSortedMergeHeapItem
func (h floatSortedMergeHeap) Len() int { return len(h) }
func (h floatSortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h floatSortedMergeHeap) Less(i, j int) bool {
x, y := h[i].point, h[j].point
if h[i].ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() < y.Tags.ID()
}
return x.Time < y.Time
}
if x.Name != y.Name {
return x.Name > y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() > y.Tags.ID()
}
return x.Time > y.Time
}
func (h *floatSortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*floatSortedMergeHeapItem))
}
func (h *floatSortedMergeHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
type floatSortedMergeHeapItem struct {
point *FloatPoint
itr FloatIterator
ascending bool
}
// floatLimitIterator represents an iterator that limits points per group.
type floatLimitIterator struct {
input FloatIterator
opt IteratorOptions
n int
prev struct {
name string
tags Tags
}
}
// newFloatLimitIterator returns a new instance of floatLimitIterator.
func newFloatLimitIterator(input FloatIterator, opt IteratorOptions) *floatLimitIterator {
return &floatLimitIterator{
input: input,
opt: opt,
}
}
// Close closes the underlying iterators.
func (itr *floatLimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *floatLimitIterator) Next() *FloatPoint {
for {
p := itr.input.Next()
if p == nil {
return nil
}
// Reset window and counter if a new window is encountered.
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
itr.prev.name = p.Name
itr.prev.tags = p.Tags
itr.n = 0
}
// Increment counter.
itr.n++
// Read next point if not beyond the offset.
if itr.n <= itr.opt.Offset {
continue
}
// Read next point if we're beyond the limit.
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
// If there's no interval and no groups then simply exit.
if itr.opt.Interval.IsZero() && len(itr.opt.Dimensions) == 0 {
return nil
}
continue
}
return p
}
}
type floatFillIterator struct {
input *bufFloatIterator
prev *FloatPoint
startTime int64
endTime int64
auxFields []interface{}
done bool
opt IteratorOptions
window struct {
name string
tags Tags
time int64
}
}
func newFloatFillIterator(input FloatIterator, expr Expr, opt IteratorOptions) *floatFillIterator {
if opt.Fill == NullFill {
if expr, ok := expr.(*Call); ok && expr.Name == "count" {
opt.Fill = NumberFill
opt.FillValue = float64(0)
}
}
var startTime, endTime int64
if opt.Ascending {
startTime, _ = opt.Window(opt.StartTime)
_, endTime = opt.Window(opt.EndTime)
} else {
_, startTime = opt.Window(opt.EndTime)
endTime, _ = opt.Window(opt.StartTime)
}
var auxFields []interface{}
if len(opt.Aux) > 0 {
auxFields = make([]interface{}, len(opt.Aux))
}
itr := &floatFillIterator{
input: newBufFloatIterator(input),
startTime: startTime,
endTime: endTime,
auxFields: auxFields,
opt: opt,
}
p := itr.input.peek()
if p != nil {
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
} else {
itr.window.time = itr.endTime
}
return itr
}
func (itr *floatFillIterator) Close() error { return itr.input.Close() }
func (itr *floatFillIterator) Next() *FloatPoint {
p := itr.input.Next()
// Check if the next point is outside of our window or is nil.
for p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
// If we are inside of an interval, unread the point and continue below to
// constructing a new point.
if itr.opt.Ascending {
if itr.window.time < itr.endTime {
itr.input.unread(p)
p = nil
break
}
} else {
if itr.window.time >= itr.endTime {
itr.input.unread(p)
p = nil
break
}
}
// We are *not* in a current interval. If there is no next point,
// we are at the end of all intervals.
if p == nil {
return nil
}
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
itr.prev = nil
break
}
// Check if the point is our next expected point.
if p == nil || p.Time > itr.window.time {
if p != nil {
itr.input.unread(p)
}
p = &FloatPoint{
Name: itr.window.name,
Tags: itr.window.tags,
Time: itr.window.time,
Aux: itr.auxFields,
}
switch itr.opt.Fill {
case NullFill:
p.Nil = true
case NumberFill:
p.Value = castToFloat(itr.opt.FillValue)
case PreviousFill:
if itr.prev != nil {
p.Value = itr.prev.Value
p.Nil = itr.prev.Nil
} else {
p.Nil = true
}
}
} else {
itr.prev = p
}
// Advance the expected time. Do not advance to a new window here
// as there may be lingering points with the same timestamp in the previous
// window.
if itr.opt.Ascending {
itr.window.time = p.Time + int64(itr.opt.Interval.Duration)
} else {
itr.window.time = p.Time - int64(itr.opt.Interval.Duration)
}
return p
}
// floatAuxIterator represents a float implementation of AuxIterator.
type floatAuxIterator struct {
input *bufFloatIterator
output chan *FloatPoint
fields auxIteratorFields
}
func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt IteratorOptions) *floatAuxIterator {
return &floatAuxIterator{
input: newBufFloatIterator(input),
output: make(chan *FloatPoint, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
}
func (itr *floatAuxIterator) Start() { go itr.stream() }
func (itr *floatAuxIterator) Close() error { return itr.input.Close() }
func (itr *floatAuxIterator) Next() *FloatPoint { return <-itr.output }
func (itr *floatAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
func (itr *floatAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
expr := opt.Expr
if expr == nil {
panic("unable to create an iterator with no expression from an aux iterator")
}
switch expr := expr.(type) {
case *VarRef:
return itr.Iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
}
func (itr *floatAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
return nil, nil, errors.New("not implemented")
}
func (itr *floatAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
return nil, errors.New("not implemented")
}
func (itr *floatAuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
break
}
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
}
close(itr.output)
itr.fields.close()
}
// floatChanIterator represents a new instance of floatChanIterator.
type floatChanIterator struct {
c chan *FloatPoint
once sync.Once
}
func (itr *floatChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
return nil
}
func (itr *floatChanIterator) Next() *FloatPoint { return <-itr.c }
// floatReduceIterator executes a reducer for every interval and buffers the result.
type floatReduceIterator struct {
input *bufFloatIterator
fn floatReduceFunc
opt IteratorOptions
points []*FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *floatReduceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *floatReduceIterator) Next() *FloatPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *floatReduceIterator) reduce() []*FloatPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Create points by tags.
m := make(map[string]*FloatPoint)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
} else if curr.Nil {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Pass previous and current points to reducer.
prev := m[id]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
}
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &FloatPoint{Name: curr.Name, Tags: tags}
m[id] = prev
}
prev.Time = t
prev.Value = v
prev.Aux = aux
prev.Aggregated++
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*FloatPoint, len(m))
for i, k := range keys {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
return a
}
// floatReduceFunc is the function called by a FloatPoint reducer.
type floatReduceFunc func(prev, curr *FloatPoint, opt *reduceOptions) (t int64, v float64, aux []interface{})
// floatReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type floatReduceSliceIterator struct {
input *bufFloatIterator
fn floatReduceSliceFunc
opt IteratorOptions
points []FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *floatReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *floatReduceSliceIterator) Next() *FloatPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *floatReduceSliceIterator) reduce() []FloatPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []FloatPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
} else if p.Nil {
continue
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := p.Name + "\x00" + tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]FloatPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]FloatPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// floatReduceSliceFunc is the function called by a FloatPoint slice reducer.
type floatReduceSliceFunc func(a []FloatPoint, opt *reduceOptions) []FloatPoint
// floatReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type floatTransformIterator struct {
input FloatIterator
fn floatTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *floatTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *floatTransformIterator) Next() *FloatPoint {
p := itr.input.Next()
if p != nil {
p = itr.fn(p)
}
return p
}
// floatTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type floatTransformFunc func(p *FloatPoint) *FloatPoint
// floatReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type floatBoolTransformIterator struct {
input FloatIterator
fn floatBoolTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *floatBoolTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *floatBoolTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
return itr.fn(p)
}
return nil
}
// floatBoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type floatBoolTransformFunc func(p *FloatPoint) *BooleanPoint
// floatDedupeIterator only outputs unique points.
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type floatDedupeIterator struct {
input FloatIterator
m map[string]struct{} // lookup of points already sent
}
// newFloatDedupeIterator returns a new instance of floatDedupeIterator.
func newFloatDedupeIterator(input FloatIterator) *floatDedupeIterator {
return &floatDedupeIterator{
input: input,
m: make(map[string]struct{}),
}
}
// Close closes the iterator and all child iterators.
func (itr *floatDedupeIterator) Close() error { return itr.input.Close() }
// Next returns the next unique point from the input iterator.
func (itr *floatDedupeIterator) Next() *FloatPoint {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
return nil
}
// Serialize to bytes to store in lookup.
buf, err := proto.Marshal(encodeFloatPoint(p))
if err != nil {
log.Println("error marshaling dedupe point:", err)
continue
}
// If the point has already been output then move to the next point.
if _, ok := itr.m[string(buf)]; ok {
continue
}
// Otherwise mark it as emitted and return point.
itr.m[string(buf)] = struct{}{}
return p
}
}
// floatReaderIterator represents an iterator that streams from a reader.
type floatReaderIterator struct {
r io.Reader
dec *FloatPointDecoder
first *FloatPoint
}
// newFloatReaderIterator returns a new instance of floatReaderIterator.
func newFloatReaderIterator(r io.Reader, first *FloatPoint) *floatReaderIterator {
return &floatReaderIterator{
r: r,
dec: NewFloatPointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *floatReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *floatReaderIterator) Next() *FloatPoint {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
itr.first = nil
return p
}
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &FloatPoint{}
if err := itr.dec.DecodeFloatPoint(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
return nil
}
return p
}
// floatMeanIterator returns the mean of values for every interval.
type floatMeanIterator struct {
input *bufFloatIterator
opt IteratorOptions
points []*FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *floatMeanIterator) Close() error { return itr.input.Close() }
// Next returns the next value.
func (itr *floatMeanIterator) Next() *FloatPoint {
// Calculate the next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.next()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
func (itr *floatMeanIterator) next() []*FloatPoint {
startTime, endTime := itr.opt.Window(itr.input.peekTime())
// Create points by tags.
m := make(map[string]struct {
Name string
Tags Tags
Sum float64
Count uint32
})
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
} else if curr.Nil {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
prev, ok := m[id]
if !ok {
prev.Name = curr.Name
prev.Tags = tags
}
if curr.Aggregated != 0 {
prev.Sum += curr.Value * float64(curr.Aggregated)
prev.Count += curr.Aggregated
} else {
prev.Sum += curr.Value
prev.Count++
}
m[id] = prev
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*FloatPoint, len(m))
for i, k := range keys {
val := m[k]
a[i] = &FloatPoint{
Name: val.Name,
Tags: val.Tags,
Value: float64(val.Sum) / float64(val.Count),
Aggregated: val.Count,
Time: startTime,
}
}
return a
}
// IntegerIterator represents a stream of integer points.
type IntegerIterator interface {
Iterator
Next() *IntegerPoint
}
// newIntegerIterators converts a slice of Iterator to a slice of IntegerIterator.
// Drop and closes any iterator in itrs that is not a IntegerIterator and cannot
// be cast to a IntegerIterator.
func newIntegerIterators(itrs []Iterator) []IntegerIterator {
a := make([]IntegerIterator, 0, len(itrs))
for _, itr := range itrs {
switch itr := itr.(type) {
case IntegerIterator:
a = append(a, itr)
default:
itr.Close()
}
}
return a
}
// bufIntegerIterator represents a buffered IntegerIterator.
type bufIntegerIterator struct {
itr IntegerIterator
buf *IntegerPoint
}
// newBufIntegerIterator returns a buffered IntegerIterator.
func newBufIntegerIterator(itr IntegerIterator) *bufIntegerIterator {
return &bufIntegerIterator{
itr: itr,
}
}
// Close closes the underlying iterator.
func (itr *bufIntegerIterator) Close() error { return itr.itr.Close() }
// peek returns the next point without removing it from the iterator.
func (itr *bufIntegerIterator) peek() *IntegerPoint {
p := itr.Next()
itr.unread(p)
return p
}
// peekTime returns the time of the next point.
// Returns zero time if no more points available.
func (itr *bufIntegerIterator) peekTime() int64 {
p := itr.peek()
if p == nil {
return ZeroTime
}
return p.Time
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufIntegerIterator) Next() *IntegerPoint {
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
return buf
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *bufIntegerIterator) NextInWindow(startTime, endTime int64) *IntegerPoint {
v := itr.Next()
if v == nil {
return nil
} else if v.Time < startTime || v.Time >= endTime {
itr.unread(v)
return nil
}
return v
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *bufIntegerIterator) unread(v *IntegerPoint) { itr.buf = v }
// integerMergeIterator represents an iterator that combines multiple integer iterators.
type integerMergeIterator struct {
inputs []IntegerIterator
heap *integerMergeHeap
// Current iterator and window.
curr *integerMergeHeapItem
window struct {
name string
tags string
startTime int64
endTime int64
}
}
// newIntegerMergeIterator returns a new instance of integerMergeIterator.
func newIntegerMergeIterator(inputs []IntegerIterator, opt IteratorOptions) *integerMergeIterator {
itr := &integerMergeIterator{
inputs: inputs,
heap: &integerMergeHeap{
items: make([]*integerMergeHeapItem, 0, len(inputs)),
opt: opt,
},
}
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBufIntegerIterator(input)
if bufInput.peek() == nil {
continue
}
// Append to the heap.
itr.heap.items = append(itr.heap.items, &integerMergeHeapItem{itr: bufInput})
}
heap.Init(itr.heap)
return itr
}
// Close closes the underlying iterators.
func (itr *integerMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *integerMergeIterator) Next() *IntegerPoint {
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil
}
itr.curr = heap.Pop(itr.heap).(*integerMergeHeapItem)
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.name, itr.window.tags = p.Name, p.Tags.ID()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
// Read the next point from the current iterator.
p := itr.curr.itr.Next()
// If there are no more points then remove iterator from heap and find next.
if p == nil {
itr.curr = nil
continue
}
// Check if the point is inside of our current window.
inWindow := true
if itr.window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
inWindow = false
}
// If it's outside our window then push iterator back on the heap and find new iterator.
if !inWindow {
itr.curr.itr.unread(p)
heap.Push(itr.heap, itr.curr)
itr.curr = nil
continue
}
return p
}
}
// integerMergeHeap represents a heap of integerMergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type integerMergeHeap struct {
opt IteratorOptions
items []*integerMergeHeapItem
}
func (h integerMergeHeap) Len() int { return len(h.items) }
func (h integerMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h integerMergeHeap) Less(i, j int) bool {
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() < y.Tags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() > y.Tags.ID()
}
}
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
if h.opt.Ascending {
return xt < yt
}
return xt > yt
}
func (h *integerMergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*integerMergeHeapItem))
}
func (h *integerMergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
}
type integerMergeHeapItem struct {
itr *bufIntegerIterator
}
// integerSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type integerSortedMergeIterator struct {
inputs []IntegerIterator
opt IteratorOptions
heap integerSortedMergeHeap
}
// newIntegerSortedMergeIterator returns an instance of integerSortedMergeIterator.
func newIntegerSortedMergeIterator(inputs []IntegerIterator, opt IteratorOptions) Iterator {
itr := &integerSortedMergeIterator{
inputs: inputs,
heap: make(integerSortedMergeHeap, 0, len(inputs)),
opt: opt,
}
// Initialize heap.
for _, input := range inputs {
// Read next point.
p := input.Next()
if p == nil {
continue
}
// Append to the heap.
itr.heap = append(itr.heap, &integerSortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
}
heap.Init(&itr.heap)
return itr
}
// Close closes the underlying iterators.
func (itr *integerSortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
// Next returns the next points from the iterator.
func (itr *integerSortedMergeIterator) Next() *IntegerPoint { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *integerSortedMergeIterator) pop() *IntegerPoint {
if len(itr.heap) == 0 {
return nil
}
// Read the next item from the heap.
item := heap.Pop(&itr.heap).(*integerSortedMergeHeapItem)
// Copy the point for return.
p := item.point.Clone()
// Read the next item from the cursor. Push back to heap if one exists.
if item.point = item.itr.Next(); item.point != nil {
heap.Push(&itr.heap, item)
}
return p
}
// integerSortedMergeHeap represents a heap of integerSortedMergeHeapItems.
type integerSortedMergeHeap []*integerSortedMergeHeapItem
func (h integerSortedMergeHeap) Len() int { return len(h) }
func (h integerSortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h integerSortedMergeHeap) Less(i, j int) bool {
x, y := h[i].point, h[j].point
if h[i].ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() < y.Tags.ID()
}
return x.Time < y.Time
}
if x.Name != y.Name {
return x.Name > y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() > y.Tags.ID()
}
return x.Time > y.Time
}
func (h *integerSortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*integerSortedMergeHeapItem))
}
func (h *integerSortedMergeHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
type integerSortedMergeHeapItem struct {
point *IntegerPoint
itr IntegerIterator
ascending bool
}
// integerLimitIterator represents an iterator that limits points per group.
type integerLimitIterator struct {
input IntegerIterator
opt IteratorOptions
n int
prev struct {
name string
tags Tags
}
}
// newIntegerLimitIterator returns a new instance of integerLimitIterator.
func newIntegerLimitIterator(input IntegerIterator, opt IteratorOptions) *integerLimitIterator {
return &integerLimitIterator{
input: input,
opt: opt,
}
}
// Close closes the underlying iterators.
func (itr *integerLimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *integerLimitIterator) Next() *IntegerPoint {
for {
p := itr.input.Next()
if p == nil {
return nil
}
// Reset window and counter if a new window is encountered.
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
itr.prev.name = p.Name
itr.prev.tags = p.Tags
itr.n = 0
}
// Increment counter.
itr.n++
// Read next point if not beyond the offset.
if itr.n <= itr.opt.Offset {
continue
}
// Read next point if we're beyond the limit.
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
// If there's no interval and no groups then simply exit.
if itr.opt.Interval.IsZero() && len(itr.opt.Dimensions) == 0 {
return nil
}
continue
}
return p
}
}
type integerFillIterator struct {
input *bufIntegerIterator
prev *IntegerPoint
startTime int64
endTime int64
auxFields []interface{}
done bool
opt IteratorOptions
window struct {
name string
tags Tags
time int64
}
}
func newIntegerFillIterator(input IntegerIterator, expr Expr, opt IteratorOptions) *integerFillIterator {
if opt.Fill == NullFill {
if expr, ok := expr.(*Call); ok && expr.Name == "count" {
opt.Fill = NumberFill
opt.FillValue = int64(0)
}
}
var startTime, endTime int64
if opt.Ascending {
startTime, _ = opt.Window(opt.StartTime)
_, endTime = opt.Window(opt.EndTime)
} else {
_, startTime = opt.Window(opt.EndTime)
endTime, _ = opt.Window(opt.StartTime)
}
var auxFields []interface{}
if len(opt.Aux) > 0 {
auxFields = make([]interface{}, len(opt.Aux))
}
itr := &integerFillIterator{
input: newBufIntegerIterator(input),
startTime: startTime,
endTime: endTime,
auxFields: auxFields,
opt: opt,
}
p := itr.input.peek()
if p != nil {
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
} else {
itr.window.time = itr.endTime
}
return itr
}
func (itr *integerFillIterator) Close() error { return itr.input.Close() }
func (itr *integerFillIterator) Next() *IntegerPoint {
p := itr.input.Next()
// Check if the next point is outside of our window or is nil.
for p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
// If we are inside of an interval, unread the point and continue below to
// constructing a new point.
if itr.opt.Ascending {
if itr.window.time < itr.endTime {
itr.input.unread(p)
p = nil
break
}
} else {
if itr.window.time >= itr.endTime {
itr.input.unread(p)
p = nil
break
}
}
// We are *not* in a current interval. If there is no next point,
// we are at the end of all intervals.
if p == nil {
return nil
}
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
itr.prev = nil
break
}
// Check if the point is our next expected point.
if p == nil || p.Time > itr.window.time {
if p != nil {
itr.input.unread(p)
}
p = &IntegerPoint{
Name: itr.window.name,
Tags: itr.window.tags,
Time: itr.window.time,
Aux: itr.auxFields,
}
switch itr.opt.Fill {
case NullFill:
p.Nil = true
case NumberFill:
p.Value = castToInteger(itr.opt.FillValue)
case PreviousFill:
if itr.prev != nil {
p.Value = itr.prev.Value
p.Nil = itr.prev.Nil
} else {
p.Nil = true
}
}
} else {
itr.prev = p
}
// Advance the expected time. Do not advance to a new window here
// as there may be lingering points with the same timestamp in the previous
// window.
if itr.opt.Ascending {
itr.window.time = p.Time + int64(itr.opt.Interval.Duration)
} else {
itr.window.time = p.Time - int64(itr.opt.Interval.Duration)
}
return p
}
// integerAuxIterator represents a integer implementation of AuxIterator.
type integerAuxIterator struct {
input *bufIntegerIterator
output chan *IntegerPoint
fields auxIteratorFields
}
func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt IteratorOptions) *integerAuxIterator {
return &integerAuxIterator{
input: newBufIntegerIterator(input),
output: make(chan *IntegerPoint, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
}
func (itr *integerAuxIterator) Start() { go itr.stream() }
func (itr *integerAuxIterator) Close() error { return itr.input.Close() }
func (itr *integerAuxIterator) Next() *IntegerPoint { return <-itr.output }
func (itr *integerAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
func (itr *integerAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
expr := opt.Expr
if expr == nil {
panic("unable to create an iterator with no expression from an aux iterator")
}
switch expr := expr.(type) {
case *VarRef:
return itr.Iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
}
func (itr *integerAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
return nil, nil, errors.New("not implemented")
}
func (itr *integerAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
return nil, errors.New("not implemented")
}
func (itr *integerAuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
break
}
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
}
close(itr.output)
itr.fields.close()
}
// integerChanIterator represents a new instance of integerChanIterator.
type integerChanIterator struct {
c chan *IntegerPoint
once sync.Once
}
func (itr *integerChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
return nil
}
func (itr *integerChanIterator) Next() *IntegerPoint { return <-itr.c }
// integerReduceIterator executes a reducer for every interval and buffers the result.
type integerReduceIterator struct {
input *bufIntegerIterator
fn integerReduceFunc
opt IteratorOptions
points []*IntegerPoint
}
// Close closes the iterator and all child iterators.
func (itr *integerReduceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerReduceIterator) Next() *IntegerPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *integerReduceIterator) reduce() []*IntegerPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Create points by tags.
m := make(map[string]*IntegerPoint)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
} else if curr.Nil {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Pass previous and current points to reducer.
prev := m[id]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
}
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &IntegerPoint{Name: curr.Name, Tags: tags}
m[id] = prev
}
prev.Time = t
prev.Value = v
prev.Aux = aux
prev.Aggregated++
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*IntegerPoint, len(m))
for i, k := range keys {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
return a
}
// integerReduceFunc is the function called by a IntegerPoint reducer.
type integerReduceFunc func(prev, curr *IntegerPoint, opt *reduceOptions) (t int64, v int64, aux []interface{})
// integerReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type integerReduceSliceIterator struct {
input *bufIntegerIterator
fn integerReduceSliceFunc
opt IteratorOptions
points []IntegerPoint
}
// Close closes the iterator and all child iterators.
func (itr *integerReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerReduceSliceIterator) Next() *IntegerPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *integerReduceSliceIterator) reduce() []IntegerPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []IntegerPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
} else if p.Nil {
continue
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := p.Name + "\x00" + tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]IntegerPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]IntegerPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// integerReduceSliceFunc is the function called by a IntegerPoint slice reducer.
type integerReduceSliceFunc func(a []IntegerPoint, opt *reduceOptions) []IntegerPoint
// integerReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type integerTransformIterator struct {
input IntegerIterator
fn integerTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *integerTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerTransformIterator) Next() *IntegerPoint {
p := itr.input.Next()
if p != nil {
p = itr.fn(p)
}
return p
}
// integerTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type integerTransformFunc func(p *IntegerPoint) *IntegerPoint
// integerReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type integerBoolTransformIterator struct {
input IntegerIterator
fn integerBoolTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *integerBoolTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerBoolTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
return itr.fn(p)
}
return nil
}
// integerBoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type integerBoolTransformFunc func(p *IntegerPoint) *BooleanPoint
// integerDedupeIterator only outputs unique points.
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type integerDedupeIterator struct {
input IntegerIterator
m map[string]struct{} // lookup of points already sent
}
// newIntegerDedupeIterator returns a new instance of integerDedupeIterator.
func newIntegerDedupeIterator(input IntegerIterator) *integerDedupeIterator {
return &integerDedupeIterator{
input: input,
m: make(map[string]struct{}),
}
}
// Close closes the iterator and all child iterators.
func (itr *integerDedupeIterator) Close() error { return itr.input.Close() }
// Next returns the next unique point from the input iterator.
func (itr *integerDedupeIterator) Next() *IntegerPoint {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
return nil
}
// Serialize to bytes to store in lookup.
buf, err := proto.Marshal(encodeIntegerPoint(p))
if err != nil {
log.Println("error marshaling dedupe point:", err)
continue
}
// If the point has already been output then move to the next point.
if _, ok := itr.m[string(buf)]; ok {
continue
}
// Otherwise mark it as emitted and return point.
itr.m[string(buf)] = struct{}{}
return p
}
}
// integerReaderIterator represents an iterator that streams from a reader.
type integerReaderIterator struct {
r io.Reader
dec *IntegerPointDecoder
first *IntegerPoint
}
// newIntegerReaderIterator returns a new instance of integerReaderIterator.
func newIntegerReaderIterator(r io.Reader, first *IntegerPoint) *integerReaderIterator {
return &integerReaderIterator{
r: r,
dec: NewIntegerPointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *integerReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *integerReaderIterator) Next() *IntegerPoint {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
itr.first = nil
return p
}
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &IntegerPoint{}
if err := itr.dec.DecodeIntegerPoint(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
return nil
}
return p
}
// integerMeanIterator returns the mean of values for every interval.
type integerMeanIterator struct {
input *bufIntegerIterator
opt IteratorOptions
points []*FloatPoint
}
// Close closes the iterator and all child iterators.
func (itr *integerMeanIterator) Close() error { return itr.input.Close() }
// Next returns the next value.
func (itr *integerMeanIterator) Next() *FloatPoint {
// Calculate the next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.next()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
func (itr *integerMeanIterator) next() []*FloatPoint {
startTime, endTime := itr.opt.Window(itr.input.peekTime())
// Create points by tags.
m := make(map[string]struct {
Name string
Tags Tags
Sum int64
Count uint32
})
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
} else if curr.Nil {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
prev, ok := m[id]
if !ok {
prev.Name = curr.Name
prev.Tags = tags
}
if curr.Aggregated != 0 {
prev.Sum += curr.Value * int64(curr.Aggregated)
prev.Count += curr.Aggregated
} else {
prev.Sum += curr.Value
prev.Count++
}
m[id] = prev
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*FloatPoint, len(m))
for i, k := range keys {
val := m[k]
a[i] = &FloatPoint{
Name: val.Name,
Tags: val.Tags,
Value: float64(val.Sum) / float64(val.Count),
Aggregated: val.Count,
Time: startTime,
}
}
return a
}
// StringIterator represents a stream of string points.
type StringIterator interface {
Iterator
Next() *StringPoint
}
// newStringIterators converts a slice of Iterator to a slice of StringIterator.
// Drop and closes any iterator in itrs that is not a StringIterator and cannot
// be cast to a StringIterator.
func newStringIterators(itrs []Iterator) []StringIterator {
a := make([]StringIterator, 0, len(itrs))
for _, itr := range itrs {
switch itr := itr.(type) {
case StringIterator:
a = append(a, itr)
default:
itr.Close()
}
}
return a
}
// bufStringIterator represents a buffered StringIterator.
type bufStringIterator struct {
itr StringIterator
buf *StringPoint
}
// newBufStringIterator returns a buffered StringIterator.
func newBufStringIterator(itr StringIterator) *bufStringIterator {
return &bufStringIterator{
itr: itr,
}
}
// Close closes the underlying iterator.
func (itr *bufStringIterator) Close() error { return itr.itr.Close() }
// peek returns the next point without removing it from the iterator.
func (itr *bufStringIterator) peek() *StringPoint {
p := itr.Next()
itr.unread(p)
return p
}
// peekTime returns the time of the next point.
// Returns zero time if no more points available.
func (itr *bufStringIterator) peekTime() int64 {
p := itr.peek()
if p == nil {
return ZeroTime
}
return p.Time
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufStringIterator) Next() *StringPoint {
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
return buf
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *bufStringIterator) NextInWindow(startTime, endTime int64) *StringPoint {
v := itr.Next()
if v == nil {
return nil
} else if v.Time < startTime || v.Time >= endTime {
itr.unread(v)
return nil
}
return v
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *bufStringIterator) unread(v *StringPoint) { itr.buf = v }
// stringMergeIterator represents an iterator that combines multiple string iterators.
type stringMergeIterator struct {
inputs []StringIterator
heap *stringMergeHeap
// Current iterator and window.
curr *stringMergeHeapItem
window struct {
name string
tags string
startTime int64
endTime int64
}
}
// newStringMergeIterator returns a new instance of stringMergeIterator.
func newStringMergeIterator(inputs []StringIterator, opt IteratorOptions) *stringMergeIterator {
itr := &stringMergeIterator{
inputs: inputs,
heap: &stringMergeHeap{
items: make([]*stringMergeHeapItem, 0, len(inputs)),
opt: opt,
},
}
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBufStringIterator(input)
if bufInput.peek() == nil {
continue
}
// Append to the heap.
itr.heap.items = append(itr.heap.items, &stringMergeHeapItem{itr: bufInput})
}
heap.Init(itr.heap)
return itr
}
// Close closes the underlying iterators.
func (itr *stringMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *stringMergeIterator) Next() *StringPoint {
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil
}
itr.curr = heap.Pop(itr.heap).(*stringMergeHeapItem)
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.name, itr.window.tags = p.Name, p.Tags.ID()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
// Read the next point from the current iterator.
p := itr.curr.itr.Next()
// If there are no more points then remove iterator from heap and find next.
if p == nil {
itr.curr = nil
continue
}
// Check if the point is inside of our current window.
inWindow := true
if itr.window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
inWindow = false
}
// If it's outside our window then push iterator back on the heap and find new iterator.
if !inWindow {
itr.curr.itr.unread(p)
heap.Push(itr.heap, itr.curr)
itr.curr = nil
continue
}
return p
}
}
// stringMergeHeap represents a heap of stringMergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type stringMergeHeap struct {
opt IteratorOptions
items []*stringMergeHeapItem
}
func (h stringMergeHeap) Len() int { return len(h.items) }
func (h stringMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h stringMergeHeap) Less(i, j int) bool {
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() < y.Tags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() > y.Tags.ID()
}
}
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
if h.opt.Ascending {
return xt < yt
}
return xt > yt
}
func (h *stringMergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*stringMergeHeapItem))
}
func (h *stringMergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
}
type stringMergeHeapItem struct {
itr *bufStringIterator
}
// stringSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type stringSortedMergeIterator struct {
inputs []StringIterator
opt IteratorOptions
heap stringSortedMergeHeap
}
// newStringSortedMergeIterator returns an instance of stringSortedMergeIterator.
func newStringSortedMergeIterator(inputs []StringIterator, opt IteratorOptions) Iterator {
itr := &stringSortedMergeIterator{
inputs: inputs,
heap: make(stringSortedMergeHeap, 0, len(inputs)),
opt: opt,
}
// Initialize heap.
for _, input := range inputs {
// Read next point.
p := input.Next()
if p == nil {
continue
}
// Append to the heap.
itr.heap = append(itr.heap, &stringSortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
}
heap.Init(&itr.heap)
return itr
}
// Close closes the underlying iterators.
func (itr *stringSortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
// Next returns the next points from the iterator.
func (itr *stringSortedMergeIterator) Next() *StringPoint { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *stringSortedMergeIterator) pop() *StringPoint {
if len(itr.heap) == 0 {
return nil
}
// Read the next item from the heap.
item := heap.Pop(&itr.heap).(*stringSortedMergeHeapItem)
// Copy the point for return.
p := item.point.Clone()
// Read the next item from the cursor. Push back to heap if one exists.
if item.point = item.itr.Next(); item.point != nil {
heap.Push(&itr.heap, item)
}
return p
}
// stringSortedMergeHeap represents a heap of stringSortedMergeHeapItems.
type stringSortedMergeHeap []*stringSortedMergeHeapItem
func (h stringSortedMergeHeap) Len() int { return len(h) }
func (h stringSortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h stringSortedMergeHeap) Less(i, j int) bool {
x, y := h[i].point, h[j].point
if h[i].ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() < y.Tags.ID()
}
return x.Time < y.Time
}
if x.Name != y.Name {
return x.Name > y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() > y.Tags.ID()
}
return x.Time > y.Time
}
func (h *stringSortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*stringSortedMergeHeapItem))
}
func (h *stringSortedMergeHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
type stringSortedMergeHeapItem struct {
point *StringPoint
itr StringIterator
ascending bool
}
// stringLimitIterator represents an iterator that limits points per group.
type stringLimitIterator struct {
input StringIterator
opt IteratorOptions
n int
prev struct {
name string
tags Tags
}
}
// newStringLimitIterator returns a new instance of stringLimitIterator.
func newStringLimitIterator(input StringIterator, opt IteratorOptions) *stringLimitIterator {
return &stringLimitIterator{
input: input,
opt: opt,
}
}
// Close closes the underlying iterators.
func (itr *stringLimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *stringLimitIterator) Next() *StringPoint {
for {
p := itr.input.Next()
if p == nil {
return nil
}
// Reset window and counter if a new window is encountered.
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
itr.prev.name = p.Name
itr.prev.tags = p.Tags
itr.n = 0
}
// Increment counter.
itr.n++
// Read next point if not beyond the offset.
if itr.n <= itr.opt.Offset {
continue
}
// Read next point if we're beyond the limit.
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
// If there's no interval and no groups then simply exit.
if itr.opt.Interval.IsZero() && len(itr.opt.Dimensions) == 0 {
return nil
}
continue
}
return p
}
}
type stringFillIterator struct {
input *bufStringIterator
prev *StringPoint
startTime int64
endTime int64
auxFields []interface{}
done bool
opt IteratorOptions
window struct {
name string
tags Tags
time int64
}
}
func newStringFillIterator(input StringIterator, expr Expr, opt IteratorOptions) *stringFillIterator {
if opt.Fill == NullFill {
if expr, ok := expr.(*Call); ok && expr.Name == "count" {
opt.Fill = NumberFill
opt.FillValue = ""
}
}
var startTime, endTime int64
if opt.Ascending {
startTime, _ = opt.Window(opt.StartTime)
_, endTime = opt.Window(opt.EndTime)
} else {
_, startTime = opt.Window(opt.EndTime)
endTime, _ = opt.Window(opt.StartTime)
}
var auxFields []interface{}
if len(opt.Aux) > 0 {
auxFields = make([]interface{}, len(opt.Aux))
}
itr := &stringFillIterator{
input: newBufStringIterator(input),
startTime: startTime,
endTime: endTime,
auxFields: auxFields,
opt: opt,
}
p := itr.input.peek()
if p != nil {
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
} else {
itr.window.time = itr.endTime
}
return itr
}
func (itr *stringFillIterator) Close() error { return itr.input.Close() }
func (itr *stringFillIterator) Next() *StringPoint {
p := itr.input.Next()
// Check if the next point is outside of our window or is nil.
for p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
// If we are inside of an interval, unread the point and continue below to
// constructing a new point.
if itr.opt.Ascending {
if itr.window.time < itr.endTime {
itr.input.unread(p)
p = nil
break
}
} else {
if itr.window.time >= itr.endTime {
itr.input.unread(p)
p = nil
break
}
}
// We are *not* in a current interval. If there is no next point,
// we are at the end of all intervals.
if p == nil {
return nil
}
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
itr.prev = nil
break
}
// Check if the point is our next expected point.
if p == nil || p.Time > itr.window.time {
if p != nil {
itr.input.unread(p)
}
p = &StringPoint{
Name: itr.window.name,
Tags: itr.window.tags,
Time: itr.window.time,
Aux: itr.auxFields,
}
switch itr.opt.Fill {
case NullFill:
p.Nil = true
case NumberFill:
p.Value = castToString(itr.opt.FillValue)
case PreviousFill:
if itr.prev != nil {
p.Value = itr.prev.Value
p.Nil = itr.prev.Nil
} else {
p.Nil = true
}
}
} else {
itr.prev = p
}
// Advance the expected time. Do not advance to a new window here
// as there may be lingering points with the same timestamp in the previous
// window.
if itr.opt.Ascending {
itr.window.time = p.Time + int64(itr.opt.Interval.Duration)
} else {
itr.window.time = p.Time - int64(itr.opt.Interval.Duration)
}
return p
}
// stringAuxIterator represents a string implementation of AuxIterator.
type stringAuxIterator struct {
input *bufStringIterator
output chan *StringPoint
fields auxIteratorFields
}
func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt IteratorOptions) *stringAuxIterator {
return &stringAuxIterator{
input: newBufStringIterator(input),
output: make(chan *StringPoint, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
}
func (itr *stringAuxIterator) Start() { go itr.stream() }
func (itr *stringAuxIterator) Close() error { return itr.input.Close() }
func (itr *stringAuxIterator) Next() *StringPoint { return <-itr.output }
func (itr *stringAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
func (itr *stringAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
expr := opt.Expr
if expr == nil {
panic("unable to create an iterator with no expression from an aux iterator")
}
switch expr := expr.(type) {
case *VarRef:
return itr.Iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
}
func (itr *stringAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
return nil, nil, errors.New("not implemented")
}
func (itr *stringAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
return nil, errors.New("not implemented")
}
func (itr *stringAuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
break
}
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
}
close(itr.output)
itr.fields.close()
}
// stringChanIterator represents a new instance of stringChanIterator.
type stringChanIterator struct {
c chan *StringPoint
once sync.Once
}
func (itr *stringChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
return nil
}
func (itr *stringChanIterator) Next() *StringPoint { return <-itr.c }
// stringReduceIterator executes a reducer for every interval and buffers the result.
type stringReduceIterator struct {
input *bufStringIterator
fn stringReduceFunc
opt IteratorOptions
points []*StringPoint
}
// Close closes the iterator and all child iterators.
func (itr *stringReduceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *stringReduceIterator) Next() *StringPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *stringReduceIterator) reduce() []*StringPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Create points by tags.
m := make(map[string]*StringPoint)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
} else if curr.Nil {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Pass previous and current points to reducer.
prev := m[id]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
}
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &StringPoint{Name: curr.Name, Tags: tags}
m[id] = prev
}
prev.Time = t
prev.Value = v
prev.Aux = aux
prev.Aggregated++
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*StringPoint, len(m))
for i, k := range keys {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
return a
}
// stringReduceFunc is the function called by a StringPoint reducer.
type stringReduceFunc func(prev, curr *StringPoint, opt *reduceOptions) (t int64, v string, aux []interface{})
// stringReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type stringReduceSliceIterator struct {
input *bufStringIterator
fn stringReduceSliceFunc
opt IteratorOptions
points []StringPoint
}
// Close closes the iterator and all child iterators.
func (itr *stringReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *stringReduceSliceIterator) Next() *StringPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *stringReduceSliceIterator) reduce() []StringPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []StringPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
} else if p.Nil {
continue
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := p.Name + "\x00" + tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]StringPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]StringPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// stringReduceSliceFunc is the function called by a StringPoint slice reducer.
type stringReduceSliceFunc func(a []StringPoint, opt *reduceOptions) []StringPoint
// stringReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type stringTransformIterator struct {
input StringIterator
fn stringTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *stringTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *stringTransformIterator) Next() *StringPoint {
p := itr.input.Next()
if p != nil {
p = itr.fn(p)
}
return p
}
// stringTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type stringTransformFunc func(p *StringPoint) *StringPoint
// stringReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type stringBoolTransformIterator struct {
input StringIterator
fn stringBoolTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *stringBoolTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *stringBoolTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
return itr.fn(p)
}
return nil
}
// stringBoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type stringBoolTransformFunc func(p *StringPoint) *BooleanPoint
// stringDedupeIterator only outputs unique points.
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type stringDedupeIterator struct {
input StringIterator
m map[string]struct{} // lookup of points already sent
}
// newStringDedupeIterator returns a new instance of stringDedupeIterator.
func newStringDedupeIterator(input StringIterator) *stringDedupeIterator {
return &stringDedupeIterator{
input: input,
m: make(map[string]struct{}),
}
}
// Close closes the iterator and all child iterators.
func (itr *stringDedupeIterator) Close() error { return itr.input.Close() }
// Next returns the next unique point from the input iterator.
func (itr *stringDedupeIterator) Next() *StringPoint {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
return nil
}
// Serialize to bytes to store in lookup.
buf, err := proto.Marshal(encodeStringPoint(p))
if err != nil {
log.Println("error marshaling dedupe point:", err)
continue
}
// If the point has already been output then move to the next point.
if _, ok := itr.m[string(buf)]; ok {
continue
}
// Otherwise mark it as emitted and return point.
itr.m[string(buf)] = struct{}{}
return p
}
}
// stringReaderIterator represents an iterator that streams from a reader.
type stringReaderIterator struct {
r io.Reader
dec *StringPointDecoder
first *StringPoint
}
// newStringReaderIterator returns a new instance of stringReaderIterator.
func newStringReaderIterator(r io.Reader, first *StringPoint) *stringReaderIterator {
return &stringReaderIterator{
r: r,
dec: NewStringPointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *stringReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *stringReaderIterator) Next() *StringPoint {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
itr.first = nil
return p
}
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &StringPoint{}
if err := itr.dec.DecodeStringPoint(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
return nil
}
return p
}
// BooleanIterator represents a stream of boolean points.
type BooleanIterator interface {
Iterator
Next() *BooleanPoint
}
// newBooleanIterators converts a slice of Iterator to a slice of BooleanIterator.
// Drop and closes any iterator in itrs that is not a BooleanIterator and cannot
// be cast to a BooleanIterator.
func newBooleanIterators(itrs []Iterator) []BooleanIterator {
a := make([]BooleanIterator, 0, len(itrs))
for _, itr := range itrs {
switch itr := itr.(type) {
case BooleanIterator:
a = append(a, itr)
default:
itr.Close()
}
}
return a
}
// bufBooleanIterator represents a buffered BooleanIterator.
type bufBooleanIterator struct {
itr BooleanIterator
buf *BooleanPoint
}
// newBufBooleanIterator returns a buffered BooleanIterator.
func newBufBooleanIterator(itr BooleanIterator) *bufBooleanIterator {
return &bufBooleanIterator{
itr: itr,
}
}
// Close closes the underlying iterator.
func (itr *bufBooleanIterator) Close() error { return itr.itr.Close() }
// peek returns the next point without removing it from the iterator.
func (itr *bufBooleanIterator) peek() *BooleanPoint {
p := itr.Next()
itr.unread(p)
return p
}
// peekTime returns the time of the next point.
// Returns zero time if no more points available.
func (itr *bufBooleanIterator) peekTime() int64 {
p := itr.peek()
if p == nil {
return ZeroTime
}
return p.Time
}
// Next returns the current buffer, if exists, or calls the underlying iterator.
func (itr *bufBooleanIterator) Next() *BooleanPoint {
if itr.buf != nil {
buf := itr.buf
itr.buf = nil
return buf
}
return itr.itr.Next()
}
// NextInWindow returns the next value if it is between [startTime, endTime).
// If the next value is outside the range then it is moved to the buffer.
func (itr *bufBooleanIterator) NextInWindow(startTime, endTime int64) *BooleanPoint {
v := itr.Next()
if v == nil {
return nil
} else if v.Time < startTime || v.Time >= endTime {
itr.unread(v)
return nil
}
return v
}
// unread sets v to the buffer. It is read on the next call to Next().
func (itr *bufBooleanIterator) unread(v *BooleanPoint) { itr.buf = v }
// booleanMergeIterator represents an iterator that combines multiple boolean iterators.
type booleanMergeIterator struct {
inputs []BooleanIterator
heap *booleanMergeHeap
// Current iterator and window.
curr *booleanMergeHeapItem
window struct {
name string
tags string
startTime int64
endTime int64
}
}
// newBooleanMergeIterator returns a new instance of booleanMergeIterator.
func newBooleanMergeIterator(inputs []BooleanIterator, opt IteratorOptions) *booleanMergeIterator {
itr := &booleanMergeIterator{
inputs: inputs,
heap: &booleanMergeHeap{
items: make([]*booleanMergeHeapItem, 0, len(inputs)),
opt: opt,
},
}
// Initialize heap items.
for _, input := range inputs {
// Wrap in buffer, ignore any inputs without anymore points.
bufInput := newBufBooleanIterator(input)
if bufInput.peek() == nil {
continue
}
// Append to the heap.
itr.heap.items = append(itr.heap.items, &booleanMergeHeapItem{itr: bufInput})
}
heap.Init(itr.heap)
return itr
}
// Close closes the underlying iterators.
func (itr *booleanMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *booleanMergeIterator) Next() *BooleanPoint {
for {
// Retrieve the next iterator if we don't have one.
if itr.curr == nil {
if len(itr.heap.items) == 0 {
return nil
}
itr.curr = heap.Pop(itr.heap).(*booleanMergeHeapItem)
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.name, itr.window.tags = p.Name, p.Tags.ID()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
// Read the next point from the current iterator.
p := itr.curr.itr.Next()
// If there are no more points then remove iterator from heap and find next.
if p == nil {
itr.curr = nil
continue
}
// Check if the point is inside of our current window.
inWindow := true
if itr.window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
inWindow = false
}
// If it's outside our window then push iterator back on the heap and find new iterator.
if !inWindow {
itr.curr.itr.unread(p)
heap.Push(itr.heap, itr.curr)
itr.curr = nil
continue
}
return p
}
}
// booleanMergeHeap represents a heap of booleanMergeHeapItems.
// Items are sorted by their next window and then by name/tags.
type booleanMergeHeap struct {
opt IteratorOptions
items []*booleanMergeHeapItem
}
func (h booleanMergeHeap) Len() int { return len(h.items) }
func (h booleanMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h booleanMergeHeap) Less(i, j int) bool {
x, y := h.items[i].itr.peek(), h.items[j].itr.peek()
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() < y.Tags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() > y.Tags.ID()
}
}
xt, _ := h.opt.Window(x.Time)
yt, _ := h.opt.Window(y.Time)
if h.opt.Ascending {
return xt < yt
}
return xt > yt
}
func (h *booleanMergeHeap) Push(x interface{}) {
h.items = append(h.items, x.(*booleanMergeHeapItem))
}
func (h *booleanMergeHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
h.items = old[0 : n-1]
return item
}
type booleanMergeHeapItem struct {
itr *bufBooleanIterator
}
// booleanSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type booleanSortedMergeIterator struct {
inputs []BooleanIterator
opt IteratorOptions
heap booleanSortedMergeHeap
}
// newBooleanSortedMergeIterator returns an instance of booleanSortedMergeIterator.
func newBooleanSortedMergeIterator(inputs []BooleanIterator, opt IteratorOptions) Iterator {
itr := &booleanSortedMergeIterator{
inputs: inputs,
heap: make(booleanSortedMergeHeap, 0, len(inputs)),
opt: opt,
}
// Initialize heap.
for _, input := range inputs {
// Read next point.
p := input.Next()
if p == nil {
continue
}
// Append to the heap.
itr.heap = append(itr.heap, &booleanSortedMergeHeapItem{point: p, itr: input, ascending: opt.Ascending})
}
heap.Init(&itr.heap)
return itr
}
// Close closes the underlying iterators.
func (itr *booleanSortedMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
return nil
}
// Next returns the next points from the iterator.
func (itr *booleanSortedMergeIterator) Next() *BooleanPoint { return itr.pop() }
// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
func (itr *booleanSortedMergeIterator) pop() *BooleanPoint {
if len(itr.heap) == 0 {
return nil
}
// Read the next item from the heap.
item := heap.Pop(&itr.heap).(*booleanSortedMergeHeapItem)
// Copy the point for return.
p := item.point.Clone()
// Read the next item from the cursor. Push back to heap if one exists.
if item.point = item.itr.Next(); item.point != nil {
heap.Push(&itr.heap, item)
}
return p
}
// booleanSortedMergeHeap represents a heap of booleanSortedMergeHeapItems.
type booleanSortedMergeHeap []*booleanSortedMergeHeapItem
func (h booleanSortedMergeHeap) Len() int { return len(h) }
func (h booleanSortedMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h booleanSortedMergeHeap) Less(i, j int) bool {
x, y := h[i].point, h[j].point
if h[i].ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() < y.Tags.ID()
}
return x.Time < y.Time
}
if x.Name != y.Name {
return x.Name > y.Name
} else if !x.Tags.Equals(&y.Tags) {
return x.Tags.ID() > y.Tags.ID()
}
return x.Time > y.Time
}
func (h *booleanSortedMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*booleanSortedMergeHeapItem))
}
func (h *booleanSortedMergeHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
type booleanSortedMergeHeapItem struct {
point *BooleanPoint
itr BooleanIterator
ascending bool
}
// booleanLimitIterator represents an iterator that limits points per group.
type booleanLimitIterator struct {
input BooleanIterator
opt IteratorOptions
n int
prev struct {
name string
tags Tags
}
}
// newBooleanLimitIterator returns a new instance of booleanLimitIterator.
func newBooleanLimitIterator(input BooleanIterator, opt IteratorOptions) *booleanLimitIterator {
return &booleanLimitIterator{
input: input,
opt: opt,
}
}
// Close closes the underlying iterators.
func (itr *booleanLimitIterator) Close() error { return itr.input.Close() }
// Next returns the next point from the iterator.
func (itr *booleanLimitIterator) Next() *BooleanPoint {
for {
p := itr.input.Next()
if p == nil {
return nil
}
// Reset window and counter if a new window is encountered.
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
itr.prev.name = p.Name
itr.prev.tags = p.Tags
itr.n = 0
}
// Increment counter.
itr.n++
// Read next point if not beyond the offset.
if itr.n <= itr.opt.Offset {
continue
}
// Read next point if we're beyond the limit.
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
// If there's no interval and no groups then simply exit.
if itr.opt.Interval.IsZero() && len(itr.opt.Dimensions) == 0 {
return nil
}
continue
}
return p
}
}
type booleanFillIterator struct {
input *bufBooleanIterator
prev *BooleanPoint
startTime int64
endTime int64
auxFields []interface{}
done bool
opt IteratorOptions
window struct {
name string
tags Tags
time int64
}
}
func newBooleanFillIterator(input BooleanIterator, expr Expr, opt IteratorOptions) *booleanFillIterator {
if opt.Fill == NullFill {
if expr, ok := expr.(*Call); ok && expr.Name == "count" {
opt.Fill = NumberFill
opt.FillValue = false
}
}
var startTime, endTime int64
if opt.Ascending {
startTime, _ = opt.Window(opt.StartTime)
_, endTime = opt.Window(opt.EndTime)
} else {
_, startTime = opt.Window(opt.EndTime)
endTime, _ = opt.Window(opt.StartTime)
}
var auxFields []interface{}
if len(opt.Aux) > 0 {
auxFields = make([]interface{}, len(opt.Aux))
}
itr := &booleanFillIterator{
input: newBufBooleanIterator(input),
startTime: startTime,
endTime: endTime,
auxFields: auxFields,
opt: opt,
}
p := itr.input.peek()
if p != nil {
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
} else {
itr.window.time = itr.endTime
}
return itr
}
func (itr *booleanFillIterator) Close() error { return itr.input.Close() }
func (itr *booleanFillIterator) Next() *BooleanPoint {
p := itr.input.Next()
// Check if the next point is outside of our window or is nil.
for p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
// If we are inside of an interval, unread the point and continue below to
// constructing a new point.
if itr.opt.Ascending {
if itr.window.time < itr.endTime {
itr.input.unread(p)
p = nil
break
}
} else {
if itr.window.time >= itr.endTime {
itr.input.unread(p)
p = nil
break
}
}
// We are *not* in a current interval. If there is no next point,
// we are at the end of all intervals.
if p == nil {
return nil
}
// Set the new interval.
itr.window.name, itr.window.tags = p.Name, p.Tags
itr.window.time = itr.startTime
itr.prev = nil
break
}
// Check if the point is our next expected point.
if p == nil || p.Time > itr.window.time {
if p != nil {
itr.input.unread(p)
}
p = &BooleanPoint{
Name: itr.window.name,
Tags: itr.window.tags,
Time: itr.window.time,
Aux: itr.auxFields,
}
switch itr.opt.Fill {
case NullFill:
p.Nil = true
case NumberFill:
p.Value = castToBoolean(itr.opt.FillValue)
case PreviousFill:
if itr.prev != nil {
p.Value = itr.prev.Value
p.Nil = itr.prev.Nil
} else {
p.Nil = true
}
}
} else {
itr.prev = p
}
// Advance the expected time. Do not advance to a new window here
// as there may be lingering points with the same timestamp in the previous
// window.
if itr.opt.Ascending {
itr.window.time = p.Time + int64(itr.opt.Interval.Duration)
} else {
itr.window.time = p.Time - int64(itr.opt.Interval.Duration)
}
return p
}
// booleanAuxIterator represents a boolean implementation of AuxIterator.
type booleanAuxIterator struct {
input *bufBooleanIterator
output chan *BooleanPoint
fields auxIteratorFields
}
func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt IteratorOptions) *booleanAuxIterator {
return &booleanAuxIterator{
input: newBufBooleanIterator(input),
output: make(chan *BooleanPoint, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
}
func (itr *booleanAuxIterator) Start() { go itr.stream() }
func (itr *booleanAuxIterator) Close() error { return itr.input.Close() }
func (itr *booleanAuxIterator) Next() *BooleanPoint { return <-itr.output }
func (itr *booleanAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
func (itr *booleanAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
expr := opt.Expr
if expr == nil {
panic("unable to create an iterator with no expression from an aux iterator")
}
switch expr := expr.(type) {
case *VarRef:
return itr.Iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
}
func (itr *booleanAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
return nil, nil, errors.New("not implemented")
}
func (itr *booleanAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
return nil, errors.New("not implemented")
}
func (itr *booleanAuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
break
}
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
}
close(itr.output)
itr.fields.close()
}
// booleanChanIterator represents a new instance of booleanChanIterator.
type booleanChanIterator struct {
c chan *BooleanPoint
once sync.Once
}
func (itr *booleanChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
return nil
}
func (itr *booleanChanIterator) Next() *BooleanPoint { return <-itr.c }
// booleanReduceIterator executes a reducer for every interval and buffers the result.
type booleanReduceIterator struct {
input *bufBooleanIterator
fn booleanReduceFunc
opt IteratorOptions
points []*BooleanPoint
}
// Close closes the iterator and all child iterators.
func (itr *booleanReduceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *booleanReduceIterator) Next() *BooleanPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *booleanReduceIterator) reduce() []*BooleanPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Create points by tags.
m := make(map[string]*BooleanPoint)
for {
// Read next point.
curr := itr.input.NextInWindow(startTime, endTime)
if curr == nil {
break
} else if curr.Nil {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
// Pass previous and current points to reducer.
prev := m[id]
t, v, aux := itr.fn(prev, curr, &reduceOptions)
if t == ZeroTime {
continue
}
// If previous value didn't exist, create it and copy values.
if prev == nil {
prev = &BooleanPoint{Name: curr.Name, Tags: tags}
m[id] = prev
}
prev.Time = t
prev.Value = v
prev.Aux = aux
prev.Aggregated++
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
a := make([]*BooleanPoint, len(m))
for i, k := range keys {
a[i] = m[k]
}
// Set the time on each point to the beginning of the interval.
for _, p := range a {
p.Time = startTime
}
return a
}
// booleanReduceFunc is the function called by a BooleanPoint reducer.
type booleanReduceFunc func(prev, curr *BooleanPoint, opt *reduceOptions) (t int64, v bool, aux []interface{})
// booleanReduceSliceIterator executes a reducer on all points in a window and buffers the result.
type booleanReduceSliceIterator struct {
input *bufBooleanIterator
fn booleanReduceSliceFunc
opt IteratorOptions
points []BooleanPoint
}
// Close closes the iterator and all child iterators.
func (itr *booleanReduceSliceIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *booleanReduceSliceIterator) Next() *BooleanPoint {
// Calculate next window if we have no more points.
if len(itr.points) == 0 {
itr.points = itr.reduce()
if len(itr.points) == 0 {
return nil
}
}
// Pop next point off the stack.
p := itr.points[len(itr.points)-1]
itr.points = itr.points[:len(itr.points)-1]
return &p
}
// reduce executes fn once for every point in the next window.
// The previous value for the dimension is passed to fn.
func (itr *booleanReduceSliceIterator) reduce() []BooleanPoint {
// Calculate next window.
startTime, endTime := itr.opt.Window(itr.input.peekTime())
var reduceOptions = reduceOptions{
startTime: startTime,
endTime: endTime,
}
// Group points by name and tagset.
groups := make(map[string]struct {
name string
tags Tags
points []BooleanPoint
})
for {
// Read next point.
p := itr.input.NextInWindow(startTime, endTime)
if p == nil {
break
} else if p.Nil {
continue
}
tags := p.Tags.Subset(itr.opt.Dimensions)
// Append point to dimension.
id := p.Name + "\x00" + tags.ID()
g := groups[id]
g.name = p.Name
g.tags = tags
g.points = append(g.points, *p)
groups[id] = g
}
// Reduce each set into a set of values.
results := make(map[string][]BooleanPoint)
for key, g := range groups {
a := itr.fn(g.points, &reduceOptions)
if len(a) == 0 {
continue
}
// Update name and tags for each returned point.
for i := range a {
a[i].Name = g.name
a[i].Tags = g.tags
}
results[key] = a
}
// Reverse sort points by name & tag.
keys := make([]string, 0, len(results))
for k := range results {
keys = append(keys, k)
}
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
// Reverse order points within each key.
a := make([]BooleanPoint, 0, len(results))
for _, k := range keys {
for i := len(results[k]) - 1; i >= 0; i-- {
a = append(a, results[k][i])
}
}
return a
}
// booleanReduceSliceFunc is the function called by a BooleanPoint slice reducer.
type booleanReduceSliceFunc func(a []BooleanPoint, opt *reduceOptions) []BooleanPoint
// booleanReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type booleanTransformIterator struct {
input BooleanIterator
fn booleanTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *booleanTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *booleanTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
p = itr.fn(p)
}
return p
}
// booleanTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type booleanTransformFunc func(p *BooleanPoint) *BooleanPoint
// booleanReduceIterator executes a function to modify an existing point for every
// output of the input iterator.
type booleanBoolTransformIterator struct {
input BooleanIterator
fn booleanBoolTransformFunc
}
// Close closes the iterator and all child iterators.
func (itr *booleanBoolTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *booleanBoolTransformIterator) Next() *BooleanPoint {
p := itr.input.Next()
if p != nil {
return itr.fn(p)
}
return nil
}
// booleanBoolTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type booleanBoolTransformFunc func(p *BooleanPoint) *BooleanPoint
// booleanDedupeIterator only outputs unique points.
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type booleanDedupeIterator struct {
input BooleanIterator
m map[string]struct{} // lookup of points already sent
}
// newBooleanDedupeIterator returns a new instance of booleanDedupeIterator.
func newBooleanDedupeIterator(input BooleanIterator) *booleanDedupeIterator {
return &booleanDedupeIterator{
input: input,
m: make(map[string]struct{}),
}
}
// Close closes the iterator and all child iterators.
func (itr *booleanDedupeIterator) Close() error { return itr.input.Close() }
// Next returns the next unique point from the input iterator.
func (itr *booleanDedupeIterator) Next() *BooleanPoint {
for {
// Read next point.
p := itr.input.Next()
if p == nil {
return nil
}
// Serialize to bytes to store in lookup.
buf, err := proto.Marshal(encodeBooleanPoint(p))
if err != nil {
log.Println("error marshaling dedupe point:", err)
continue
}
// If the point has already been output then move to the next point.
if _, ok := itr.m[string(buf)]; ok {
continue
}
// Otherwise mark it as emitted and return point.
itr.m[string(buf)] = struct{}{}
return p
}
}
// booleanReaderIterator represents an iterator that streams from a reader.
type booleanReaderIterator struct {
r io.Reader
dec *BooleanPointDecoder
first *BooleanPoint
}
// newBooleanReaderIterator returns a new instance of booleanReaderIterator.
func newBooleanReaderIterator(r io.Reader, first *BooleanPoint) *booleanReaderIterator {
return &booleanReaderIterator{
r: r,
dec: NewBooleanPointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *booleanReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *booleanReaderIterator) Next() *BooleanPoint {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
itr.first = nil
return p
}
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &BooleanPoint{}
if err := itr.dec.DecodeBooleanPoint(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
return nil
}
return p
}
// IteratorEncoder is an encoder for encoding an iterator's points to w.
type IteratorEncoder struct {
w io.Writer
}
// NewIteratorEncoder encodes an iterator's points to w.
func NewIteratorEncoder(w io.Writer) *IteratorEncoder {
return &IteratorEncoder{w: w}
}
// Encode encodes and writes all of itr's points to the underlying writer.
func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error {
switch itr := itr.(type) {
case FloatIterator:
return enc.encodeFloatIterator(itr)
case IntegerIterator:
return enc.encodeIntegerIterator(itr)
case StringIterator:
return enc.encodeStringIterator(itr)
case BooleanIterator:
return enc.encodeBooleanIterator(itr)
default:
panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr))
}
}
// encodeFloatIterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encodeFloatIterator(itr FloatIterator) error {
penc := NewFloatPointEncoder(enc.w)
for {
// Retrieve the next point from the iterator.
p := itr.Next()
if p == nil {
return nil
}
// Write the point to the point encoder.
if err := penc.EncodeFloatPoint(p); err != nil {
return err
}
}
}
// encodeIntegerIterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encodeIntegerIterator(itr IntegerIterator) error {
penc := NewIntegerPointEncoder(enc.w)
for {
// Retrieve the next point from the iterator.
p := itr.Next()
if p == nil {
return nil
}
// Write the point to the point encoder.
if err := penc.EncodeIntegerPoint(p); err != nil {
return err
}
}
}
// encodeStringIterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encodeStringIterator(itr StringIterator) error {
penc := NewStringPointEncoder(enc.w)
for {
// Retrieve the next point from the iterator.
p := itr.Next()
if p == nil {
return nil
}
// Write the point to the point encoder.
if err := penc.EncodeStringPoint(p); err != nil {
return err
}
}
}
// encodeBooleanIterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encodeBooleanIterator(itr BooleanIterator) error {
penc := NewBooleanPointEncoder(enc.w)
for {
// Retrieve the next point from the iterator.
p := itr.Next()
if p == nil {
return nil
}
// Write the point to the point encoder.
if err := penc.EncodeBooleanPoint(p); err != nil {
return err
}
}
}