// Generated by tmpl // https://github.com/benbjohnson/tmpl // // DO NOT EDIT! // Source: iterator.gen.go.tmpl package tsm1 import ( "fmt" "runtime" "sort" "sync" "github.com/influxdata/influxdb/pkg/metrics" "github.com/influxdata/influxdb/pkg/tracing" "github.com/influxdata/influxdb/pkg/tracing/fields" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxql" "github.com/influxdata/platform/tsdb" "go.uber.org/zap" ) type cursor interface { close() error next() (t int64, v interface{}) } // cursorAt provides a bufferred cursor interface. // This required for literal value cursors which don't have a time value. type cursorAt interface { close() error peek() (k int64, v interface{}) nextAt(seek int64) interface{} } type nilCursor struct{} func (nilCursor) next() (int64, interface{}) { return tsdb.EOF, nil } func (nilCursor) close() error { return nil } // bufCursor implements a bufferred cursor. type bufCursor struct { cur cursor buf struct { key int64 value interface{} filled bool } ascending bool } // newBufCursor returns a bufferred wrapper for cur. func newBufCursor(cur cursor, ascending bool) *bufCursor { return &bufCursor{cur: cur, ascending: ascending} } func (c *bufCursor) close() error { if c.cur == nil { return nil } err := c.cur.close() c.cur = nil return err } // next returns the buffer, if filled. Otherwise returns the next key/value from the cursor. func (c *bufCursor) next() (int64, interface{}) { if c.buf.filled { k, v := c.buf.key, c.buf.value c.buf.filled = false return k, v } return c.cur.next() } // unread pushes k and v onto the buffer. func (c *bufCursor) unread(k int64, v interface{}) { c.buf.key, c.buf.value = k, v c.buf.filled = true } // peek reads next next key/value without removing them from the cursor. func (c *bufCursor) peek() (k int64, v interface{}) { k, v = c.next() c.unread(k, v) return } // nextAt returns the next value where key is equal to seek. // Skips over any keys that are less than seek. // If the key doesn't exist then a nil value is returned instead. func (c *bufCursor) nextAt(seek int64) interface{} { for { k, v := c.next() if k != tsdb.EOF { if k == seek { return v } else if c.ascending && k < seek { continue } else if !c.ascending && k > seek { continue } c.unread(k, v) } // Return "nil" value for type. switch c.cur.(type) { case floatCursor: return (*float64)(nil) case integerCursor: return (*int64)(nil) case unsignedCursor: return (*uint64)(nil) case stringCursor: return (*string)(nil) case booleanCursor: return (*bool)(nil) default: panic("unreachable") } } } // statsBufferCopyIntervalN is the number of points that are read before // copying the stats buffer to the iterator's stats field. This is used to // amortize the cost of using a mutex when updating stats. const statsBufferCopyIntervalN = 100 type floatFinalizerIterator struct { query.FloatIterator logger *zap.Logger } func newFloatFinalizerIterator(inner query.FloatIterator, logger *zap.Logger) *floatFinalizerIterator { itr := &floatFinalizerIterator{FloatIterator: inner, logger: logger} runtime.SetFinalizer(itr, (*floatFinalizerIterator).closeGC) return itr } func (itr *floatFinalizerIterator) closeGC() { go func() { itr.logger.Error("FloatIterator finalized by GC") itr.Close() }() } func (itr *floatFinalizerIterator) Close() error { runtime.SetFinalizer(itr, nil) return itr.FloatIterator.Close() } type floatInstrumentedIterator struct { query.FloatIterator span *tracing.Span group *metrics.Group } func newFloatInstrumentedIterator(inner query.FloatIterator, span *tracing.Span, group *metrics.Group) *floatInstrumentedIterator { return &floatInstrumentedIterator{FloatIterator: inner, span: span, group: group} } func (itr *floatInstrumentedIterator) Close() error { var f fields.Fields itr.group.ForEach(func(v metrics.Metric) { switch m := v.(type) { case *metrics.Counter: f = append(f, fields.Int64(m.Name(), m.Value())) case *metrics.Timer: f = append(f, fields.Duration(m.Name(), m.Value())) default: panic("unexpected metrics") } }) itr.span.SetFields(f) itr.span.Finish() return itr.FloatIterator.Close() } type floatIterator struct { cur floatCursor aux []cursorAt conds struct { names []string curs []cursorAt } opt query.IteratorOptions m map[string]interface{} // map used for condition evaluation point query.FloatPoint // reusable buffer statsLock sync.Mutex stats query.IteratorStats statsBuf query.IteratorStats } func newFloatIterator(name string, tags query.Tags, opt query.IteratorOptions, cur floatCursor, aux []cursorAt, conds []cursorAt, condNames []string) *floatIterator { itr := &floatIterator{ cur: cur, aux: aux, opt: opt, point: query.FloatPoint{ Name: name, Tags: tags, }, statsBuf: query.IteratorStats{ SeriesN: 1, }, } itr.stats = itr.statsBuf if len(aux) > 0 { itr.point.Aux = make([]interface{}, len(aux)) } if opt.Condition != nil { itr.m = make(map[string]interface{}, len(aux)+len(conds)) } itr.conds.names = condNames itr.conds.curs = conds return itr } // Next returns the next point from the iterator. func (itr *floatIterator) Next() (*query.FloatPoint, error) { for { seek := tsdb.EOF if itr.cur != nil { // Read from the main cursor if we have one. itr.point.Time, itr.point.Value = itr.cur.nextFloat() seek = itr.point.Time } else { // Otherwise find lowest aux timestamp. for i := range itr.aux { if k, _ := itr.aux[i].peek(); k != tsdb.EOF { if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { seek = k } } } itr.point.Time = seek } // Exit if we have no more points or we are outside our time range. if itr.point.Time == tsdb.EOF { itr.copyStats() return nil, nil } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { itr.copyStats() return nil, nil } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { itr.copyStats() return nil, nil } // Read from each auxiliary cursor. for i := range itr.opt.Aux { itr.point.Aux[i] = itr.aux[i].nextAt(seek) } // Read from condition field cursors. for i := range itr.conds.curs { itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) } // Evaluate condition, if one exists. Retry if it fails. valuer := influxql.ValuerEval{ Valuer: influxql.MultiValuer( query.MathValuer{}, influxql.MapValuer(itr.m), ), } if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { continue } // Track points returned. itr.statsBuf.PointN++ // Copy buffer to stats periodically. if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { itr.copyStats() } return &itr.point, nil } } // copyStats copies from the itr stats buffer to the stats under lock. func (itr *floatIterator) copyStats() { itr.statsLock.Lock() itr.stats = itr.statsBuf itr.statsLock.Unlock() } // Stats returns stats on the points processed. func (itr *floatIterator) Stats() query.IteratorStats { itr.statsLock.Lock() stats := itr.stats itr.statsLock.Unlock() return stats } // Close closes the iterator. func (itr *floatIterator) Close() error { cursorsAt(itr.aux).close() itr.aux = nil cursorsAt(itr.conds.curs).close() itr.conds.curs = nil if itr.cur != nil { err := itr.cur.close() itr.cur = nil return err } return nil } // floatLimitIterator type floatLimitIterator struct { input query.FloatIterator opt query.IteratorOptions n int } func newFloatLimitIterator(input query.FloatIterator, opt query.IteratorOptions) *floatLimitIterator { return &floatLimitIterator{ input: input, opt: opt, } } func (itr *floatLimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } func (itr *floatLimitIterator) Close() error { return itr.input.Close() } func (itr *floatLimitIterator) Next() (*query.FloatPoint, error) { // Check if we are beyond the limit. if (itr.n - itr.opt.Offset) > itr.opt.Limit { return nil, nil } // Read the next point. p, err := itr.input.Next() if p == nil || err != nil { return nil, err } // Increment counter. itr.n++ // Offsets are handled by a higher level iterator so return all points. return p, nil } // floatCursor represents an object for iterating over a single float field. type floatCursor interface { cursor nextFloat() (t int64, v float64) } func newFloatCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) floatCursor { if ascending { return newFloatAscendingCursor(seek, cacheValues, tsmKeyCursor) } return newFloatDescendingCursor(seek, cacheValues, tsmKeyCursor) } type floatAscendingCursor struct { cache struct { values Values pos int } tsm struct { values []FloatValue pos int keyCursor *KeyCursor } } func newFloatAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *floatAscendingCursor { c := &floatAscendingCursor{} c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) c.tsm.keyCursor = tsmKeyCursor c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.values) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) return c } // peekCache returns the current time/value from the cache. func (c *floatAscendingCursor) peekCache() (t int64, v float64) { if c.cache.pos >= len(c.cache.values) { return tsdb.EOF, 0 } item := c.cache.values[c.cache.pos] return item.UnixNano(), item.(FloatValue).value } // peekTSM returns the current time/value from tsm. func (c *floatAscendingCursor) peekTSM() (t int64, v float64) { if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { return tsdb.EOF, 0 } item := c.tsm.values[c.tsm.pos] return item.UnixNano(), item.value } // close closes the cursor and any dependent cursors. func (c *floatAscendingCursor) close() error { if c.tsm.keyCursor == nil { return nil } c.tsm.keyCursor.Close() c.tsm.keyCursor = nil c.cache.values = nil c.tsm.values = nil return nil } // next returns the next key/value for the cursor. func (c *floatAscendingCursor) next() (int64, interface{}) { return c.nextFloat() } // nextFloat returns the next key/value for the cursor. func (c *floatAscendingCursor) nextFloat() (int64, float64) { ckey, cvalue := c.peekCache() tkey, tvalue := c.peekTSM() // No more data in cache or in TSM files. if ckey == tsdb.EOF && tkey == tsdb.EOF { return tsdb.EOF, 0 } // Both cache and tsm files have the same key, cache takes precedence. if ckey == tkey { c.nextCache() c.nextTSM() return ckey, cvalue } // Buffered cache key precedes that in TSM file. if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { c.nextCache() return ckey, cvalue } // Buffered TSM key precedes that in cache. c.nextTSM() return tkey, tvalue } // nextCache returns the next value from the cache. func (c *floatAscendingCursor) nextCache() { if c.cache.pos >= len(c.cache.values) { return } c.cache.pos++ } // nextTSM returns the next value from the TSM files. func (c *floatAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.values) if len(c.tsm.values) == 0 { return } c.tsm.pos = 0 } } type floatDescendingCursor struct { cache struct { values Values pos int } tsm struct { values []FloatValue pos int keyCursor *KeyCursor } } func newFloatDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *floatDescendingCursor { c := &floatDescendingCursor{} c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) if t, _ := c.peekCache(); t != seek { c.cache.pos-- } c.tsm.keyCursor = tsmKeyCursor c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.values) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) if t, _ := c.peekTSM(); t != seek { c.tsm.pos-- } return c } // peekCache returns the current time/value from the cache. func (c *floatDescendingCursor) peekCache() (t int64, v float64) { if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { return tsdb.EOF, 0 } item := c.cache.values[c.cache.pos] return item.UnixNano(), item.(FloatValue).value } // peekTSM returns the current time/value from tsm. func (c *floatDescendingCursor) peekTSM() (t int64, v float64) { if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { return tsdb.EOF, 0 } item := c.tsm.values[c.tsm.pos] return item.UnixNano(), item.value } // close closes the cursor and any dependent cursors. func (c *floatDescendingCursor) close() error { if c.tsm.keyCursor == nil { return nil } c.tsm.keyCursor.Close() c.tsm.keyCursor = nil c.cache.values = nil c.tsm.values = nil return nil } // next returns the next key/value for the cursor. func (c *floatDescendingCursor) next() (int64, interface{}) { return c.nextFloat() } // nextFloat returns the next key/value for the cursor. func (c *floatDescendingCursor) nextFloat() (int64, float64) { ckey, cvalue := c.peekCache() tkey, tvalue := c.peekTSM() // No more data in cache or in TSM files. if ckey == tsdb.EOF && tkey == tsdb.EOF { return tsdb.EOF, 0 } // Both cache and tsm files have the same key, cache takes precedence. if ckey == tkey { c.nextCache() c.nextTSM() return ckey, cvalue } // Buffered cache key precedes that in TSM file. if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { c.nextCache() return ckey, cvalue } // Buffered TSM key precedes that in cache. c.nextTSM() return tkey, tvalue } // nextCache returns the next value from the cache. func (c *floatDescendingCursor) nextCache() { if c.cache.pos < 0 { return } c.cache.pos-- } // nextTSM returns the next value from the TSM files. func (c *floatDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.values) if len(c.tsm.values) == 0 { return } c.tsm.pos = len(c.tsm.values) - 1 } } type integerFinalizerIterator struct { query.IntegerIterator logger *zap.Logger } func newIntegerFinalizerIterator(inner query.IntegerIterator, logger *zap.Logger) *integerFinalizerIterator { itr := &integerFinalizerIterator{IntegerIterator: inner, logger: logger} runtime.SetFinalizer(itr, (*integerFinalizerIterator).closeGC) return itr } func (itr *integerFinalizerIterator) closeGC() { go func() { itr.logger.Error("IntegerIterator finalized by GC") itr.Close() }() } func (itr *integerFinalizerIterator) Close() error { runtime.SetFinalizer(itr, nil) return itr.IntegerIterator.Close() } type integerInstrumentedIterator struct { query.IntegerIterator span *tracing.Span group *metrics.Group } func newIntegerInstrumentedIterator(inner query.IntegerIterator, span *tracing.Span, group *metrics.Group) *integerInstrumentedIterator { return &integerInstrumentedIterator{IntegerIterator: inner, span: span, group: group} } func (itr *integerInstrumentedIterator) Close() error { var f fields.Fields itr.group.ForEach(func(v metrics.Metric) { switch m := v.(type) { case *metrics.Counter: f = append(f, fields.Int64(m.Name(), m.Value())) case *metrics.Timer: f = append(f, fields.Duration(m.Name(), m.Value())) default: panic("unexpected metrics") } }) itr.span.SetFields(f) itr.span.Finish() return itr.IntegerIterator.Close() } type integerIterator struct { cur integerCursor aux []cursorAt conds struct { names []string curs []cursorAt } opt query.IteratorOptions m map[string]interface{} // map used for condition evaluation point query.IntegerPoint // reusable buffer statsLock sync.Mutex stats query.IteratorStats statsBuf query.IteratorStats } func newIntegerIterator(name string, tags query.Tags, opt query.IteratorOptions, cur integerCursor, aux []cursorAt, conds []cursorAt, condNames []string) *integerIterator { itr := &integerIterator{ cur: cur, aux: aux, opt: opt, point: query.IntegerPoint{ Name: name, Tags: tags, }, statsBuf: query.IteratorStats{ SeriesN: 1, }, } itr.stats = itr.statsBuf if len(aux) > 0 { itr.point.Aux = make([]interface{}, len(aux)) } if opt.Condition != nil { itr.m = make(map[string]interface{}, len(aux)+len(conds)) } itr.conds.names = condNames itr.conds.curs = conds return itr } // Next returns the next point from the iterator. func (itr *integerIterator) Next() (*query.IntegerPoint, error) { for { seek := tsdb.EOF if itr.cur != nil { // Read from the main cursor if we have one. itr.point.Time, itr.point.Value = itr.cur.nextInteger() seek = itr.point.Time } else { // Otherwise find lowest aux timestamp. for i := range itr.aux { if k, _ := itr.aux[i].peek(); k != tsdb.EOF { if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { seek = k } } } itr.point.Time = seek } // Exit if we have no more points or we are outside our time range. if itr.point.Time == tsdb.EOF { itr.copyStats() return nil, nil } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { itr.copyStats() return nil, nil } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { itr.copyStats() return nil, nil } // Read from each auxiliary cursor. for i := range itr.opt.Aux { itr.point.Aux[i] = itr.aux[i].nextAt(seek) } // Read from condition field cursors. for i := range itr.conds.curs { itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) } // Evaluate condition, if one exists. Retry if it fails. valuer := influxql.ValuerEval{ Valuer: influxql.MultiValuer( query.MathValuer{}, influxql.MapValuer(itr.m), ), } if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { continue } // Track points returned. itr.statsBuf.PointN++ // Copy buffer to stats periodically. if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { itr.copyStats() } return &itr.point, nil } } // copyStats copies from the itr stats buffer to the stats under lock. func (itr *integerIterator) copyStats() { itr.statsLock.Lock() itr.stats = itr.statsBuf itr.statsLock.Unlock() } // Stats returns stats on the points processed. func (itr *integerIterator) Stats() query.IteratorStats { itr.statsLock.Lock() stats := itr.stats itr.statsLock.Unlock() return stats } // Close closes the iterator. func (itr *integerIterator) Close() error { cursorsAt(itr.aux).close() itr.aux = nil cursorsAt(itr.conds.curs).close() itr.conds.curs = nil if itr.cur != nil { err := itr.cur.close() itr.cur = nil return err } return nil } // integerLimitIterator type integerLimitIterator struct { input query.IntegerIterator opt query.IteratorOptions n int } func newIntegerLimitIterator(input query.IntegerIterator, opt query.IteratorOptions) *integerLimitIterator { return &integerLimitIterator{ input: input, opt: opt, } } func (itr *integerLimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } func (itr *integerLimitIterator) Close() error { return itr.input.Close() } func (itr *integerLimitIterator) Next() (*query.IntegerPoint, error) { // Check if we are beyond the limit. if (itr.n - itr.opt.Offset) > itr.opt.Limit { return nil, nil } // Read the next point. p, err := itr.input.Next() if p == nil || err != nil { return nil, err } // Increment counter. itr.n++ // Offsets are handled by a higher level iterator so return all points. return p, nil } // integerCursor represents an object for iterating over a single integer field. type integerCursor interface { cursor nextInteger() (t int64, v int64) } func newIntegerCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) integerCursor { if ascending { return newIntegerAscendingCursor(seek, cacheValues, tsmKeyCursor) } return newIntegerDescendingCursor(seek, cacheValues, tsmKeyCursor) } type integerAscendingCursor struct { cache struct { values Values pos int } tsm struct { values []IntegerValue pos int keyCursor *KeyCursor } } func newIntegerAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *integerAscendingCursor { c := &integerAscendingCursor{} c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) c.tsm.keyCursor = tsmKeyCursor c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) return c } // peekCache returns the current time/value from the cache. func (c *integerAscendingCursor) peekCache() (t int64, v int64) { if c.cache.pos >= len(c.cache.values) { return tsdb.EOF, 0 } item := c.cache.values[c.cache.pos] return item.UnixNano(), item.(IntegerValue).value } // peekTSM returns the current time/value from tsm. func (c *integerAscendingCursor) peekTSM() (t int64, v int64) { if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { return tsdb.EOF, 0 } item := c.tsm.values[c.tsm.pos] return item.UnixNano(), item.value } // close closes the cursor and any dependent cursors. func (c *integerAscendingCursor) close() error { if c.tsm.keyCursor == nil { return nil } c.tsm.keyCursor.Close() c.tsm.keyCursor = nil c.cache.values = nil c.tsm.values = nil return nil } // next returns the next key/value for the cursor. func (c *integerAscendingCursor) next() (int64, interface{}) { return c.nextInteger() } // nextInteger returns the next key/value for the cursor. func (c *integerAscendingCursor) nextInteger() (int64, int64) { ckey, cvalue := c.peekCache() tkey, tvalue := c.peekTSM() // No more data in cache or in TSM files. if ckey == tsdb.EOF && tkey == tsdb.EOF { return tsdb.EOF, 0 } // Both cache and tsm files have the same key, cache takes precedence. if ckey == tkey { c.nextCache() c.nextTSM() return ckey, cvalue } // Buffered cache key precedes that in TSM file. if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { c.nextCache() return ckey, cvalue } // Buffered TSM key precedes that in cache. c.nextTSM() return tkey, tvalue } // nextCache returns the next value from the cache. func (c *integerAscendingCursor) nextCache() { if c.cache.pos >= len(c.cache.values) { return } c.cache.pos++ } // nextTSM returns the next value from the TSM files. func (c *integerAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values) if len(c.tsm.values) == 0 { return } c.tsm.pos = 0 } } type integerDescendingCursor struct { cache struct { values Values pos int } tsm struct { values []IntegerValue pos int keyCursor *KeyCursor } } func newIntegerDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *integerDescendingCursor { c := &integerDescendingCursor{} c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) if t, _ := c.peekCache(); t != seek { c.cache.pos-- } c.tsm.keyCursor = tsmKeyCursor c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) if t, _ := c.peekTSM(); t != seek { c.tsm.pos-- } return c } // peekCache returns the current time/value from the cache. func (c *integerDescendingCursor) peekCache() (t int64, v int64) { if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { return tsdb.EOF, 0 } item := c.cache.values[c.cache.pos] return item.UnixNano(), item.(IntegerValue).value } // peekTSM returns the current time/value from tsm. func (c *integerDescendingCursor) peekTSM() (t int64, v int64) { if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { return tsdb.EOF, 0 } item := c.tsm.values[c.tsm.pos] return item.UnixNano(), item.value } // close closes the cursor and any dependent cursors. func (c *integerDescendingCursor) close() error { if c.tsm.keyCursor == nil { return nil } c.tsm.keyCursor.Close() c.tsm.keyCursor = nil c.cache.values = nil c.tsm.values = nil return nil } // next returns the next key/value for the cursor. func (c *integerDescendingCursor) next() (int64, interface{}) { return c.nextInteger() } // nextInteger returns the next key/value for the cursor. func (c *integerDescendingCursor) nextInteger() (int64, int64) { ckey, cvalue := c.peekCache() tkey, tvalue := c.peekTSM() // No more data in cache or in TSM files. if ckey == tsdb.EOF && tkey == tsdb.EOF { return tsdb.EOF, 0 } // Both cache and tsm files have the same key, cache takes precedence. if ckey == tkey { c.nextCache() c.nextTSM() return ckey, cvalue } // Buffered cache key precedes that in TSM file. if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { c.nextCache() return ckey, cvalue } // Buffered TSM key precedes that in cache. c.nextTSM() return tkey, tvalue } // nextCache returns the next value from the cache. func (c *integerDescendingCursor) nextCache() { if c.cache.pos < 0 { return } c.cache.pos-- } // nextTSM returns the next value from the TSM files. func (c *integerDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values) if len(c.tsm.values) == 0 { return } c.tsm.pos = len(c.tsm.values) - 1 } } type unsignedFinalizerIterator struct { query.UnsignedIterator logger *zap.Logger } func newUnsignedFinalizerIterator(inner query.UnsignedIterator, logger *zap.Logger) *unsignedFinalizerIterator { itr := &unsignedFinalizerIterator{UnsignedIterator: inner, logger: logger} runtime.SetFinalizer(itr, (*unsignedFinalizerIterator).closeGC) return itr } func (itr *unsignedFinalizerIterator) closeGC() { go func() { itr.logger.Error("UnsignedIterator finalized by GC") itr.Close() }() } func (itr *unsignedFinalizerIterator) Close() error { runtime.SetFinalizer(itr, nil) return itr.UnsignedIterator.Close() } type unsignedInstrumentedIterator struct { query.UnsignedIterator span *tracing.Span group *metrics.Group } func newUnsignedInstrumentedIterator(inner query.UnsignedIterator, span *tracing.Span, group *metrics.Group) *unsignedInstrumentedIterator { return &unsignedInstrumentedIterator{UnsignedIterator: inner, span: span, group: group} } func (itr *unsignedInstrumentedIterator) Close() error { var f fields.Fields itr.group.ForEach(func(v metrics.Metric) { switch m := v.(type) { case *metrics.Counter: f = append(f, fields.Int64(m.Name(), m.Value())) case *metrics.Timer: f = append(f, fields.Duration(m.Name(), m.Value())) default: panic("unexpected metrics") } }) itr.span.SetFields(f) itr.span.Finish() return itr.UnsignedIterator.Close() } type unsignedIterator struct { cur unsignedCursor aux []cursorAt conds struct { names []string curs []cursorAt } opt query.IteratorOptions m map[string]interface{} // map used for condition evaluation point query.UnsignedPoint // reusable buffer statsLock sync.Mutex stats query.IteratorStats statsBuf query.IteratorStats } func newUnsignedIterator(name string, tags query.Tags, opt query.IteratorOptions, cur unsignedCursor, aux []cursorAt, conds []cursorAt, condNames []string) *unsignedIterator { itr := &unsignedIterator{ cur: cur, aux: aux, opt: opt, point: query.UnsignedPoint{ Name: name, Tags: tags, }, statsBuf: query.IteratorStats{ SeriesN: 1, }, } itr.stats = itr.statsBuf if len(aux) > 0 { itr.point.Aux = make([]interface{}, len(aux)) } if opt.Condition != nil { itr.m = make(map[string]interface{}, len(aux)+len(conds)) } itr.conds.names = condNames itr.conds.curs = conds return itr } // Next returns the next point from the iterator. func (itr *unsignedIterator) Next() (*query.UnsignedPoint, error) { for { seek := tsdb.EOF if itr.cur != nil { // Read from the main cursor if we have one. itr.point.Time, itr.point.Value = itr.cur.nextUnsigned() seek = itr.point.Time } else { // Otherwise find lowest aux timestamp. for i := range itr.aux { if k, _ := itr.aux[i].peek(); k != tsdb.EOF { if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { seek = k } } } itr.point.Time = seek } // Exit if we have no more points or we are outside our time range. if itr.point.Time == tsdb.EOF { itr.copyStats() return nil, nil } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { itr.copyStats() return nil, nil } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { itr.copyStats() return nil, nil } // Read from each auxiliary cursor. for i := range itr.opt.Aux { itr.point.Aux[i] = itr.aux[i].nextAt(seek) } // Read from condition field cursors. for i := range itr.conds.curs { itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) } // Evaluate condition, if one exists. Retry if it fails. valuer := influxql.ValuerEval{ Valuer: influxql.MultiValuer( query.MathValuer{}, influxql.MapValuer(itr.m), ), } if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { continue } // Track points returned. itr.statsBuf.PointN++ // Copy buffer to stats periodically. if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { itr.copyStats() } return &itr.point, nil } } // copyStats copies from the itr stats buffer to the stats under lock. func (itr *unsignedIterator) copyStats() { itr.statsLock.Lock() itr.stats = itr.statsBuf itr.statsLock.Unlock() } // Stats returns stats on the points processed. func (itr *unsignedIterator) Stats() query.IteratorStats { itr.statsLock.Lock() stats := itr.stats itr.statsLock.Unlock() return stats } // Close closes the iterator. func (itr *unsignedIterator) Close() error { cursorsAt(itr.aux).close() itr.aux = nil cursorsAt(itr.conds.curs).close() itr.conds.curs = nil if itr.cur != nil { err := itr.cur.close() itr.cur = nil return err } return nil } // unsignedLimitIterator type unsignedLimitIterator struct { input query.UnsignedIterator opt query.IteratorOptions n int } func newUnsignedLimitIterator(input query.UnsignedIterator, opt query.IteratorOptions) *unsignedLimitIterator { return &unsignedLimitIterator{ input: input, opt: opt, } } func (itr *unsignedLimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } func (itr *unsignedLimitIterator) Close() error { return itr.input.Close() } func (itr *unsignedLimitIterator) Next() (*query.UnsignedPoint, error) { // Check if we are beyond the limit. if (itr.n - itr.opt.Offset) > itr.opt.Limit { return nil, nil } // Read the next point. p, err := itr.input.Next() if p == nil || err != nil { return nil, err } // Increment counter. itr.n++ // Offsets are handled by a higher level iterator so return all points. return p, nil } // unsignedCursor represents an object for iterating over a single unsigned field. type unsignedCursor interface { cursor nextUnsigned() (t int64, v uint64) } func newUnsignedCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) unsignedCursor { if ascending { return newUnsignedAscendingCursor(seek, cacheValues, tsmKeyCursor) } return newUnsignedDescendingCursor(seek, cacheValues, tsmKeyCursor) } type unsignedAscendingCursor struct { cache struct { values Values pos int } tsm struct { values []UnsignedValue pos int keyCursor *KeyCursor } } func newUnsignedAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *unsignedAscendingCursor { c := &unsignedAscendingCursor{} c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) c.tsm.keyCursor = tsmKeyCursor c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.values) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) return c } // peekCache returns the current time/value from the cache. func (c *unsignedAscendingCursor) peekCache() (t int64, v uint64) { if c.cache.pos >= len(c.cache.values) { return tsdb.EOF, 0 } item := c.cache.values[c.cache.pos] return item.UnixNano(), item.(UnsignedValue).value } // peekTSM returns the current time/value from tsm. func (c *unsignedAscendingCursor) peekTSM() (t int64, v uint64) { if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { return tsdb.EOF, 0 } item := c.tsm.values[c.tsm.pos] return item.UnixNano(), item.value } // close closes the cursor and any dependent cursors. func (c *unsignedAscendingCursor) close() error { if c.tsm.keyCursor == nil { return nil } c.tsm.keyCursor.Close() c.tsm.keyCursor = nil c.cache.values = nil c.tsm.values = nil return nil } // next returns the next key/value for the cursor. func (c *unsignedAscendingCursor) next() (int64, interface{}) { return c.nextUnsigned() } // nextUnsigned returns the next key/value for the cursor. func (c *unsignedAscendingCursor) nextUnsigned() (int64, uint64) { ckey, cvalue := c.peekCache() tkey, tvalue := c.peekTSM() // No more data in cache or in TSM files. if ckey == tsdb.EOF && tkey == tsdb.EOF { return tsdb.EOF, 0 } // Both cache and tsm files have the same key, cache takes precedence. if ckey == tkey { c.nextCache() c.nextTSM() return ckey, cvalue } // Buffered cache key precedes that in TSM file. if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { c.nextCache() return ckey, cvalue } // Buffered TSM key precedes that in cache. c.nextTSM() return tkey, tvalue } // nextCache returns the next value from the cache. func (c *unsignedAscendingCursor) nextCache() { if c.cache.pos >= len(c.cache.values) { return } c.cache.pos++ } // nextTSM returns the next value from the TSM files. func (c *unsignedAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.values) if len(c.tsm.values) == 0 { return } c.tsm.pos = 0 } } type unsignedDescendingCursor struct { cache struct { values Values pos int } tsm struct { values []UnsignedValue pos int keyCursor *KeyCursor } } func newUnsignedDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *unsignedDescendingCursor { c := &unsignedDescendingCursor{} c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) if t, _ := c.peekCache(); t != seek { c.cache.pos-- } c.tsm.keyCursor = tsmKeyCursor c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.values) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) if t, _ := c.peekTSM(); t != seek { c.tsm.pos-- } return c } // peekCache returns the current time/value from the cache. func (c *unsignedDescendingCursor) peekCache() (t int64, v uint64) { if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { return tsdb.EOF, 0 } item := c.cache.values[c.cache.pos] return item.UnixNano(), item.(UnsignedValue).value } // peekTSM returns the current time/value from tsm. func (c *unsignedDescendingCursor) peekTSM() (t int64, v uint64) { if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { return tsdb.EOF, 0 } item := c.tsm.values[c.tsm.pos] return item.UnixNano(), item.value } // close closes the cursor and any dependent cursors. func (c *unsignedDescendingCursor) close() error { if c.tsm.keyCursor == nil { return nil } c.tsm.keyCursor.Close() c.tsm.keyCursor = nil c.cache.values = nil c.tsm.values = nil return nil } // next returns the next key/value for the cursor. func (c *unsignedDescendingCursor) next() (int64, interface{}) { return c.nextUnsigned() } // nextUnsigned returns the next key/value for the cursor. func (c *unsignedDescendingCursor) nextUnsigned() (int64, uint64) { ckey, cvalue := c.peekCache() tkey, tvalue := c.peekTSM() // No more data in cache or in TSM files. if ckey == tsdb.EOF && tkey == tsdb.EOF { return tsdb.EOF, 0 } // Both cache and tsm files have the same key, cache takes precedence. if ckey == tkey { c.nextCache() c.nextTSM() return ckey, cvalue } // Buffered cache key precedes that in TSM file. if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { c.nextCache() return ckey, cvalue } // Buffered TSM key precedes that in cache. c.nextTSM() return tkey, tvalue } // nextCache returns the next value from the cache. func (c *unsignedDescendingCursor) nextCache() { if c.cache.pos < 0 { return } c.cache.pos-- } // nextTSM returns the next value from the TSM files. func (c *unsignedDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.values) if len(c.tsm.values) == 0 { return } c.tsm.pos = len(c.tsm.values) - 1 } } type stringFinalizerIterator struct { query.StringIterator logger *zap.Logger } func newStringFinalizerIterator(inner query.StringIterator, logger *zap.Logger) *stringFinalizerIterator { itr := &stringFinalizerIterator{StringIterator: inner, logger: logger} runtime.SetFinalizer(itr, (*stringFinalizerIterator).closeGC) return itr } func (itr *stringFinalizerIterator) closeGC() { go func() { itr.logger.Error("StringIterator finalized by GC") itr.Close() }() } func (itr *stringFinalizerIterator) Close() error { runtime.SetFinalizer(itr, nil) return itr.StringIterator.Close() } type stringInstrumentedIterator struct { query.StringIterator span *tracing.Span group *metrics.Group } func newStringInstrumentedIterator(inner query.StringIterator, span *tracing.Span, group *metrics.Group) *stringInstrumentedIterator { return &stringInstrumentedIterator{StringIterator: inner, span: span, group: group} } func (itr *stringInstrumentedIterator) Close() error { var f fields.Fields itr.group.ForEach(func(v metrics.Metric) { switch m := v.(type) { case *metrics.Counter: f = append(f, fields.Int64(m.Name(), m.Value())) case *metrics.Timer: f = append(f, fields.Duration(m.Name(), m.Value())) default: panic("unexpected metrics") } }) itr.span.SetFields(f) itr.span.Finish() return itr.StringIterator.Close() } type stringIterator struct { cur stringCursor aux []cursorAt conds struct { names []string curs []cursorAt } opt query.IteratorOptions m map[string]interface{} // map used for condition evaluation point query.StringPoint // reusable buffer statsLock sync.Mutex stats query.IteratorStats statsBuf query.IteratorStats } func newStringIterator(name string, tags query.Tags, opt query.IteratorOptions, cur stringCursor, aux []cursorAt, conds []cursorAt, condNames []string) *stringIterator { itr := &stringIterator{ cur: cur, aux: aux, opt: opt, point: query.StringPoint{ Name: name, Tags: tags, }, statsBuf: query.IteratorStats{ SeriesN: 1, }, } itr.stats = itr.statsBuf if len(aux) > 0 { itr.point.Aux = make([]interface{}, len(aux)) } if opt.Condition != nil { itr.m = make(map[string]interface{}, len(aux)+len(conds)) } itr.conds.names = condNames itr.conds.curs = conds return itr } // Next returns the next point from the iterator. func (itr *stringIterator) Next() (*query.StringPoint, error) { for { seek := tsdb.EOF if itr.cur != nil { // Read from the main cursor if we have one. itr.point.Time, itr.point.Value = itr.cur.nextString() seek = itr.point.Time } else { // Otherwise find lowest aux timestamp. for i := range itr.aux { if k, _ := itr.aux[i].peek(); k != tsdb.EOF { if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { seek = k } } } itr.point.Time = seek } // Exit if we have no more points or we are outside our time range. if itr.point.Time == tsdb.EOF { itr.copyStats() return nil, nil } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { itr.copyStats() return nil, nil } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { itr.copyStats() return nil, nil } // Read from each auxiliary cursor. for i := range itr.opt.Aux { itr.point.Aux[i] = itr.aux[i].nextAt(seek) } // Read from condition field cursors. for i := range itr.conds.curs { itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) } // Evaluate condition, if one exists. Retry if it fails. valuer := influxql.ValuerEval{ Valuer: influxql.MultiValuer( query.MathValuer{}, influxql.MapValuer(itr.m), ), } if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { continue } // Track points returned. itr.statsBuf.PointN++ // Copy buffer to stats periodically. if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { itr.copyStats() } return &itr.point, nil } } // copyStats copies from the itr stats buffer to the stats under lock. func (itr *stringIterator) copyStats() { itr.statsLock.Lock() itr.stats = itr.statsBuf itr.statsLock.Unlock() } // Stats returns stats on the points processed. func (itr *stringIterator) Stats() query.IteratorStats { itr.statsLock.Lock() stats := itr.stats itr.statsLock.Unlock() return stats } // Close closes the iterator. func (itr *stringIterator) Close() error { cursorsAt(itr.aux).close() itr.aux = nil cursorsAt(itr.conds.curs).close() itr.conds.curs = nil if itr.cur != nil { err := itr.cur.close() itr.cur = nil return err } return nil } // stringLimitIterator type stringLimitIterator struct { input query.StringIterator opt query.IteratorOptions n int } func newStringLimitIterator(input query.StringIterator, opt query.IteratorOptions) *stringLimitIterator { return &stringLimitIterator{ input: input, opt: opt, } } func (itr *stringLimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } func (itr *stringLimitIterator) Close() error { return itr.input.Close() } func (itr *stringLimitIterator) Next() (*query.StringPoint, error) { // Check if we are beyond the limit. if (itr.n - itr.opt.Offset) > itr.opt.Limit { return nil, nil } // Read the next point. p, err := itr.input.Next() if p == nil || err != nil { return nil, err } // Increment counter. itr.n++ // Offsets are handled by a higher level iterator so return all points. return p, nil } // stringCursor represents an object for iterating over a single string field. type stringCursor interface { cursor nextString() (t int64, v string) } func newStringCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) stringCursor { if ascending { return newStringAscendingCursor(seek, cacheValues, tsmKeyCursor) } return newStringDescendingCursor(seek, cacheValues, tsmKeyCursor) } type stringAscendingCursor struct { cache struct { values Values pos int } tsm struct { values []StringValue pos int keyCursor *KeyCursor } } func newStringAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *stringAscendingCursor { c := &stringAscendingCursor{} c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) c.tsm.keyCursor = tsmKeyCursor c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.values) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) return c } // peekCache returns the current time/value from the cache. func (c *stringAscendingCursor) peekCache() (t int64, v string) { if c.cache.pos >= len(c.cache.values) { return tsdb.EOF, "" } item := c.cache.values[c.cache.pos] return item.UnixNano(), item.(StringValue).value } // peekTSM returns the current time/value from tsm. func (c *stringAscendingCursor) peekTSM() (t int64, v string) { if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { return tsdb.EOF, "" } item := c.tsm.values[c.tsm.pos] return item.UnixNano(), item.value } // close closes the cursor and any dependent cursors. func (c *stringAscendingCursor) close() error { if c.tsm.keyCursor == nil { return nil } c.tsm.keyCursor.Close() c.tsm.keyCursor = nil c.cache.values = nil c.tsm.values = nil return nil } // next returns the next key/value for the cursor. func (c *stringAscendingCursor) next() (int64, interface{}) { return c.nextString() } // nextString returns the next key/value for the cursor. func (c *stringAscendingCursor) nextString() (int64, string) { ckey, cvalue := c.peekCache() tkey, tvalue := c.peekTSM() // No more data in cache or in TSM files. if ckey == tsdb.EOF && tkey == tsdb.EOF { return tsdb.EOF, "" } // Both cache and tsm files have the same key, cache takes precedence. if ckey == tkey { c.nextCache() c.nextTSM() return ckey, cvalue } // Buffered cache key precedes that in TSM file. if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { c.nextCache() return ckey, cvalue } // Buffered TSM key precedes that in cache. c.nextTSM() return tkey, tvalue } // nextCache returns the next value from the cache. func (c *stringAscendingCursor) nextCache() { if c.cache.pos >= len(c.cache.values) { return } c.cache.pos++ } // nextTSM returns the next value from the TSM files. func (c *stringAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.values) if len(c.tsm.values) == 0 { return } c.tsm.pos = 0 } } type stringDescendingCursor struct { cache struct { values Values pos int } tsm struct { values []StringValue pos int keyCursor *KeyCursor } } func newStringDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *stringDescendingCursor { c := &stringDescendingCursor{} c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) if t, _ := c.peekCache(); t != seek { c.cache.pos-- } c.tsm.keyCursor = tsmKeyCursor c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.values) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) if t, _ := c.peekTSM(); t != seek { c.tsm.pos-- } return c } // peekCache returns the current time/value from the cache. func (c *stringDescendingCursor) peekCache() (t int64, v string) { if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { return tsdb.EOF, "" } item := c.cache.values[c.cache.pos] return item.UnixNano(), item.(StringValue).value } // peekTSM returns the current time/value from tsm. func (c *stringDescendingCursor) peekTSM() (t int64, v string) { if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { return tsdb.EOF, "" } item := c.tsm.values[c.tsm.pos] return item.UnixNano(), item.value } // close closes the cursor and any dependent cursors. func (c *stringDescendingCursor) close() error { if c.tsm.keyCursor == nil { return nil } c.tsm.keyCursor.Close() c.tsm.keyCursor = nil c.cache.values = nil c.tsm.values = nil return nil } // next returns the next key/value for the cursor. func (c *stringDescendingCursor) next() (int64, interface{}) { return c.nextString() } // nextString returns the next key/value for the cursor. func (c *stringDescendingCursor) nextString() (int64, string) { ckey, cvalue := c.peekCache() tkey, tvalue := c.peekTSM() // No more data in cache or in TSM files. if ckey == tsdb.EOF && tkey == tsdb.EOF { return tsdb.EOF, "" } // Both cache and tsm files have the same key, cache takes precedence. if ckey == tkey { c.nextCache() c.nextTSM() return ckey, cvalue } // Buffered cache key precedes that in TSM file. if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { c.nextCache() return ckey, cvalue } // Buffered TSM key precedes that in cache. c.nextTSM() return tkey, tvalue } // nextCache returns the next value from the cache. func (c *stringDescendingCursor) nextCache() { if c.cache.pos < 0 { return } c.cache.pos-- } // nextTSM returns the next value from the TSM files. func (c *stringDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.values) if len(c.tsm.values) == 0 { return } c.tsm.pos = len(c.tsm.values) - 1 } } type booleanFinalizerIterator struct { query.BooleanIterator logger *zap.Logger } func newBooleanFinalizerIterator(inner query.BooleanIterator, logger *zap.Logger) *booleanFinalizerIterator { itr := &booleanFinalizerIterator{BooleanIterator: inner, logger: logger} runtime.SetFinalizer(itr, (*booleanFinalizerIterator).closeGC) return itr } func (itr *booleanFinalizerIterator) closeGC() { go func() { itr.logger.Error("BooleanIterator finalized by GC") itr.Close() }() } func (itr *booleanFinalizerIterator) Close() error { runtime.SetFinalizer(itr, nil) return itr.BooleanIterator.Close() } type booleanInstrumentedIterator struct { query.BooleanIterator span *tracing.Span group *metrics.Group } func newBooleanInstrumentedIterator(inner query.BooleanIterator, span *tracing.Span, group *metrics.Group) *booleanInstrumentedIterator { return &booleanInstrumentedIterator{BooleanIterator: inner, span: span, group: group} } func (itr *booleanInstrumentedIterator) Close() error { var f fields.Fields itr.group.ForEach(func(v metrics.Metric) { switch m := v.(type) { case *metrics.Counter: f = append(f, fields.Int64(m.Name(), m.Value())) case *metrics.Timer: f = append(f, fields.Duration(m.Name(), m.Value())) default: panic("unexpected metrics") } }) itr.span.SetFields(f) itr.span.Finish() return itr.BooleanIterator.Close() } type booleanIterator struct { cur booleanCursor aux []cursorAt conds struct { names []string curs []cursorAt } opt query.IteratorOptions m map[string]interface{} // map used for condition evaluation point query.BooleanPoint // reusable buffer statsLock sync.Mutex stats query.IteratorStats statsBuf query.IteratorStats } func newBooleanIterator(name string, tags query.Tags, opt query.IteratorOptions, cur booleanCursor, aux []cursorAt, conds []cursorAt, condNames []string) *booleanIterator { itr := &booleanIterator{ cur: cur, aux: aux, opt: opt, point: query.BooleanPoint{ Name: name, Tags: tags, }, statsBuf: query.IteratorStats{ SeriesN: 1, }, } itr.stats = itr.statsBuf if len(aux) > 0 { itr.point.Aux = make([]interface{}, len(aux)) } if opt.Condition != nil { itr.m = make(map[string]interface{}, len(aux)+len(conds)) } itr.conds.names = condNames itr.conds.curs = conds return itr } // Next returns the next point from the iterator. func (itr *booleanIterator) Next() (*query.BooleanPoint, error) { for { seek := tsdb.EOF if itr.cur != nil { // Read from the main cursor if we have one. itr.point.Time, itr.point.Value = itr.cur.nextBoolean() seek = itr.point.Time } else { // Otherwise find lowest aux timestamp. for i := range itr.aux { if k, _ := itr.aux[i].peek(); k != tsdb.EOF { if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { seek = k } } } itr.point.Time = seek } // Exit if we have no more points or we are outside our time range. if itr.point.Time == tsdb.EOF { itr.copyStats() return nil, nil } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { itr.copyStats() return nil, nil } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { itr.copyStats() return nil, nil } // Read from each auxiliary cursor. for i := range itr.opt.Aux { itr.point.Aux[i] = itr.aux[i].nextAt(seek) } // Read from condition field cursors. for i := range itr.conds.curs { itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) } // Evaluate condition, if one exists. Retry if it fails. valuer := influxql.ValuerEval{ Valuer: influxql.MultiValuer( query.MathValuer{}, influxql.MapValuer(itr.m), ), } if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { continue } // Track points returned. itr.statsBuf.PointN++ // Copy buffer to stats periodically. if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { itr.copyStats() } return &itr.point, nil } } // copyStats copies from the itr stats buffer to the stats under lock. func (itr *booleanIterator) copyStats() { itr.statsLock.Lock() itr.stats = itr.statsBuf itr.statsLock.Unlock() } // Stats returns stats on the points processed. func (itr *booleanIterator) Stats() query.IteratorStats { itr.statsLock.Lock() stats := itr.stats itr.statsLock.Unlock() return stats } // Close closes the iterator. func (itr *booleanIterator) Close() error { cursorsAt(itr.aux).close() itr.aux = nil cursorsAt(itr.conds.curs).close() itr.conds.curs = nil if itr.cur != nil { err := itr.cur.close() itr.cur = nil return err } return nil } // booleanLimitIterator type booleanLimitIterator struct { input query.BooleanIterator opt query.IteratorOptions n int } func newBooleanLimitIterator(input query.BooleanIterator, opt query.IteratorOptions) *booleanLimitIterator { return &booleanLimitIterator{ input: input, opt: opt, } } func (itr *booleanLimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } func (itr *booleanLimitIterator) Close() error { return itr.input.Close() } func (itr *booleanLimitIterator) Next() (*query.BooleanPoint, error) { // Check if we are beyond the limit. if (itr.n - itr.opt.Offset) > itr.opt.Limit { return nil, nil } // Read the next point. p, err := itr.input.Next() if p == nil || err != nil { return nil, err } // Increment counter. itr.n++ // Offsets are handled by a higher level iterator so return all points. return p, nil } // booleanCursor represents an object for iterating over a single boolean field. type booleanCursor interface { cursor nextBoolean() (t int64, v bool) } func newBooleanCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) booleanCursor { if ascending { return newBooleanAscendingCursor(seek, cacheValues, tsmKeyCursor) } return newBooleanDescendingCursor(seek, cacheValues, tsmKeyCursor) } type booleanAscendingCursor struct { cache struct { values Values pos int } tsm struct { values []BooleanValue pos int keyCursor *KeyCursor } } func newBooleanAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *booleanAscendingCursor { c := &booleanAscendingCursor{} c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) c.tsm.keyCursor = tsmKeyCursor c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.values) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) return c } // peekCache returns the current time/value from the cache. func (c *booleanAscendingCursor) peekCache() (t int64, v bool) { if c.cache.pos >= len(c.cache.values) { return tsdb.EOF, false } item := c.cache.values[c.cache.pos] return item.UnixNano(), item.(BooleanValue).value } // peekTSM returns the current time/value from tsm. func (c *booleanAscendingCursor) peekTSM() (t int64, v bool) { if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { return tsdb.EOF, false } item := c.tsm.values[c.tsm.pos] return item.UnixNano(), item.value } // close closes the cursor and any dependent cursors. func (c *booleanAscendingCursor) close() error { if c.tsm.keyCursor == nil { return nil } c.tsm.keyCursor.Close() c.tsm.keyCursor = nil c.cache.values = nil c.tsm.values = nil return nil } // next returns the next key/value for the cursor. func (c *booleanAscendingCursor) next() (int64, interface{}) { return c.nextBoolean() } // nextBoolean returns the next key/value for the cursor. func (c *booleanAscendingCursor) nextBoolean() (int64, bool) { ckey, cvalue := c.peekCache() tkey, tvalue := c.peekTSM() // No more data in cache or in TSM files. if ckey == tsdb.EOF && tkey == tsdb.EOF { return tsdb.EOF, false } // Both cache and tsm files have the same key, cache takes precedence. if ckey == tkey { c.nextCache() c.nextTSM() return ckey, cvalue } // Buffered cache key precedes that in TSM file. if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { c.nextCache() return ckey, cvalue } // Buffered TSM key precedes that in cache. c.nextTSM() return tkey, tvalue } // nextCache returns the next value from the cache. func (c *booleanAscendingCursor) nextCache() { if c.cache.pos >= len(c.cache.values) { return } c.cache.pos++ } // nextTSM returns the next value from the TSM files. func (c *booleanAscendingCursor) nextTSM() { c.tsm.pos++ if c.tsm.pos >= len(c.tsm.values) { c.tsm.keyCursor.Next() c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.values) if len(c.tsm.values) == 0 { return } c.tsm.pos = 0 } } type booleanDescendingCursor struct { cache struct { values Values pos int } tsm struct { values []BooleanValue pos int keyCursor *KeyCursor } } func newBooleanDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *booleanDescendingCursor { c := &booleanDescendingCursor{} c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) if t, _ := c.peekCache(); t != seek { c.cache.pos-- } c.tsm.keyCursor = tsmKeyCursor c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.values) c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { return c.tsm.values[i].UnixNano() >= seek }) if t, _ := c.peekTSM(); t != seek { c.tsm.pos-- } return c } // peekCache returns the current time/value from the cache. func (c *booleanDescendingCursor) peekCache() (t int64, v bool) { if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { return tsdb.EOF, false } item := c.cache.values[c.cache.pos] return item.UnixNano(), item.(BooleanValue).value } // peekTSM returns the current time/value from tsm. func (c *booleanDescendingCursor) peekTSM() (t int64, v bool) { if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { return tsdb.EOF, false } item := c.tsm.values[c.tsm.pos] return item.UnixNano(), item.value } // close closes the cursor and any dependent cursors. func (c *booleanDescendingCursor) close() error { if c.tsm.keyCursor == nil { return nil } c.tsm.keyCursor.Close() c.tsm.keyCursor = nil c.cache.values = nil c.tsm.values = nil return nil } // next returns the next key/value for the cursor. func (c *booleanDescendingCursor) next() (int64, interface{}) { return c.nextBoolean() } // nextBoolean returns the next key/value for the cursor. func (c *booleanDescendingCursor) nextBoolean() (int64, bool) { ckey, cvalue := c.peekCache() tkey, tvalue := c.peekTSM() // No more data in cache or in TSM files. if ckey == tsdb.EOF && tkey == tsdb.EOF { return tsdb.EOF, false } // Both cache and tsm files have the same key, cache takes precedence. if ckey == tkey { c.nextCache() c.nextTSM() return ckey, cvalue } // Buffered cache key precedes that in TSM file. if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { c.nextCache() return ckey, cvalue } // Buffered TSM key precedes that in cache. c.nextTSM() return tkey, tvalue } // nextCache returns the next value from the cache. func (c *booleanDescendingCursor) nextCache() { if c.cache.pos < 0 { return } c.cache.pos-- } // nextTSM returns the next value from the TSM files. func (c *booleanDescendingCursor) nextTSM() { c.tsm.pos-- if c.tsm.pos < 0 { c.tsm.keyCursor.Next() c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.values) if len(c.tsm.values) == 0 { return } c.tsm.pos = len(c.tsm.values) - 1 } } var _ = fmt.Print