//lint:file-ignore U1000 this is generated code package query import ( "context" "container/heap" "io" "sort" "strings" "sync" "time" "github.com/gogo/protobuf/proto" "github.com/influxdata/influxql" ) // DefaultStatsInterval is the default value for IteratorEncoder.StatsInterval. const DefaultStatsInterval = time.Second {{with $types := .}}{{range $k := $types}} // {{$k.Name}}Iterator represents a stream of {{$k.name}} points. type {{$k.Name}}Iterator interface { Iterator Next() (*{{$k.Name}}Point, error) } // new{{$k.Name}}Iterators converts a slice of Iterator to a slice of {{$k.Name}}Iterator. // Drop and closes any iterator in itrs that is not a {{$k.Name}}Iterator and cannot // be cast to a {{$k.Name}}Iterator. func new{{$k.Name}}Iterators(itrs []Iterator) []{{$k.Name}}Iterator { a := make([]{{$k.Name}}Iterator, 0, len(itrs)) for _, itr := range itrs { switch itr := itr.(type) { case {{$k.Name}}Iterator: a = append(a, itr) default: itr.Close() } } return a } // buf{{$k.Name}}Iterator represents a buffered {{$k.Name}}Iterator. type buf{{$k.Name}}Iterator struct { itr {{$k.Name}}Iterator buf *{{$k.Name}}Point } // newBuf{{$k.Name}}Iterator returns a buffered {{$k.Name}}Iterator. func newBuf{{$k.Name}}Iterator(itr {{$k.Name}}Iterator) *buf{{$k.Name}}Iterator { return &buf{{$k.Name}}Iterator{itr: itr} } // Stats returns statistics from the input iterator. func (itr *buf{{$k.Name}}Iterator) Stats() IteratorStats { return itr.itr.Stats() } // Close closes the underlying iterator. func (itr *buf{{$k.Name}}Iterator) Close() error { return itr.itr.Close() } // peek returns the next point without removing it from the iterator. func (itr *buf{{$k.Name}}Iterator) peek() (*{{$k.Name}}Point, error) { p, err := itr.Next() if err != nil { return nil, err } itr.unread(p) return p, nil } // peekTime returns the time of the next point. // Returns zero time if no more points available. 
func (itr *buf{{$k.Name}}Iterator) peekTime() (int64, error) { p, err := itr.peek() if p == nil || err != nil { return ZeroTime, err } return p.Time, nil } // Next returns the current buffer, if exists, or calls the underlying iterator. func (itr *buf{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) { buf := itr.buf if buf != nil { itr.buf = nil return buf, nil } return itr.itr.Next() } // NextInWindow returns the next value if it is between [startTime, endTime). // If the next value is outside the range then it is moved to the buffer. func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) (*{{$k.Name}}Point, error) { v, err := itr.Next() if v == nil || err != nil { return nil, err } else if t := v.Time; t >= endTime || t < startTime { itr.unread(v) return nil, nil } return v, nil } // unread sets v to the buffer. It is read on the next call to Next(). func (itr *buf{{$k.Name}}Iterator) unread(v *{{$k.Name}}Point) { itr.buf = v } // {{$k.name}}MergeIterator represents an iterator that combines multiple {{$k.name}} iterators. type {{$k.name}}MergeIterator struct { inputs []{{$k.Name}}Iterator heap *{{$k.name}}MergeHeap init bool closed bool mu sync.RWMutex // Current iterator and window. curr *{{$k.name}}MergeHeapItem window struct { name string tags string startTime int64 endTime int64 } } // new{{$k.Name}}MergeIterator returns a new instance of {{$k.name}}MergeIterator. func new{{$k.Name}}MergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}MergeIterator { itr := &{{$k.name}}MergeIterator{ inputs: inputs, heap: &{{$k.name}}MergeHeap{ items: make([]*{{$k.name}}MergeHeapItem, 0, len(inputs)), opt: opt, }, } // Initialize heap items. for _, input := range inputs { // Wrap in buffer, ignore any inputs without anymore points. bufInput := newBuf{{$k.Name}}Iterator(input) // Append to the heap. 
itr.heap.items = append(itr.heap.items, &{{$k.name}}MergeHeapItem{itr: bufInput}) } return itr } // Stats returns an aggregation of stats from the underlying iterators. func (itr *{{$k.name}}MergeIterator) Stats() IteratorStats { var stats IteratorStats for _, input := range itr.inputs { stats.Add(input.Stats()) } return stats } // Close closes the underlying iterators. func (itr *{{$k.name}}MergeIterator) Close() error { itr.mu.Lock() defer itr.mu.Unlock() for _, input := range itr.inputs { input.Close() } itr.curr = nil itr.inputs = nil itr.heap.items = nil itr.closed = true return nil } // Next returns the next point from the iterator. func (itr *{{$k.name}}MergeIterator) Next() (*{{$k.Name}}Point, error) { itr.mu.RLock() defer itr.mu.RUnlock() if itr.closed { return nil, nil } // Initialize the heap. This needs to be done lazily on the first call to this iterator // so that iterator initialization done through the Select() call returns quickly. // Queries can only be interrupted after the Select() call completes so any operations // done during iterator creation cannot be interrupted, which is why we do it here // instead so an interrupt can happen while initializing the heap. if !itr.init { items := itr.heap.items itr.heap.items = make([]*{{$k.name}}MergeHeapItem, 0, len(items)) for _, item := range items { if p, err := item.itr.peek(); err != nil { return nil, err } else if p == nil { continue } itr.heap.items = append(itr.heap.items, item) } heap.Init(itr.heap) itr.init = true } for { // Retrieve the next iterator if we don't have one. if itr.curr == nil { if len(itr.heap.items) == 0 { return nil, nil } itr.curr = heap.Pop(itr.heap).(*{{$k.name}}MergeHeapItem) // Read point and set current window. 
p, err := itr.curr.itr.Next() if err != nil { return nil, err } tags := p.Tags.Subset(itr.heap.opt.Dimensions) itr.window.name, itr.window.tags = p.Name, tags.ID() itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time) return p, nil } // Read the next point from the current iterator. p, err := itr.curr.itr.Next() if err != nil { return nil, err } // If there are no more points then remove iterator from heap and find next. if p == nil { itr.curr = nil continue } // Check if the point is inside of our current window. inWindow := true if window := itr.window; window.name != p.Name { inWindow = false } else if tags := p.Tags.Subset(itr.heap.opt.Dimensions); window.tags != tags.ID() { inWindow = false } else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime { inWindow = false } else if !opt.Ascending && p.Time < window.startTime { inWindow = false } // If it's outside our window then push iterator back on the heap and find new iterator. if !inWindow { itr.curr.itr.unread(p) heap.Push(itr.heap, itr.curr) itr.curr = nil continue } return p, nil } } // {{$k.name}}MergeHeap represents a heap of {{$k.name}}MergeHeapItems. // Items are sorted by their next window and then by name/tags. 
type {{$k.name}}MergeHeap struct {
	opt   IteratorOptions
	items []*{{$k.name}}MergeHeapItem
}

// Len returns the number of items on the heap.
func (h *{{$k.name}}MergeHeap) Len() int { return len(h.items) }

// Swap exchanges the items at positions i and j.
func (h *{{$k.name}}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }

// Less reports whether the next point of item i sorts before the next point of
// item j. Points are compared by name, then by the ID of their tags reduced to
// the grouping dimensions, and finally by the start of their window. The
// direction of every comparison follows opt.Ascending.
//
// If peeking item i fails, i sorts first; if peeking item j fails, i sorts last.
func (h *{{$k.name}}MergeHeap) Less(i, j int) bool {
	x, err := h.items[i].itr.peek()
	if err != nil {
		return true
	}
	y, err := h.items[j].itr.peek()
	if err != nil {
		return false
	}

	if x.Name != y.Name {
		if h.opt.Ascending {
			return x.Name < y.Name
		}
		return x.Name > y.Name
	}

	// Compute each tag ID once rather than once per comparison.
	xID := x.Tags.Subset(h.opt.Dimensions).ID()
	yID := y.Tags.Subset(h.opt.Dimensions).ID()
	if xID != yID {
		if h.opt.Ascending {
			return xID < yID
		}
		return xID > yID
	}

	xt, _ := h.opt.Window(x.Time)
	yt, _ := h.opt.Window(y.Time)

	if h.opt.Ascending {
		return xt < yt
	}
	return xt > yt
}

// Push appends x, which must be a *{{$k.name}}MergeHeapItem, to the heap's storage.
func (h *{{$k.name}}MergeHeap) Push(x interface{}) {
	h.items = append(h.items, x.(*{{$k.name}}MergeHeapItem))
}

// Pop removes and returns the last item of the heap's storage.
func (h *{{$k.name}}MergeHeap) Pop() interface{} {
	old := h.items
	n := len(old)
	item := old[n-1]
	h.items = old[0 : n-1]
	return item
}

// {{$k.name}}MergeHeapItem is a heap entry wrapping one buffered input iterator.
type {{$k.name}}MergeHeapItem struct {
	itr *buf{{$k.Name}}Iterator
}

// {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type {{$k.name}}SortedMergeIterator struct {
	inputs []{{$k.Name}}Iterator
	heap   *{{$k.name}}SortedMergeHeap
	init   bool

	// err holds a read error from an input that returned no point alongside
	// it. It is reported by the next call to pop.
	err error
}

// new{{$k.Name}}SortedMergeIterator returns an instance of {{$k.name}}SortedMergeIterator.
func new{{$k.Name}}SortedMergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) Iterator {
	itr := &{{$k.name}}SortedMergeIterator{
		inputs: inputs,
		heap: &{{$k.name}}SortedMergeHeap{
			items: make([]*{{$k.name}}SortedMergeHeapItem, 0, len(inputs)),
			opt:   opt,
		},
	}

	// Initialize heap items. The first point of each input is read lazily in pop.
	for _, input := range inputs {
		itr.heap.items = append(itr.heap.items, &{{$k.name}}SortedMergeHeapItem{itr: input})
	}

	return itr
}

// Stats returns an aggregation of stats from the underlying iterators.
func (itr *{{$k.name}}SortedMergeIterator) Stats() IteratorStats {
	var stats IteratorStats
	for _, input := range itr.inputs {
		stats.Add(input.Stats())
	}
	return stats
}

// Close closes the underlying iterators. Errors from the inputs are not reported.
func (itr *{{$k.name}}SortedMergeIterator) Close() error {
	for _, input := range itr.inputs {
		input.Close()
	}
	return nil
}

// Next returns the next points from the iterator.
func (itr *{{$k.name}}SortedMergeIterator) Next() (*{{$k.Name}}Point, error) { return itr.pop() }

// pop returns the next point from the heap.
// Reads the next point from item's cursor and puts it back on the heap.
//
// A read error that arrives without a point is not dropped: the point already
// taken from the heap is returned first and the error is returned by the
// following call.
func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) {
	// Report an error held over from the previous call.
	if err := itr.err; err != nil {
		itr.err = nil
		return nil, err
	}

	// Initialize the heap. See the MergeIterator to see why this has to be done lazily.
	if !itr.init {
		items := itr.heap.items
		itr.heap.items = make([]*{{$k.name}}SortedMergeHeapItem, 0, len(items))
		for _, item := range items {
			var err error
			if item.point, err = item.itr.Next(); err != nil {
				return nil, err
			} else if item.point == nil {
				continue
			}
			itr.heap.items = append(itr.heap.items, item)
		}
		itr.heap.detectFast()
		heap.Init(itr.heap)
		itr.init = true
	}

	if len(itr.heap.items) == 0 {
		return nil, nil
	}

	// Read the next item from the heap.
	item := heap.Pop(itr.heap).(*{{$k.name}}SortedMergeHeapItem)
	if item.err != nil {
		return nil, item.err
	} else if item.point == nil {
		return nil, nil
	}

	// Copy the point for return.
	p := item.point.Clone()

	// Read the next item from the cursor. Push back to heap if one exists.
	item.point, item.err = item.itr.Next()
	if item.point != nil {
		// Any accompanying error travels with the item and is reported when
		// the item is popped.
		heap.Push(itr.heap, item)
	} else if item.err != nil {
		// Without a point the item cannot be ordered on the heap, so keep the
		// error on the iterator instead of losing it.
		itr.err = item.err
	}

	return p, nil
}

// {{$k.name}}SortedMergeHeap represents a heap of {{$k.name}}SortedMergeHeapItems.
// Items are sorted with the following priority: // - By their measurement name; // - By their tag keys/values; // - By time; or // - By their Aux field values. // type {{$k.name}}SortedMergeHeap struct { opt IteratorOptions items []*{{$k.name}}SortedMergeHeapItem // if each input comes from a unique single time series, we can make a shortcut. // detection of the shortcut introduces some overhead but it gets significant // performance improvement in cases like SELECT * FROM m GROUP BY * fast bool } func (h *{{$k.name}}SortedMergeHeap) detectFast() { for _, item := range h.items { if item.itr.Stats().SeriesN != 1 { return } } hasDup := false s := make([]*{{$k.name}}SortedMergeHeapItem, len(h.items)) copy(s, h.items) less := func(i, j int) bool { x, y := s[i].point, s[j].point ret := strings.Compare(x.Name, y.Name) if ret == 0 { ret = strings.Compare(x.Tags.ID(), y.Tags.ID()) } if ret != 0 { // TT // ret | == -1 | h.opt.Ascending | result // 1 | false | false | true // -1 | true | false | false // 1 | false | true | false // -1 | true | true | true return ret == -1 == h.opt.Ascending } hasDup = true return false } sort.Slice(s, less) if !hasDup { h.fast = true for i, item := range s { item.fastIdx = i } } } func (h *{{$k.name}}SortedMergeHeap) Len() int { return len(h.items) } func (h *{{$k.name}}SortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] } func (h *{{$k.name}}SortedMergeHeap) Less(i, j int) bool { if h.fast { return h.items[i].fastIdx < h.items[j].fastIdx } x, y := h.items[i].point, h.items[j].point if h.opt.Ascending { if x.Name != y.Name { return x.Name < y.Name } else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) { return xTags.ID() < yTags.ID() } if x.Time != y.Time{ return x.Time < y.Time } if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) { for i := 0; i < len(x.Aux); i++ { v1, ok1 := x.Aux[i].(string) v2, ok2 := y.Aux[i].(string) if !ok1 || !ok2 { // Unsupported 
types used in Aux fields. Maybe they // need to be added here? return false } else if v1 == v2 { continue } return v1 < v2 } } return false // Times and/or Aux fields are equal. } if x.Name != y.Name { return x.Name > y.Name } else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) { return xTags.ID() > yTags.ID() } if x.Time != y.Time{ return x.Time > y.Time } if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) { for i := 0; i < len(x.Aux); i++ { v1, ok1 := x.Aux[i].(string) v2, ok2 := y.Aux[i].(string) if !ok1 || !ok2 { // Unsupported types used in Aux fields. Maybe they // need to be added here? return false } else if v1 == v2 { continue } return v1 > v2 } } return false // Times and/or Aux fields are equal. } func (h *{{$k.name}}SortedMergeHeap) Push(x interface{}) { h.items = append(h.items, x.(*{{$k.name}}SortedMergeHeapItem)) } func (h *{{$k.name}}SortedMergeHeap) Pop() interface{} { old := h.items n := len(old) item := old[n-1] h.items = old[0 : n-1] return item } type {{$k.name}}SortedMergeHeapItem struct { point *{{$k.Name}}Point err error itr {{$k.Name}}Iterator // index for fast shortcut fastIdx int } // {{$k.name}}IteratorScanner scans the results of a {{$k.Name}}Iterator into a map. type {{$k.name}}IteratorScanner struct { input *buf{{$k.Name}}Iterator err error keys []influxql.VarRef defaultValue interface{} } // new{{$k.Name}}IteratorScanner creates a new IteratorScanner. 
func new{{$k.Name}}IteratorScanner(input {{$k.Name}}Iterator, keys []influxql.VarRef, defaultValue interface{}) *{{$k.name}}IteratorScanner {
	scanner := &{{$k.name}}IteratorScanner{
		keys:         keys,
		defaultValue: defaultValue,
	}
	scanner.input = newBuf{{$k.Name}}Iterator(input)
	return scanner
}

// Peek returns the time, name and tags of the next point without consuming it.
// It returns ZeroTime, an empty name and empty tags when there is no next point
// or when an error has occurred; the error is available through Err.
func (s *{{$k.name}}IteratorScanner) Peek() (int64, string, Tags) {
	if s.err == nil {
		p, err := s.input.peek()
		switch {
		case err != nil:
			s.err = err
		case p != nil:
			return p.Time, p.Name, p.Tags
		}
	}
	return ZeroTime, "", Tags{}
}

// ScanAt writes the values of the next point into m if that point matches ts,
// name and tags. When there is no matching point the default value is written
// for every key instead, and a non-matching point is pushed back to the input.
func (s *{{$k.name}}IteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) {
	if s.err != nil {
		return
	}

	p, err := s.input.Next()
	switch {
	case err != nil:
		s.err = err
		return
	case p == nil:
		s.useDefaults(m)
		return
	case p.Time != ts || p.Name != name || !p.Tags.Equals(&tags):
		s.useDefaults(m)
		s.input.unread(p)
		return
	}

	// The first key names the point's own value; it is skipped when unnamed.
	if valueKey := s.keys[0]; valueKey.Val != "" {
		switch {
		case !p.Nil:
			m[valueKey.Val] = p.Value
		case s.defaultValue != SkipDefault:
			m[valueKey.Val] = castToType(s.defaultValue, valueKey.Type)
		}
	}

	// The remaining keys line up with the auxiliary fields.
	for i, aux := range p.Aux {
		auxKey := s.keys[i+1]
		switch aux.(type) {
		case float64, int64, uint64, string, bool:
			m[auxKey.Val] = aux
		default:
			// Insert the fill value if one was specified.
			if s.defaultValue != SkipDefault {
				m[auxKey.Val] = castToType(s.defaultValue, auxKey.Type)
			}
		}
	}
}

// useDefaults writes the default value for every named key into m, unless the
// default value is SkipDefault.
func (s *{{$k.name}}IteratorScanner) useDefaults(m map[string]interface{}) {
	if s.defaultValue == SkipDefault {
		return
	}
	for _, key := range s.keys {
		if key.Val != "" {
			m[key.Val] = castToType(s.defaultValue, key.Type)
		}
	}
}

// Stats returns stats from the input iterator.
func (s *{{$k.name}}IteratorScanner) Stats() IteratorStats { return s.input.Stats() }

// Err returns the first error recorded while peeking or scanning.
func (s *{{$k.name}}IteratorScanner) Err() error { return s.err }

// Close closes the input iterator.
func (s *{{$k.name}}IteratorScanner) Close() error { return s.input.Close() }

// {{$k.name}}ParallelIterator represents an iterator that pulls data in a separate goroutine.
type {{$k.name}}ParallelIterator struct {
	input {{$k.Name}}Iterator
	ch    chan {{$k.name}}PointError

	once    sync.Once
	closing chan struct{}
	wg      sync.WaitGroup
}

// new{{$k.Name}}ParallelIterator returns a new instance of {{$k.name}}ParallelIterator.
// It starts the goroutine that reads from input.
func new{{$k.Name}}ParallelIterator(input {{$k.Name}}Iterator) *{{$k.name}}ParallelIterator {
	itr := &{{$k.name}}ParallelIterator{
		input:   input,
		closing: make(chan struct{}),
		ch:      make(chan {{$k.name}}PointError, 256),
	}
	itr.wg.Add(1)
	go itr.monitor()
	return itr
}

// Stats returns stats from the underlying iterator.
func (itr *{{$k.name}}ParallelIterator) Stats() IteratorStats {
	return itr.input.Stats()
}

// Close signals the reading goroutine to stop, waits for it to exit and then
// closes the underlying iterator.
func (itr *{{$k.name}}ParallelIterator) Close() error {
	itr.once.Do(func() { close(itr.closing) })
	itr.wg.Wait()
	return itr.input.Close()
}

// Next returns the next point from the iterator.
// It returns io.EOF once the reading goroutine has exited and the channel is drained.
func (itr *{{$k.name}}ParallelIterator) Next() (*{{$k.Name}}Point, error) {
	if v, ok := <-itr.ch; ok {
		return v.point, v.err
	}
	return nil, io.EOF
}

// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *{{$k.name}}ParallelIterator) monitor() {
	defer close(itr.ch)
	defer itr.wg.Done()

	for {
		// Read next point and hand a copy of it to the consumer.
		p, err := itr.input.Next()
		msg := {{$k.name}}PointError{err: err}
		if p != nil {
			msg.point = p.Clone()
		}

		select {
		case <-itr.closing:
			return
		case itr.ch <- msg:
		}
	}
}

// {{$k.name}}PointError carries one result of a Next call: a point and an error.
type {{$k.name}}PointError struct {
	point *{{$k.Name}}Point
	err   error
}

// {{$k.name}}LimitIterator represents an iterator that limits points per group.
type {{$k.name}}LimitIterator struct {
	input {{$k.Name}}Iterator
	opt   IteratorOptions
	n     int

	prev struct {
		name string
		tags Tags
	}
}

// new{{$k.Name}}LimitIterator returns a new instance of {{$k.name}}LimitIterator.
func new{{$k.Name}}LimitIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}LimitIterator {
	itr := &{{$k.name}}LimitIterator{opt: opt}
	itr.input = input
	return itr
}

// Stats returns stats from the underlying iterator.
func (itr *{{$k.name}}LimitIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the underlying iterators.
func (itr *{{$k.name}}LimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the iterator.
// Points are counted per name/tags group: the first opt.Offset points of a
// group are skipped and, when opt.Limit is positive, at most opt.Limit points
// are returned after that. Points beyond the limit are still read and
// discarded, so that a later group can be reached.
func (itr *{{$k.name}}LimitIterator) Next() (*{{$k.Name}}Point, error) {
	for {
		p, err := itr.input.Next()
		if p == nil || err != nil {
			return nil, err
		}

		// Reset window and counter if a new window is encountered.
		if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
			itr.prev.name = p.Name
			itr.prev.tags = p.Tags
			itr.n = 0
		}

		// Increment counter.
		itr.n++

		// Read next point if not beyond the offset.
		if itr.n <= itr.opt.Offset {
			continue
		}

		// Read next point if we're beyond the limit.
		if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
			continue
		}

		return p, nil
	}
}

// {{$k.name}}FillIterator emits a point for every expected window time of a
// name/tags series, constructing a point according to opt.Fill wherever the
// input has none at that time.
type {{$k.name}}FillIterator struct {
	input     *buf{{$k.Name}}Iterator
	prev      {{$k.Name}}Point // last point read from the input; Nil until one is seen in the current series
	startTime int64            // window start of the first expected time (see new{{$k.Name}}FillIterator)
	endTime   int64            // window start of the last expected time
	auxFields []interface{}    // Aux value attached to constructed points
	init      bool             // set once the first series window has been established
	opt       IteratorOptions

	// window tracks the series being filled and the next expected time.
	window struct {
		name   string
		tags   Tags
		time   int64
		offset int64
	}
}

// new{{$k.Name}}FillIterator returns a fill iterator reading from input.
// A null fill on a "count" call is replaced by a number fill with the type's
// zero value. The start and end times are the window starts of opt.StartTime
// and opt.EndTime, swapped when iterating in descending order.
func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr influxql.Expr, opt IteratorOptions) *{{$k.name}}FillIterator {
	if opt.Fill == influxql.NullFill {
		if expr, ok := expr.(*influxql.Call); ok && expr.Name == "count" {
			opt.Fill = influxql.NumberFill
			opt.FillValue = {{$k.Zero}}
		}
	}

	var startTime, endTime int64
	if opt.Ascending {
		startTime, _ = opt.Window(opt.StartTime)
		endTime, _ = opt.Window(opt.EndTime)
	} else {
		startTime, _ = opt.Window(opt.EndTime)
		endTime, _ = opt.Window(opt.StartTime)
	}

	// Constructed points carry one (nil) aux value per requested aux field.
	var auxFields []interface{}
	if len(opt.Aux) > 0 {
		auxFields = make([]interface{}, len(opt.Aux))
	}

	return &{{$k.name}}FillIterator{
		input:     newBuf{{$k.Name}}Iterator(input),
		prev:      {{$k.Name}}Point{Nil: true},
		startTime: startTime,
		endTime:   endTime,
		auxFields: auxFields,
		opt:       opt,
	}
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}FillIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the input iterator.
func (itr *{{$k.name}}FillIterator) Close() error { return itr.input.Close() }

// Next returns either the next input point, when it falls on the expected
// window time, or a point constructed for that time according to opt.Fill.
// It returns a nil point when the input is exhausted and the current series
// has been filled up to the end time.
func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
	// On the first call, establish the series and the first expected time from
	// the first input point.
	if !itr.init {
		p, err := itr.input.peek()
		if p == nil || err != nil {
			return nil, err
		}
		itr.window.name, itr.window.tags = p.Name, p.Tags
		itr.window.time = itr.startTime
		// With no lower time bound, start at the window of the first point.
		if itr.startTime == influxql.MinTime {
			itr.window.time, _ = itr.opt.Window(p.Time)
		}
		if itr.opt.Location != nil {
			_, itr.window.offset = itr.opt.Zone(itr.window.time)
		}
		itr.init = true
	}

	p, err := itr.input.Next()
	if err != nil {
		return nil, err
	}

	// Check if the next point is outside of our window or is nil.
	if p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
		// If we are inside of an interval, unread the point and continue below to
		// constructing a new point.
		if itr.opt.Ascending && itr.window.time <= itr.endTime {
			itr.input.unread(p)
			p = nil
			goto CONSTRUCT
		} else if !itr.opt.Ascending && itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime {
			itr.input.unread(p)
			p = nil
			goto CONSTRUCT
		}

		// We are *not* in a current interval. If there is no next point,
		// we are at the end of all intervals.
		if p == nil {
			return nil, nil
		}

		// Set the new interval.
		itr.window.name, itr.window.tags = p.Name, p.Tags
		itr.window.time = itr.startTime
		if itr.window.time == influxql.MinTime {
			itr.window.time, _ = itr.opt.Window(p.Time)
		}
		if itr.opt.Location != nil {
			_, itr.window.offset = itr.opt.Zone(itr.window.time)
		}
		// A new series starts without a previous point.
		itr.prev = {{$k.Name}}Point{Nil: true}
	}

	// Check if the point is our next expected point.
CONSTRUCT:
	if p == nil || (itr.opt.Ascending && p.Time > itr.window.time) || (!itr.opt.Ascending && p.Time < itr.window.time) {
		// The input point (if any) lies beyond the expected time: keep it for
		// later and construct a point for the expected time instead.
		if p != nil {
			itr.input.unread(p)
		}

		p = &{{$k.Name}}Point{
			Name: itr.window.name,
			Tags: itr.window.tags,
			Time: itr.window.time,
			Aux:  itr.auxFields,
		}

		// Linear fill is only generated for the Float, Integer and Unsigned
		// types; for every other type it falls through to the null fill.
		switch itr.opt.Fill {
		case influxql.LinearFill:
			{{- if or (eq $k.Name "Float") (eq $k.Name "Integer") (eq $k.Name "Unsigned")}}
			if !itr.prev.Nil {
				next, err := itr.input.peek()
				if err != nil {
					return nil, err
				} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
					interval := int64(itr.opt.Interval.Duration)
					start := itr.window.time / interval
					p.Value = linear{{$k.Name}}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
				} else {
					p.Nil = true
				}
			} else {
				p.Nil = true
			}
			{{else}}
			fallthrough
			{{- end}}
		case influxql.NullFill:
			p.Nil = true
		case influxql.NumberFill:
			p.Value, _ = castTo{{$k.Name}}(itr.opt.FillValue)
		case influxql.PreviousFill:
			if !itr.prev.Nil {
				p.Value = itr.prev.Value
				p.Nil = itr.prev.Nil
			} else {
				p.Nil = true
			}
		}
	} else {
		// A real input point: remember it for previous/linear fill.
		itr.prev = *p
	}

	// Advance the expected time. Do not advance to a new window here
	// as there may be lingering points with the same timestamp in the previous
	// window.
	if itr.opt.Ascending {
		itr.window.time += int64(itr.opt.Interval.Duration)
	} else {
		itr.window.time -= int64(itr.opt.Interval.Duration)
	}

	// Check to see if we have passed over an offset change and adjust the time
	// to account for this new offset.
	if itr.opt.Location != nil {
		if _, offset := itr.opt.Zone(itr.window.time - 1); offset != itr.window.offset {
			diff := itr.window.offset - offset
			// Only shift when the change is smaller than one interval.
			if abs(diff) < int64(itr.opt.Interval.Duration) {
				itr.window.time += diff
			}
			itr.window.offset = offset
		}
	}
	return p, nil
}

// {{$k.name}}IntervalIterator represents a {{$k.name}} implementation of IntervalIterator.
type {{$k.name}}IntervalIterator struct {
	input {{$k.Name}}Iterator
	opt   IteratorOptions
}

// new{{$k.Name}}IntervalIterator returns an interval iterator reading from input.
func new{{$k.Name}}IntervalIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}IntervalIterator {
	return &{{$k.name}}IntervalIterator{input: input, opt: opt}
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}IntervalIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the input iterator.
func (itr *{{$k.name}}IntervalIterator) Close() error { return itr.input.Close() }

// Next returns the next input point with its time replaced by the start of the
// window that contains it. The point is modified in place.
func (itr *{{$k.name}}IntervalIterator) Next() (*{{$k.Name}}Point, error) {
	p, err := itr.input.Next()
	if p == nil || err != nil {
		return nil, err
	}
	p.Time, _ = itr.opt.Window(p.Time)
	// If we see the minimum allowable time, set the time to zero so we don't
	// break the default returned time for aggregate queries without times.
	if p.Time == influxql.MinTime {
		p.Time = 0
	}
	return p, nil
}

// {{$k.name}}InterruptIterator represents a {{$k.name}} implementation of InterruptIterator.
type {{$k.name}}InterruptIterator struct {
	input   {{$k.Name}}Iterator
	closing <-chan struct{}
	count   int
}

// new{{$k.Name}}InterruptIterator returns an interrupt iterator that reads from
// input and watches closing.
func new{{$k.Name}}InterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}InterruptIterator {
	return &{{$k.name}}InterruptIterator{input: input, closing: closing}
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}InterruptIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the input iterator.
func (itr *{{$k.name}}InterruptIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. Periodically it first checks
// whether the closing channel has been closed; if so it closes the input and
// returns a nil point together with the result of Close.
func (itr *{{$k.name}}InterruptIterator) Next() (*{{$k.Name}}Point, error) {
	// Only check if the channel is closed once the low byte of the counter
	// is saturated, i.e. after 255 points have been read since the last
	// check. The counter starts at zero, so no check is made before the
	// first point is emitted.
	if itr.count&0xFF == 0xFF {
		select {
		case <-itr.closing:
			return nil, itr.Close()
		default:
			// Reset iterator count to zero and fall through to emit the next point.
			itr.count = 0
		}
	}

	// Increment the counter for every point read.
	itr.count++
	return itr.input.Next()
}

// {{$k.name}}CloseInterruptIterator represents a {{$k.name}} implementation of CloseInterruptIterator.
type {{$k.name}}CloseInterruptIterator struct {
	input   {{$k.Name}}Iterator
	closing <-chan struct{}
	done    chan struct{}
	once    sync.Once
}

// new{{$k.Name}}CloseInterruptIterator returns a close-interrupt iterator and
// starts the goroutine that closes it when closing is signalled.
func new{{$k.Name}}CloseInterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}CloseInterruptIterator {
	itr := &{{$k.name}}CloseInterruptIterator{
		input:   input,
		closing: closing,
		done:    make(chan struct{}),
	}
	go itr.monitor()
	return itr
}

// monitor closes the iterator when closing is signalled and exits once the
// iterator has been closed.
func (itr *{{$k.name}}CloseInterruptIterator) monitor() {
	select {
	case <-itr.closing:
		itr.Close()
	case <-itr.done:
	}
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}CloseInterruptIterator) Stats() IteratorStats {
	return itr.input.Stats()
}

// Close marks the iterator as done and closes the input. The work is performed
// only once; the call that performs it returns the error from closing the
// input, and every other call returns nil.
func (itr *{{$k.name}}CloseInterruptIterator) Close() error {
	var err error
	itr.once.Do(func() {
		close(itr.done)
		// Report the close error instead of discarding it.
		err = itr.input.Close()
	})
	return err
}

// Next returns the next point from the input. An error from the input is
// suppressed when the iterator has already been closed.
func (itr *{{$k.name}}CloseInterruptIterator) Next() (*{{$k.Name}}Point, error) {
	p, err := itr.input.Next()
	if err != nil {
		// Check if the iterator was closed.
		select {
		case <-itr.done:
			return nil, nil
		default:
			return nil, err
		}
	}
	return p, nil
}

{{range $v := $types}}

// {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result.
type {{$k.name}}Reduce{{$v.Name}}Iterator struct {
	input    *buf{{$k.Name}}Iterator
	create   func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
	dims     []string
	opt      IteratorOptions
	points   []{{$v.Name}}Point
	keepTags bool
}

// new{{$k.Name}}Reduce{{$v.Name}}Iterator returns a reduce iterator that reads
// from input and builds an aggregator/emitter pair with createFn for every
// name/tag combination.
func new{{$k.Name}}Reduce{{$v.Name}}Iterator(input {{$k.Name}}Iterator, opt IteratorOptions, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)) *{{$k.name}}Reduce{{$v.Name}}Iterator {
	return &{{$k.name}}Reduce{{$v.Name}}Iterator{
		input:  newBuf{{$k.Name}}Iterator(input),
		create: createFn,
		dims:   opt.GetDimensions(),
		opt:    opt,
	}
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Close() error { return itr.input.Close() }

// Next returns the next reduced point. When the buffer of reduced points is
// empty the next window is reduced first; a nil point is returned when that
// yields nothing.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() (*{{$v.Name}}Point, error) {
	// Calculate next window if we have no more points.
	if len(itr.points) == 0 {
		var err error
		itr.points, err = itr.reduce()
		if len(itr.points) == 0 {
			return nil, err
		}
	}

	// Pop next point off the stack.
	p := &itr.points[len(itr.points)-1]
	itr.points = itr.points[:len(itr.points)-1]
	return p, nil
}

// {{$k.name}}Reduce{{$v.Name}}Point stores the reduced data for a name/tag combination.
type {{$k.name}}Reduce{{$v.Name}}Point struct {
	Name       string
	Tags       Tags
	Aggregator {{$k.Name}}PointAggregator
	Emitter    {{$v.Name}}PointEmitter
}

// reduce aggregates every point of the next window and returns the emitted
// points. The result is ordered so that popping from its end, as Next does,
// yields the points in the requested order.
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, error) {
	// Calculate next window.
	var (
		startTime, endTime int64
		window             struct {
			name string
			tags string
		}
	)
	for {
		p, err := itr.input.Next()
		if err != nil || p == nil {
			return nil, err
		} else if p.Nil {
			continue
		}

		// Unread the point so it can be processed.
		itr.input.unread(p)
		startTime, endTime = itr.opt.Window(p.Time)
		window.name, window.tags = p.Name, p.Tags.Subset(itr.opt.Dimensions).ID()
		break
	}

	// Create points by tags.
	m := make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point)
	for {
		// Read next point.
		curr, err := itr.input.NextInWindow(startTime, endTime)
		if err != nil {
			return nil, err
		} else if curr == nil {
			break
		} else if curr.Nil {
			continue
		}

		// Ensure this point is within the same final window: it must share
		// the window's name and bucket tags. Otherwise keep it for the next
		// call and stop.
		if curr.Name != window.name || curr.Tags.Subset(itr.opt.Dimensions).ID() != window.tags {
			itr.input.unread(curr)
			break
		}

		// Retrieve the tags on this point for this level of the query.
		// This may be different than the bucket dimensions.
		tags := curr.Tags.Subset(itr.dims)
		id := tags.ID()

		// Retrieve the aggregator for this name/tag combination or create one.
		rp := m[id]
		if rp == nil {
			aggregator, emitter := itr.create()
			rp = &{{$k.name}}Reduce{{$v.Name}}Point{
				Name:       curr.Name,
				Tags:       tags,
				Aggregator: aggregator,
				Emitter:    emitter,
			}
			m[id] = rp
		}
		rp.Aggregator.Aggregate{{$k.Name}}(curr)
	}

	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}

	// Reverse sort points by name & tag.
	// This ensures a consistent order of output.
	if len(keys) > 0 {
		var sorted sort.Interface = sort.StringSlice(keys)
		if itr.opt.Ascending {
			sorted = sort.Reverse(sorted)
		}
		sort.Sort(sorted)
	}

	// Assume the points are already sorted until proven otherwise.
	sortedByTime := true
	// Emit the points for each name & tag combination.
	a := make([]{{$v.Name}}Point, 0, len(m))
	for _, k := range keys {
		rp := m[k]
		points := rp.Emitter.Emit()
		// Append in reverse because the result is consumed from its end.
		for i := len(points) - 1; i >= 0; i-- {
			points[i].Name = rp.Name
			if !itr.keepTags {
				points[i].Tags = rp.Tags
			}
			// Set the points time to the interval time if the reducer didn't provide one.
			if points[i].Time == ZeroTime {
				points[i].Time = startTime
			} else {
				sortedByTime = false
			}
			a = append(a, points[i])
		}
	}
	// Points may be out of order. Perform a stable sort by time if requested.
	if !sortedByTime && itr.opt.Ordered {
		var sorted sort.Interface = {{$v.name}}PointsByTime(a)
		if itr.opt.Ascending {
			sorted = sort.Reverse(sorted)
		}
		sort.Stable(sorted)
	}
	return a, nil
}

// {{$k.name}}Stream{{$v.Name}}Iterator streams inputs into the iterator and emits points gradually.
type {{$k.name}}Stream{{$v.Name}}Iterator struct {
	input  *buf{{$k.Name}}Iterator
	create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
	dims   []string
	opt    IteratorOptions

	// m holds one aggregator/emitter pair per name/tag combination.
	// It is set to nil once the input has been fully consumed.
	m map[string]*{{$k.name}}Reduce{{$v.Name}}Point
	// points buffers emitted points; Next pops them from the end.
	points []{{$v.Name}}Point
}

// new{{$k.Name}}Stream{{$v.Name}}Iterator returns a new instance of {{$k.name}}Stream{{$v.Name}}Iterator.
// createFn is called once for every new name/tag combination seen in the input.
func new{{$k.Name}}Stream{{$v.Name}}Iterator(input {{$k.Name}}Iterator, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter), opt IteratorOptions) *{{$k.name}}Stream{{$v.Name}}Iterator {
	return &{{$k.name}}Stream{{$v.Name}}Iterator{
		input:  newBuf{{$k.Name}}Iterator(input),
		create: createFn,
		dims:   opt.GetDimensions(),
		opt:    opt,
		m:      make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point),
	}
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the iterator and all child iterators.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Close() error { return itr.input.Close() }

// Next returns the next value for the stream iterator.
// A nil point is returned when reduce produces no further points.
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Next() (*{{$v.Name}}Point, error) {
	// Calculate next window if we have no more points.
	if len(itr.points) == 0 {
		var err error
		itr.points, err = itr.reduce()
		if len(itr.points) == 0 {
			return nil, err
		}
	}

	// Pop next point off the stack.
	p := &itr.points[len(itr.points)-1]
	itr.points = itr.points[:len(itr.points)-1]
	return p, nil
}

// reduce creates and manages aggregators for every point from the input.
// After aggregating a point, it always tries to emit a value using the emitter.
//
// It returns as soon as an emitter yields at least one point. When the input
// is exhausted, every aggregator that implements io.Closer is closed and its
// emitter flushed; the aggregators are then discarded so that later calls
// return (nil, nil).
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, error) {
	// We have already read all of the input points.
	if itr.m == nil {
		return nil, nil
	}

	for {
		// Read next point.
		curr, err := itr.input.Next()
		if err != nil {
			return nil, err
		} else if curr == nil {
			// Close all of the aggregators to flush any remaining points to emit.
			// The map is ranged over directly, so the relative order of points
			// flushed for different series is not deterministic.
			var points []{{$v.Name}}Point
			for _, rp := range itr.m {
				if aggregator, ok := rp.Aggregator.(io.Closer); ok {
					if err := aggregator.Close(); err != nil {
						return nil, err
					}

					pts := rp.Emitter.Emit()
					if len(pts) == 0 {
						continue
					}

					for i := range pts {
						pts[i].Name = rp.Name
						pts[i].Tags = rp.Tags
					}
					points = append(points, pts...)
				}
			}

			// Eliminate the aggregators and emitters.
			itr.m = nil
			return points, nil
		} else if curr.Nil {
			continue
		}
		tags := curr.Tags.Subset(itr.dims)

		// Key the aggregator by name, plus the tag ID when there are tags.
		id := curr.Name
		if len(tags.m) > 0 {
			id += "\x00" + tags.ID()
		}

		// Retrieve the aggregator for this name/tag combination or create one.
		rp := itr.m[id]
		if rp == nil {
			aggregator, emitter := itr.create()
			rp = &{{$k.name}}Reduce{{$v.Name}}Point{
				Name:       curr.Name,
				Tags:       tags,
				Aggregator: aggregator,
				Emitter:    emitter,
			}
			itr.m[id] = rp
		}
		rp.Aggregator.Aggregate{{$k.Name}}(curr)

		// Attempt to emit points from the aggregator.
		points := rp.Emitter.Emit()
		if len(points) == 0 {
			continue
		}

		for i := range points {
			points[i].Name = rp.Name
			points[i].Tags = rp.Tags
		}
		return points, nil
	}
}
{{end}}

// {{$k.name}}DedupeIterator only outputs unique points.
// This differs from the DistinctIterator in that it compares all aux fields too.
// This iterator is relatively inefficient and should only be used on small
// datasets such as meta query results.
type {{$k.name}}DedupeIterator struct {
	input {{$k.Name}}Iterator
	m     map[string]struct{} // lookup of points already sent
}

// {{$k.name}}IteratorMapper produces {{$k.name}} points from the rows scanned
// out of a Cursor.
type {{$k.name}}IteratorMapper struct {
	cur    Cursor
	row    Row
	driver IteratorMap   // which iterator to use for the primary value, can be nil
	fields []IteratorMap // which iterator to use for an aux field
	point  {{$k.Name}}Point
}

// new{{$k.Name}}IteratorMapper returns a mapper that reads rows from cur, takes
// the point value from driver (when non-nil) and one aux value from each entry
// of fields. The opt argument is accepted but not used by this constructor.
func new{{$k.Name}}IteratorMapper(cur Cursor, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *{{$k.name}}IteratorMapper {
	return &{{$k.name}}IteratorMapper{
		cur:    cur,
		driver: driver,
		fields: fields,
		point: {{$k.Name}}Point{
			Aux: make([]interface{}, len(fields)),
		},
	}
}

// Next scans the next row from the cursor and converts it into a point.
// It returns (nil, nil) when the cursor has no more rows and (nil, err) when
// the cursor reports an error. The returned point is the iterator's own
// buffer and is overwritten by the following call.
func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) {
	if !itr.cur.Scan(&itr.row) {
		if err := itr.cur.Err(); err != nil {
			return nil, err
		}
		return nil, nil
	}

	itr.point.Time = itr.row.Time
	itr.point.Name = itr.row.Series.Name
	itr.point.Tags = itr.row.Series.Tags

	if itr.driver != nil {
		if v := itr.driver.Value(&itr.row); v != nil {
			if v, ok := castTo{{$k.Name}}(v); ok {
				itr.point.Value = v
				itr.point.Nil = false
			} else {
				// The value could not be cast to this point type.
				itr.point.Value = {{$k.Nil}}
				itr.point.Nil = true
			}
		} else {
			itr.point.Value = {{$k.Nil}}
			itr.point.Nil = true
		}
	}
	for i, f := range itr.fields {
		itr.point.Aux[i] = f.Value(&itr.row)
	}
	return &itr.point, nil
}

// Stats returns stats from the underlying cursor.
func (itr *{{$k.name}}IteratorMapper) Stats() IteratorStats {
	return itr.cur.Stats()
}

// Close closes the underlying cursor.
func (itr *{{$k.name}}IteratorMapper) Close() error {
	return itr.cur.Close()
}

// {{$k.name}}FilterIterator passes through only the input points for which
// cond evaluates to true.
type {{$k.name}}FilterIterator struct {
	input {{$k.Name}}Iterator
	cond  influxql.Expr
	opt   IteratorOptions
	m     map[string]interface{} // scratch map of values cond is evaluated against
}

// new{{$k.Name}}FilterIterator wraps input so that only points matching cond are
// returned. Binary expressions whose left-hand side is "time" are replaced by
// a true literal in a clone of cond. If no condition remains, or it is the
// literal true, input itself is returned without wrapping.
func new{{$k.Name}}FilterIterator(input {{$k.Name}}Iterator, cond influxql.Expr, opt IteratorOptions) {{$k.Name}}Iterator {
	// Strip out time conditions from the WHERE clause.
	// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
	n := influxql.RewriteFunc(influxql.CloneExpr(cond), func(n influxql.Node) influxql.Node {
		switch n := n.(type) {
		case *influxql.BinaryExpr:
			if n.LHS.String() == "time" {
				return &influxql.BooleanLiteral{Val: true}
			}
		}
		return n
	})

	// A failed assertion leaves cond nil, which is handled just below.
	cond, _ = n.(influxql.Expr)
	if cond == nil {
		return input
	} else if n, ok := cond.(*influxql.BooleanLiteral); ok && n.Val {
		return input
	}

	return &{{$k.name}}FilterIterator{
		input: input,
		cond:  cond,
		opt:   opt,
		m:     make(map[string]interface{}),
	}
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}FilterIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the input iterator.
func (itr *{{$k.name}}FilterIterator) Close() error { return itr.input.Close() }

// Next returns the next input point for which the condition evaluates to
// true, skipping every point that does not match. It returns (nil, err) as
// soon as the input is exhausted or fails.
func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) {
	for {
		p, err := itr.input.Next()
		if err != nil || p == nil {
			return nil, err
		}

		// Start from an empty map so that a key set by an earlier point
		// (e.g. a tag this point does not carry) cannot leak into the
		// evaluation of this one.
		for k := range itr.m {
			delete(itr.m, k)
		}

		// Expose aux values by reference name, then tags by key.
		for i, ref := range itr.opt.Aux {
			itr.m[ref.Val] = p.Aux[i]
		}
		for k, v := range p.Tags.KeyValues() {
			itr.m[k] = v
		}

		if !influxql.EvalBool(itr.cond, itr.m) {
			continue
		}
		return p, nil
	}
}

// {{$k.name}}TagSubsetIterator rewrites each input point so that its tags are
// reduced to the configured dimensions.
type {{$k.name}}TagSubsetIterator struct {
	input      {{$k.Name}}Iterator
	point      {{$k.Name}}Point
	lastTags   Tags
	dimensions []string
}

// new{{$k.Name}}TagSubsetIterator returns an iterator over input whose points
// carry only the tags named by opt.GetDimensions().
func new{{$k.Name}}TagSubsetIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}TagSubsetIterator {
	return &{{$k.name}}TagSubsetIterator{
		input:      input,
		dimensions: opt.GetDimensions(),
	}
}

// Next copies the next input point into the iterator's own point buffer with
// its tags reduced to the configured dimensions. The returned point is
// overwritten by the following call.
func (itr *{{$k.name}}TagSubsetIterator) Next() (*{{$k.Name}}Point, error) {
	p, err := itr.input.Next()
	if err != nil {
		return nil, err
	} else if p == nil {
		return nil, nil
	}

	itr.point.Name = p.Name
	// Only recompute the subset when the input tags differ from the previous point's.
	if !p.Tags.Equal(itr.lastTags) {
		itr.point.Tags = p.Tags.Subset(itr.dimensions)
		itr.lastTags = p.Tags
	}
	itr.point.Time = p.Time
	itr.point.Value = p.Value
	itr.point.Aux = p.Aux
	itr.point.Aggregated = p.Aggregated
	itr.point.Nil = p.Nil
	return &itr.point, nil
}

// Stats returns stats from the input iterator.
func (itr *{{$k.name}}TagSubsetIterator) Stats() IteratorStats {
	return itr.input.Stats()
}

// Close closes the input iterator.
func (itr *{{$k.name}}TagSubsetIterator) Close() error {
	return itr.input.Close()
}

// new{{$k.Name}}DedupeIterator returns a new
instance of {{$k.name}}DedupeIterator. func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator { return &{{$k.name}}DedupeIterator{ input: input, m: make(map[string]struct{}), } } // Stats returns stats from the input iterator. func (itr *{{$k.name}}DedupeIterator) Stats() IteratorStats { return itr.input.Stats() } // Close closes the iterator and all child iterators. func (itr *{{$k.name}}DedupeIterator) Close() error { return itr.input.Close() } // Next returns the next unique point from the input iterator. func (itr *{{$k.name}}DedupeIterator) Next() (*{{$k.Name}}Point, error) { for { // Read next point. p, err := itr.input.Next() if p == nil || err != nil { return nil, err } // Serialize to bytes to store in lookup. buf, err := proto.Marshal(encode{{$k.Name}}Point(p)) if err != nil { return nil, err } // If the point has already been output then move to the next point. if _, ok := itr.m[string(buf)]; ok { continue } // Otherwise mark it as emitted and return point. itr.m[string(buf)] = struct{}{} return p, nil } } // {{$k.name}}ReaderIterator represents an iterator that streams from a reader. type {{$k.name}}ReaderIterator struct { r io.Reader dec *{{$k.Name}}PointDecoder } // new{{$k.Name}}ReaderIterator returns a new instance of {{$k.name}}ReaderIterator. func new{{$k.Name}}ReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *{{$k.name}}ReaderIterator { dec := New{{$k.Name}}PointDecoder(ctx, r) dec.stats = stats return &{{$k.name}}ReaderIterator{ r: r, dec: dec, } } // Stats returns stats about points processed. func (itr *{{$k.name}}ReaderIterator) Stats() IteratorStats { return itr.dec.stats } // Close closes the underlying reader, if applicable. func (itr *{{$k.name}}ReaderIterator) Close() error { if r, ok := itr.r.(io.ReadCloser); ok { return r.Close() } return nil } // Next returns the next point from the iterator. 
func (itr *{{$k.name}}ReaderIterator) Next() (*{{$k.Name}}Point, error) {
	// OPTIMIZE(benbjohnson): Reuse point on iterator.

	// Decode into a fresh point on every call. io.EOF from the decoder is
	// reported as a nil point with a nil error.
	var p {{$k.Name}}Point
	switch err := itr.dec.Decode{{$k.Name}}Point(&p); err {
	case nil:
		return &p, nil
	case io.EOF:
		return nil, nil
	default:
		return nil, err
	}
}
{{end}}

{{range .}}
// encode{{.Name}}Iterator writes every point read from itr to the encoder's
// writer. Stats are written once before the first point, again whenever the
// StatsInterval ticker has fired, and once more after the last point.
func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error {
	ticker := time.NewTicker(enc.StatsInterval)
	defer ticker.Stop()

	// Stats go out first, before any point.
	if err := enc.encodeStats(itr.Stats()); err != nil {
		return err
	}

	pointEnc := New{{.Name}}PointEncoder(enc.w)
	for {
		// Non-blocking check of the ticker: write stats if it fired.
		select {
		case <-ticker.C:
			if err := enc.encodeStats(itr.Stats()); err != nil {
				return err
			}
		default:
		}

		point, err := itr.Next()
		if err != nil {
			return err
		}
		if point == nil {
			// Input exhausted: finish with the final stats.
			return enc.encodeStats(itr.Stats())
		}

		if err := pointEnc.Encode{{.Name}}Point(point); err != nil {
			return err
		}
	}
}

{{end}}

{{end}}