diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 7e997e9650..87e1280de1 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -29,7 +29,6 @@ import ( "go.uber.org/zap" ) -//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@iterator.gen.go.tmpldata iterator.gen.go.tmpl array_cursor.gen.go.tmpl array_cursor_iterator.gen.go.tmpl //go:generate env GO111MODULE=on go run github.com/influxdata/platform/tools/tmpl -i -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store.gen.go //go:generate env GO111MODULE=on go run github.com/influxdata/platform/tools/tmpl -i -d isArray=y -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store_array.gen.go //go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl diff --git a/tsdb/tsm1/iterator.gen.go b/tsdb/tsm1/iterator.gen.go deleted file mode 100644 index ca22825ec2..0000000000 --- a/tsdb/tsm1/iterator.gen.go +++ /dev/null @@ -1,2368 +0,0 @@ -// Generated by tmpl -// https://github.com/benbjohnson/tmpl -// -// DO NOT EDIT! -// Source: iterator.gen.go.tmpl - -package tsm1 - -import ( - "fmt" - "runtime" - "sort" - "sync" - - "github.com/influxdata/influxql" - "github.com/influxdata/platform/query" - "github.com/influxdata/platform/tsdb" - "go.uber.org/zap" -) - -type cursor interface { - close() error - next() (t int64, v interface{}) -} - -// cursorAt provides a bufferred cursor interface. -// This required for literal value cursors which don't have a time value. -type cursorAt interface { - close() error - peek() (k int64, v interface{}) - nextAt(seek int64) interface{} -} - -type nilCursor struct{} - -func (nilCursor) next() (int64, interface{}) { return tsdb.EOF, nil } -func (nilCursor) close() error { return nil } - -// bufCursor implements a bufferred cursor. -type bufCursor struct { - cur cursor - buf struct { - key int64 - value interface{} - filled bool - } - ascending bool -} - -// newBufCursor returns a bufferred wrapper for cur. -func newBufCursor(cur cursor, ascending bool) *bufCursor { - return &bufCursor{cur: cur, ascending: ascending} -} - -func (c *bufCursor) close() error { - if c.cur == nil { - return nil - } - - err := c.cur.close() - c.cur = nil - return err -} - -// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor. -func (c *bufCursor) next() (int64, interface{}) { - if c.buf.filled { - k, v := c.buf.key, c.buf.value - c.buf.filled = false - return k, v - } - return c.cur.next() -} - -// unread pushes k and v onto the buffer. -func (c *bufCursor) unread(k int64, v interface{}) { - c.buf.key, c.buf.value = k, v - c.buf.filled = true -} - -// peek reads next next key/value without removing them from the cursor. -func (c *bufCursor) peek() (k int64, v interface{}) { - k, v = c.next() - c.unread(k, v) - return -} - -// nextAt returns the next value where key is equal to seek. -// Skips over any keys that are less than seek. -// If the key doesn't exist then a nil value is returned instead. -func (c *bufCursor) nextAt(seek int64) interface{} { - for { - k, v := c.next() - if k != tsdb.EOF { - if k == seek { - return v - } else if c.ascending && k < seek { - continue - } else if !c.ascending && k > seek { - continue - } - c.unread(k, v) - } - - // Return "nil" value for type. - switch c.cur.(type) { - case floatCursor: - return (*float64)(nil) - case integerCursor: - return (*int64)(nil) - case unsignedCursor: - return (*uint64)(nil) - case stringCursor: - return (*string)(nil) - case booleanCursor: - return (*bool)(nil) - default: - panic("unreachable") - } - } -} - -// statsBufferCopyIntervalN is the number of points that are read before -// copying the stats buffer to the iterator's stats field. This is used to -// amortize the cost of using a mutex when updating stats. -const statsBufferCopyIntervalN = 100 - -type floatFinalizerIterator struct { - query.FloatIterator - logger *zap.Logger -} - -func newFloatFinalizerIterator(inner query.FloatIterator, logger *zap.Logger) *floatFinalizerIterator { - itr := &floatFinalizerIterator{FloatIterator: inner, logger: logger} - runtime.SetFinalizer(itr, (*floatFinalizerIterator).closeGC) - return itr -} - -func (itr *floatFinalizerIterator) closeGC() { - go func() { - itr.logger.Error("FloatIterator finalized by GC") - itr.Close() - }() -} - -func (itr *floatFinalizerIterator) Close() error { - runtime.SetFinalizer(itr, nil) - return itr.FloatIterator.Close() -} - -type floatIterator struct { - cur floatCursor - aux []cursorAt - conds struct { - names []string - curs []cursorAt - } - opt query.IteratorOptions - - m map[string]interface{} // map used for condition evaluation - point query.FloatPoint // reusable buffer - - statsLock sync.Mutex - stats query.IteratorStats - statsBuf query.IteratorStats -} - -func newFloatIterator(name string, tags query.Tags, opt query.IteratorOptions, cur floatCursor, aux []cursorAt, conds []cursorAt, condNames []string) *floatIterator { - itr := &floatIterator{ - cur: cur, - aux: aux, - opt: opt, - point: query.FloatPoint{ - Name: name, - Tags: tags, - }, - statsBuf: query.IteratorStats{ - SeriesN: 1, - }, - } - itr.stats = itr.statsBuf - - if len(aux) > 0 { - itr.point.Aux = make([]interface{}, len(aux)) - } - - if opt.Condition != nil { - itr.m = make(map[string]interface{}, len(aux)+len(conds)) - } - itr.conds.names = condNames - itr.conds.curs = conds - - return itr -} - -// Next returns the next point from the iterator. -func (itr *floatIterator) Next() (*query.FloatPoint, error) { - for { - seek := tsdb.EOF - - if itr.cur != nil { - // Read from the main cursor if we have one. - itr.point.Time, itr.point.Value = itr.cur.nextFloat() - seek = itr.point.Time - } else { - // Otherwise find lowest aux timestamp. - for i := range itr.aux { - if k, _ := itr.aux[i].peek(); k != tsdb.EOF { - if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { - seek = k - } - } - } - itr.point.Time = seek - } - - // Exit if we have no more points or we are outside our time range. - if itr.point.Time == tsdb.EOF { - itr.copyStats() - return nil, nil - } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { - itr.copyStats() - return nil, nil - } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { - itr.copyStats() - return nil, nil - } - - // Read from each auxiliary cursor. - for i := range itr.opt.Aux { - itr.point.Aux[i] = itr.aux[i].nextAt(seek) - } - - // Read from condition field cursors. - for i := range itr.conds.curs { - itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) - } - - // Evaluate condition, if one exists. Retry if it fails. - valuer := influxql.ValuerEval{ - Valuer: influxql.MultiValuer( - query.MathValuer{}, - influxql.MapValuer(itr.m), - ), - } - if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { - continue - } - - // Track points returned. - itr.statsBuf.PointN++ - - // Copy buffer to stats periodically. - if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { - itr.copyStats() - } - - return &itr.point, nil - } -} - -// copyStats copies from the itr stats buffer to the stats under lock. -func (itr *floatIterator) copyStats() { - itr.statsLock.Lock() - itr.stats = itr.statsBuf - itr.statsLock.Unlock() -} - -// Stats returns stats on the points processed. -func (itr *floatIterator) Stats() query.IteratorStats { - itr.statsLock.Lock() - stats := itr.stats - itr.statsLock.Unlock() - return stats -} - -// Close closes the iterator. -func (itr *floatIterator) Close() error { - cursorsAt(itr.aux).close() - itr.aux = nil - cursorsAt(itr.conds.curs).close() - itr.conds.curs = nil - if itr.cur != nil { - err := itr.cur.close() - itr.cur = nil - return err - } - return nil -} - -// floatLimitIterator -type floatLimitIterator struct { - input query.FloatIterator - opt query.IteratorOptions - n int -} - -func newFloatLimitIterator(input query.FloatIterator, opt query.IteratorOptions) *floatLimitIterator { - return &floatLimitIterator{ - input: input, - opt: opt, - } -} - -func (itr *floatLimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } -func (itr *floatLimitIterator) Close() error { return itr.input.Close() } - -func (itr *floatLimitIterator) Next() (*query.FloatPoint, error) { - // Check if we are beyond the limit. - if (itr.n - itr.opt.Offset) > itr.opt.Limit { - return nil, nil - } - - // Read the next point. - p, err := itr.input.Next() - if p == nil || err != nil { - return nil, err - } - - // Increment counter. - itr.n++ - - // Offsets are handled by a higher level iterator so return all points. - return p, nil -} - -// floatCursor represents an object for iterating over a single float field. -type floatCursor interface { - cursor - nextFloat() (t int64, v float64) -} - -func newFloatCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) floatCursor { - if ascending { - return newFloatAscendingCursor(seek, cacheValues, tsmKeyCursor) - } - return newFloatDescendingCursor(seek, cacheValues, tsmKeyCursor) -} - -type floatAscendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []FloatValue - pos int - keyCursor *KeyCursor - } -} - -func newFloatAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *floatAscendingCursor { - c := &floatAscendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *floatAscendingCursor) peekCache() (t int64, v float64) { - if c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, 0 - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(FloatValue).value -} - -// peekTSM returns the current time/value from tsm. -func (c *floatAscendingCursor) peekTSM() (t int64, v float64) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, 0 - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *floatAscendingCursor) close() error { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *floatAscendingCursor) next() (int64, interface{}) { return c.nextFloat() } - -// nextFloat returns the next key/value for the cursor. -func (c *floatAscendingCursor) nextFloat() (int64, float64) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, 0 - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *floatAscendingCursor) nextCache() { - if c.cache.pos >= len(c.cache.values) { - return - } - c.cache.pos++ -} - -// nextTSM returns the next value from the TSM files. -func (c *floatAscendingCursor) nextTSM() { - c.tsm.pos++ - if c.tsm.pos >= len(c.tsm.values) { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = 0 - } -} - -type floatDescendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []FloatValue - pos int - keyCursor *KeyCursor - } -} - -func newFloatDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *floatDescendingCursor { - c := &floatDescendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - if t, _ := c.peekCache(); t != seek { - c.cache.pos-- - } - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - if t, _ := c.peekTSM(); t != seek { - c.tsm.pos-- - } - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *floatDescendingCursor) peekCache() (t int64, v float64) { - if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, 0 - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(FloatValue).value -} - -// peekTSM returns the current time/value from tsm. -func (c *floatDescendingCursor) peekTSM() (t int64, v float64) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, 0 - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *floatDescendingCursor) close() error { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *floatDescendingCursor) next() (int64, interface{}) { return c.nextFloat() } - -// nextFloat returns the next key/value for the cursor. -func (c *floatDescendingCursor) nextFloat() (int64, float64) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, 0 - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *floatDescendingCursor) nextCache() { - if c.cache.pos < 0 { - return - } - c.cache.pos-- -} - -// nextTSM returns the next value from the TSM files. -func (c *floatDescendingCursor) nextTSM() { - c.tsm.pos-- - if c.tsm.pos < 0 { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = len(c.tsm.values) - 1 - } -} - -type integerFinalizerIterator struct { - query.IntegerIterator - logger *zap.Logger -} - -func newIntegerFinalizerIterator(inner query.IntegerIterator, logger *zap.Logger) *integerFinalizerIterator { - itr := &integerFinalizerIterator{IntegerIterator: inner, logger: logger} - runtime.SetFinalizer(itr, (*integerFinalizerIterator).closeGC) - return itr -} - -func (itr *integerFinalizerIterator) closeGC() { - go func() { - itr.logger.Error("IntegerIterator finalized by GC") - itr.Close() - }() -} - -func (itr *integerFinalizerIterator) Close() error { - runtime.SetFinalizer(itr, nil) - return itr.IntegerIterator.Close() -} - -type integerIterator struct { - cur integerCursor - aux []cursorAt - conds struct { - names []string - curs []cursorAt - } - opt query.IteratorOptions - - m map[string]interface{} // map used for condition evaluation - point query.IntegerPoint // reusable buffer - - statsLock sync.Mutex - stats query.IteratorStats - statsBuf query.IteratorStats -} - -func newIntegerIterator(name string, tags query.Tags, opt query.IteratorOptions, cur integerCursor, aux []cursorAt, conds []cursorAt, condNames []string) *integerIterator { - itr := &integerIterator{ - cur: cur, - aux: aux, - opt: opt, - point: query.IntegerPoint{ - Name: name, - Tags: tags, - }, - statsBuf: query.IteratorStats{ - SeriesN: 1, - }, - } - itr.stats = itr.statsBuf - - if len(aux) > 0 { - itr.point.Aux = make([]interface{}, len(aux)) - } - - if opt.Condition != nil { - itr.m = make(map[string]interface{}, len(aux)+len(conds)) - } - itr.conds.names = condNames - itr.conds.curs = conds - - return itr -} - -// Next returns the next point from the iterator. -func (itr *integerIterator) Next() (*query.IntegerPoint, error) { - for { - seek := tsdb.EOF - - if itr.cur != nil { - // Read from the main cursor if we have one. - itr.point.Time, itr.point.Value = itr.cur.nextInteger() - seek = itr.point.Time - } else { - // Otherwise find lowest aux timestamp. - for i := range itr.aux { - if k, _ := itr.aux[i].peek(); k != tsdb.EOF { - if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { - seek = k - } - } - } - itr.point.Time = seek - } - - // Exit if we have no more points or we are outside our time range. - if itr.point.Time == tsdb.EOF { - itr.copyStats() - return nil, nil - } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { - itr.copyStats() - return nil, nil - } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { - itr.copyStats() - return nil, nil - } - - // Read from each auxiliary cursor. - for i := range itr.opt.Aux { - itr.point.Aux[i] = itr.aux[i].nextAt(seek) - } - - // Read from condition field cursors. - for i := range itr.conds.curs { - itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) - } - - // Evaluate condition, if one exists. Retry if it fails. - valuer := influxql.ValuerEval{ - Valuer: influxql.MultiValuer( - query.MathValuer{}, - influxql.MapValuer(itr.m), - ), - } - if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { - continue - } - - // Track points returned. - itr.statsBuf.PointN++ - - // Copy buffer to stats periodically. - if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { - itr.copyStats() - } - - return &itr.point, nil - } -} - -// copyStats copies from the itr stats buffer to the stats under lock. -func (itr *integerIterator) copyStats() { - itr.statsLock.Lock() - itr.stats = itr.statsBuf - itr.statsLock.Unlock() -} - -// Stats returns stats on the points processed. -func (itr *integerIterator) Stats() query.IteratorStats { - itr.statsLock.Lock() - stats := itr.stats - itr.statsLock.Unlock() - return stats -} - -// Close closes the iterator. -func (itr *integerIterator) Close() error { - cursorsAt(itr.aux).close() - itr.aux = nil - cursorsAt(itr.conds.curs).close() - itr.conds.curs = nil - if itr.cur != nil { - err := itr.cur.close() - itr.cur = nil - return err - } - return nil -} - -// integerLimitIterator -type integerLimitIterator struct { - input query.IntegerIterator - opt query.IteratorOptions - n int -} - -func newIntegerLimitIterator(input query.IntegerIterator, opt query.IteratorOptions) *integerLimitIterator { - return &integerLimitIterator{ - input: input, - opt: opt, - } -} - -func (itr *integerLimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } -func (itr *integerLimitIterator) Close() error { return itr.input.Close() } - -func (itr *integerLimitIterator) Next() (*query.IntegerPoint, error) { - // Check if we are beyond the limit. - if (itr.n - itr.opt.Offset) > itr.opt.Limit { - return nil, nil - } - - // Read the next point. - p, err := itr.input.Next() - if p == nil || err != nil { - return nil, err - } - - // Increment counter. - itr.n++ - - // Offsets are handled by a higher level iterator so return all points. - return p, nil -} - -// integerCursor represents an object for iterating over a single integer field. -type integerCursor interface { - cursor - nextInteger() (t int64, v int64) -} - -func newIntegerCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) integerCursor { - if ascending { - return newIntegerAscendingCursor(seek, cacheValues, tsmKeyCursor) - } - return newIntegerDescendingCursor(seek, cacheValues, tsmKeyCursor) -} - -type integerAscendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []IntegerValue - pos int - keyCursor *KeyCursor - } -} - -func newIntegerAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *integerAscendingCursor { - c := &integerAscendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *integerAscendingCursor) peekCache() (t int64, v int64) { - if c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, 0 - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(IntegerValue).value -} - -// peekTSM returns the current time/value from tsm. -func (c *integerAscendingCursor) peekTSM() (t int64, v int64) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, 0 - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *integerAscendingCursor) close() error { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *integerAscendingCursor) next() (int64, interface{}) { return c.nextInteger() } - -// nextInteger returns the next key/value for the cursor. -func (c *integerAscendingCursor) nextInteger() (int64, int64) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, 0 - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *integerAscendingCursor) nextCache() { - if c.cache.pos >= len(c.cache.values) { - return - } - c.cache.pos++ -} - -// nextTSM returns the next value from the TSM files. -func (c *integerAscendingCursor) nextTSM() { - c.tsm.pos++ - if c.tsm.pos >= len(c.tsm.values) { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = 0 - } -} - -type integerDescendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []IntegerValue - pos int - keyCursor *KeyCursor - } -} - -func newIntegerDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *integerDescendingCursor { - c := &integerDescendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - if t, _ := c.peekCache(); t != seek { - c.cache.pos-- - } - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - if t, _ := c.peekTSM(); t != seek { - c.tsm.pos-- - } - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *integerDescendingCursor) peekCache() (t int64, v int64) { - if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, 0 - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(IntegerValue).value -} - -// peekTSM returns the current time/value from tsm. -func (c *integerDescendingCursor) peekTSM() (t int64, v int64) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, 0 - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *integerDescendingCursor) close() error { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *integerDescendingCursor) next() (int64, interface{}) { return c.nextInteger() } - -// nextInteger returns the next key/value for the cursor. -func (c *integerDescendingCursor) nextInteger() (int64, int64) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, 0 - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *integerDescendingCursor) nextCache() { - if c.cache.pos < 0 { - return - } - c.cache.pos-- -} - -// nextTSM returns the next value from the TSM files. -func (c *integerDescendingCursor) nextTSM() { - c.tsm.pos-- - if c.tsm.pos < 0 { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = len(c.tsm.values) - 1 - } -} - -type unsignedFinalizerIterator struct { - query.UnsignedIterator - logger *zap.Logger -} - -func newUnsignedFinalizerIterator(inner query.UnsignedIterator, logger *zap.Logger) *unsignedFinalizerIterator { - itr := &unsignedFinalizerIterator{UnsignedIterator: inner, logger: logger} - runtime.SetFinalizer(itr, (*unsignedFinalizerIterator).closeGC) - return itr -} - -func (itr *unsignedFinalizerIterator) closeGC() { - go func() { - itr.logger.Error("UnsignedIterator finalized by GC") - itr.Close() - }() -} - -func (itr *unsignedFinalizerIterator) Close() error { - runtime.SetFinalizer(itr, nil) - return itr.UnsignedIterator.Close() -} - -type unsignedIterator struct { - cur unsignedCursor - aux []cursorAt - conds struct { - names []string - curs []cursorAt - } - opt query.IteratorOptions - - m map[string]interface{} // map used for condition evaluation - point query.UnsignedPoint // reusable buffer - - statsLock sync.Mutex - stats query.IteratorStats - statsBuf query.IteratorStats -} - -func newUnsignedIterator(name string, tags query.Tags, opt query.IteratorOptions, cur unsignedCursor, aux []cursorAt, conds []cursorAt, condNames []string) *unsignedIterator { - itr := &unsignedIterator{ - cur: cur, - aux: aux, - opt: opt, - point: query.UnsignedPoint{ - Name: name, - Tags: tags, - }, - statsBuf: query.IteratorStats{ - SeriesN: 1, - }, - } - itr.stats = itr.statsBuf - - if len(aux) > 0 { - itr.point.Aux = make([]interface{}, len(aux)) - } - - if opt.Condition != nil { - itr.m = make(map[string]interface{}, len(aux)+len(conds)) - } - itr.conds.names = condNames - itr.conds.curs = conds - - return itr -} - -// Next returns the next point from the iterator. -func (itr *unsignedIterator) Next() (*query.UnsignedPoint, error) { - for { - seek := tsdb.EOF - - if itr.cur != nil { - // Read from the main cursor if we have one. - itr.point.Time, itr.point.Value = itr.cur.nextUnsigned() - seek = itr.point.Time - } else { - // Otherwise find lowest aux timestamp. - for i := range itr.aux { - if k, _ := itr.aux[i].peek(); k != tsdb.EOF { - if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { - seek = k - } - } - } - itr.point.Time = seek - } - - // Exit if we have no more points or we are outside our time range. - if itr.point.Time == tsdb.EOF { - itr.copyStats() - return nil, nil - } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { - itr.copyStats() - return nil, nil - } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { - itr.copyStats() - return nil, nil - } - - // Read from each auxiliary cursor. - for i := range itr.opt.Aux { - itr.point.Aux[i] = itr.aux[i].nextAt(seek) - } - - // Read from condition field cursors. - for i := range itr.conds.curs { - itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) - } - - // Evaluate condition, if one exists. Retry if it fails. - valuer := influxql.ValuerEval{ - Valuer: influxql.MultiValuer( - query.MathValuer{}, - influxql.MapValuer(itr.m), - ), - } - if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { - continue - } - - // Track points returned. - itr.statsBuf.PointN++ - - // Copy buffer to stats periodically. - if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { - itr.copyStats() - } - - return &itr.point, nil - } -} - -// copyStats copies from the itr stats buffer to the stats under lock. -func (itr *unsignedIterator) copyStats() { - itr.statsLock.Lock() - itr.stats = itr.statsBuf - itr.statsLock.Unlock() -} - -// Stats returns stats on the points processed. -func (itr *unsignedIterator) Stats() query.IteratorStats { - itr.statsLock.Lock() - stats := itr.stats - itr.statsLock.Unlock() - return stats -} - -// Close closes the iterator. -func (itr *unsignedIterator) Close() error { - cursorsAt(itr.aux).close() - itr.aux = nil - cursorsAt(itr.conds.curs).close() - itr.conds.curs = nil - if itr.cur != nil { - err := itr.cur.close() - itr.cur = nil - return err - } - return nil -} - -// unsignedLimitIterator -type unsignedLimitIterator struct { - input query.UnsignedIterator - opt query.IteratorOptions - n int -} - -func newUnsignedLimitIterator(input query.UnsignedIterator, opt query.IteratorOptions) *unsignedLimitIterator { - return &unsignedLimitIterator{ - input: input, - opt: opt, - } -} - -func (itr *unsignedLimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } -func (itr *unsignedLimitIterator) Close() error { return itr.input.Close() } - -func (itr *unsignedLimitIterator) Next() (*query.UnsignedPoint, error) { - // Check if we are beyond the limit. - if (itr.n - itr.opt.Offset) > itr.opt.Limit { - return nil, nil - } - - // Read the next point. - p, err := itr.input.Next() - if p == nil || err != nil { - return nil, err - } - - // Increment counter. - itr.n++ - - // Offsets are handled by a higher level iterator so return all points. - return p, nil -} - -// unsignedCursor represents an object for iterating over a single unsigned field. -type unsignedCursor interface { - cursor - nextUnsigned() (t int64, v uint64) -} - -func newUnsignedCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) unsignedCursor { - if ascending { - return newUnsignedAscendingCursor(seek, cacheValues, tsmKeyCursor) - } - return newUnsignedDescendingCursor(seek, cacheValues, tsmKeyCursor) -} - -type unsignedAscendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []UnsignedValue - pos int - keyCursor *KeyCursor - } -} - -func newUnsignedAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *unsignedAscendingCursor { - c := &unsignedAscendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *unsignedAscendingCursor) peekCache() (t int64, v uint64) { - if c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, 0 - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(UnsignedValue).value -} - -// peekTSM returns the current time/value from tsm. -func (c *unsignedAscendingCursor) peekTSM() (t int64, v uint64) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, 0 - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *unsignedAscendingCursor) close() error { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *unsignedAscendingCursor) next() (int64, interface{}) { return c.nextUnsigned() } - -// nextUnsigned returns the next key/value for the cursor. -func (c *unsignedAscendingCursor) nextUnsigned() (int64, uint64) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, 0 - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *unsignedAscendingCursor) nextCache() { - if c.cache.pos >= len(c.cache.values) { - return - } - c.cache.pos++ -} - -// nextTSM returns the next value from the TSM files. -func (c *unsignedAscendingCursor) nextTSM() { - c.tsm.pos++ - if c.tsm.pos >= len(c.tsm.values) { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = 0 - } -} - -type unsignedDescendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []UnsignedValue - pos int - keyCursor *KeyCursor - } -} - -func newUnsignedDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *unsignedDescendingCursor { - c := &unsignedDescendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - if t, _ := c.peekCache(); t != seek { - c.cache.pos-- - } - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - if t, _ := c.peekTSM(); t != seek { - c.tsm.pos-- - } - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *unsignedDescendingCursor) peekCache() (t int64, v uint64) { - if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, 0 - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(UnsignedValue).value -} - -// peekTSM returns the current time/value from tsm. -func (c *unsignedDescendingCursor) peekTSM() (t int64, v uint64) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, 0 - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *unsignedDescendingCursor) close() error { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *unsignedDescendingCursor) next() (int64, interface{}) { return c.nextUnsigned() } - -// nextUnsigned returns the next key/value for the cursor. -func (c *unsignedDescendingCursor) nextUnsigned() (int64, uint64) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, 0 - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *unsignedDescendingCursor) nextCache() { - if c.cache.pos < 0 { - return - } - c.cache.pos-- -} - -// nextTSM returns the next value from the TSM files. -func (c *unsignedDescendingCursor) nextTSM() { - c.tsm.pos-- - if c.tsm.pos < 0 { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = len(c.tsm.values) - 1 - } -} - -type stringFinalizerIterator struct { - query.StringIterator - logger *zap.Logger -} - -func newStringFinalizerIterator(inner query.StringIterator, logger *zap.Logger) *stringFinalizerIterator { - itr := &stringFinalizerIterator{StringIterator: inner, logger: logger} - runtime.SetFinalizer(itr, (*stringFinalizerIterator).closeGC) - return itr -} - -func (itr *stringFinalizerIterator) closeGC() { - go func() { - itr.logger.Error("StringIterator finalized by GC") - itr.Close() - }() -} - -func (itr *stringFinalizerIterator) Close() error { - runtime.SetFinalizer(itr, nil) - return itr.StringIterator.Close() -} - -type stringIterator struct { - cur stringCursor - aux []cursorAt - conds struct { - names []string - curs []cursorAt - } - opt query.IteratorOptions - - m map[string]interface{} // map used for condition evaluation - point query.StringPoint // reusable buffer - - statsLock sync.Mutex - stats query.IteratorStats - statsBuf query.IteratorStats -} - -func newStringIterator(name string, tags query.Tags, opt query.IteratorOptions, cur stringCursor, aux []cursorAt, conds []cursorAt, condNames []string) *stringIterator { - itr := &stringIterator{ - cur: cur, - aux: aux, - opt: opt, - point: query.StringPoint{ - Name: name, - Tags: tags, - }, - statsBuf: query.IteratorStats{ - SeriesN: 1, - }, - } - itr.stats = itr.statsBuf - - if len(aux) > 0 { - itr.point.Aux = make([]interface{}, len(aux)) - } - - if opt.Condition != nil { - itr.m = make(map[string]interface{}, len(aux)+len(conds)) - } - itr.conds.names = condNames - itr.conds.curs = conds - - return itr -} - -// Next returns the next point from the iterator. -func (itr *stringIterator) Next() (*query.StringPoint, error) { - for { - seek := tsdb.EOF - - if itr.cur != nil { - // Read from the main cursor if we have one. - itr.point.Time, itr.point.Value = itr.cur.nextString() - seek = itr.point.Time - } else { - // Otherwise find lowest aux timestamp. - for i := range itr.aux { - if k, _ := itr.aux[i].peek(); k != tsdb.EOF { - if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { - seek = k - } - } - } - itr.point.Time = seek - } - - // Exit if we have no more points or we are outside our time range. - if itr.point.Time == tsdb.EOF { - itr.copyStats() - return nil, nil - } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { - itr.copyStats() - return nil, nil - } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { - itr.copyStats() - return nil, nil - } - - // Read from each auxiliary cursor. - for i := range itr.opt.Aux { - itr.point.Aux[i] = itr.aux[i].nextAt(seek) - } - - // Read from condition field cursors. - for i := range itr.conds.curs { - itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) - } - - // Evaluate condition, if one exists. Retry if it fails. - valuer := influxql.ValuerEval{ - Valuer: influxql.MultiValuer( - query.MathValuer{}, - influxql.MapValuer(itr.m), - ), - } - if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { - continue - } - - // Track points returned. - itr.statsBuf.PointN++ - - // Copy buffer to stats periodically. - if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { - itr.copyStats() - } - - return &itr.point, nil - } -} - -// copyStats copies from the itr stats buffer to the stats under lock. -func (itr *stringIterator) copyStats() { - itr.statsLock.Lock() - itr.stats = itr.statsBuf - itr.statsLock.Unlock() -} - -// Stats returns stats on the points processed. -func (itr *stringIterator) Stats() query.IteratorStats { - itr.statsLock.Lock() - stats := itr.stats - itr.statsLock.Unlock() - return stats -} - -// Close closes the iterator. -func (itr *stringIterator) Close() error { - cursorsAt(itr.aux).close() - itr.aux = nil - cursorsAt(itr.conds.curs).close() - itr.conds.curs = nil - if itr.cur != nil { - err := itr.cur.close() - itr.cur = nil - return err - } - return nil -} - -// stringLimitIterator -type stringLimitIterator struct { - input query.StringIterator - opt query.IteratorOptions - n int -} - -func newStringLimitIterator(input query.StringIterator, opt query.IteratorOptions) *stringLimitIterator { - return &stringLimitIterator{ - input: input, - opt: opt, - } -} - -func (itr *stringLimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } -func (itr *stringLimitIterator) Close() error { return itr.input.Close() } - -func (itr *stringLimitIterator) Next() (*query.StringPoint, error) { - // Check if we are beyond the limit. - if (itr.n - itr.opt.Offset) > itr.opt.Limit { - return nil, nil - } - - // Read the next point. - p, err := itr.input.Next() - if p == nil || err != nil { - return nil, err - } - - // Increment counter. - itr.n++ - - // Offsets are handled by a higher level iterator so return all points. - return p, nil -} - -// stringCursor represents an object for iterating over a single string field. -type stringCursor interface { - cursor - nextString() (t int64, v string) -} - -func newStringCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) stringCursor { - if ascending { - return newStringAscendingCursor(seek, cacheValues, tsmKeyCursor) - } - return newStringDescendingCursor(seek, cacheValues, tsmKeyCursor) -} - -type stringAscendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []StringValue - pos int - keyCursor *KeyCursor - } -} - -func newStringAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *stringAscendingCursor { - c := &stringAscendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *stringAscendingCursor) peekCache() (t int64, v string) { - if c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, "" - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(StringValue).value -} - -// peekTSM returns the current time/value from tsm. -func (c *stringAscendingCursor) peekTSM() (t int64, v string) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, "" - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *stringAscendingCursor) close() error { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *stringAscendingCursor) next() (int64, interface{}) { return c.nextString() } - -// nextString returns the next key/value for the cursor. -func (c *stringAscendingCursor) nextString() (int64, string) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, "" - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *stringAscendingCursor) nextCache() { - if c.cache.pos >= len(c.cache.values) { - return - } - c.cache.pos++ -} - -// nextTSM returns the next value from the TSM files. -func (c *stringAscendingCursor) nextTSM() { - c.tsm.pos++ - if c.tsm.pos >= len(c.tsm.values) { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = 0 - } -} - -type stringDescendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []StringValue - pos int - keyCursor *KeyCursor - } -} - -func newStringDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *stringDescendingCursor { - c := &stringDescendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - if t, _ := c.peekCache(); t != seek { - c.cache.pos-- - } - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - if t, _ := c.peekTSM(); t != seek { - c.tsm.pos-- - } - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *stringDescendingCursor) peekCache() (t int64, v string) { - if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, "" - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(StringValue).value -} - -// peekTSM returns the current time/value from tsm. -func (c *stringDescendingCursor) peekTSM() (t int64, v string) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, "" - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *stringDescendingCursor) close() error { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *stringDescendingCursor) next() (int64, interface{}) { return c.nextString() } - -// nextString returns the next key/value for the cursor. -func (c *stringDescendingCursor) nextString() (int64, string) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, "" - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *stringDescendingCursor) nextCache() { - if c.cache.pos < 0 { - return - } - c.cache.pos-- -} - -// nextTSM returns the next value from the TSM files. -func (c *stringDescendingCursor) nextTSM() { - c.tsm.pos-- - if c.tsm.pos < 0 { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = len(c.tsm.values) - 1 - } -} - -type booleanFinalizerIterator struct { - query.BooleanIterator - logger *zap.Logger -} - -func newBooleanFinalizerIterator(inner query.BooleanIterator, logger *zap.Logger) *booleanFinalizerIterator { - itr := &booleanFinalizerIterator{BooleanIterator: inner, logger: logger} - runtime.SetFinalizer(itr, (*booleanFinalizerIterator).closeGC) - return itr -} - -func (itr *booleanFinalizerIterator) closeGC() { - go func() { - itr.logger.Error("BooleanIterator finalized by GC") - itr.Close() - }() -} - -func (itr *booleanFinalizerIterator) Close() error { - runtime.SetFinalizer(itr, nil) - return itr.BooleanIterator.Close() -} - -type booleanIterator struct { - cur booleanCursor - aux []cursorAt - conds struct { - names []string - curs []cursorAt - } - opt query.IteratorOptions - - m map[string]interface{} // map used for condition evaluation - point query.BooleanPoint // reusable buffer - - statsLock sync.Mutex - stats query.IteratorStats - statsBuf query.IteratorStats -} - -func newBooleanIterator(name string, tags query.Tags, opt query.IteratorOptions, cur booleanCursor, aux []cursorAt, conds []cursorAt, condNames []string) *booleanIterator { - itr := &booleanIterator{ - cur: cur, - aux: aux, - opt: opt, - point: query.BooleanPoint{ - Name: name, - Tags: tags, - }, - statsBuf: query.IteratorStats{ - SeriesN: 1, - }, - } - itr.stats = itr.statsBuf - - if len(aux) > 0 { - itr.point.Aux = make([]interface{}, len(aux)) - } - - if opt.Condition != nil { - itr.m = make(map[string]interface{}, len(aux)+len(conds)) - } - itr.conds.names = condNames - itr.conds.curs = conds - - return itr -} - -// Next returns the next point from the iterator. -func (itr *booleanIterator) Next() (*query.BooleanPoint, error) { - for { - seek := tsdb.EOF - - if itr.cur != nil { - // Read from the main cursor if we have one. - itr.point.Time, itr.point.Value = itr.cur.nextBoolean() - seek = itr.point.Time - } else { - // Otherwise find lowest aux timestamp. - for i := range itr.aux { - if k, _ := itr.aux[i].peek(); k != tsdb.EOF { - if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { - seek = k - } - } - } - itr.point.Time = seek - } - - // Exit if we have no more points or we are outside our time range. - if itr.point.Time == tsdb.EOF { - itr.copyStats() - return nil, nil - } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { - itr.copyStats() - return nil, nil - } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { - itr.copyStats() - return nil, nil - } - - // Read from each auxiliary cursor. - for i := range itr.opt.Aux { - itr.point.Aux[i] = itr.aux[i].nextAt(seek) - } - - // Read from condition field cursors. - for i := range itr.conds.curs { - itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) - } - - // Evaluate condition, if one exists. Retry if it fails. - valuer := influxql.ValuerEval{ - Valuer: influxql.MultiValuer( - query.MathValuer{}, - influxql.MapValuer(itr.m), - ), - } - if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { - continue - } - - // Track points returned. - itr.statsBuf.PointN++ - - // Copy buffer to stats periodically. - if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 { - itr.copyStats() - } - - return &itr.point, nil - } -} - -// copyStats copies from the itr stats buffer to the stats under lock. -func (itr *booleanIterator) copyStats() { - itr.statsLock.Lock() - itr.stats = itr.statsBuf - itr.statsLock.Unlock() -} - -// Stats returns stats on the points processed. -func (itr *booleanIterator) Stats() query.IteratorStats { - itr.statsLock.Lock() - stats := itr.stats - itr.statsLock.Unlock() - return stats -} - -// Close closes the iterator. -func (itr *booleanIterator) Close() error { - cursorsAt(itr.aux).close() - itr.aux = nil - cursorsAt(itr.conds.curs).close() - itr.conds.curs = nil - if itr.cur != nil { - err := itr.cur.close() - itr.cur = nil - return err - } - return nil -} - -// booleanLimitIterator -type booleanLimitIterator struct { - input query.BooleanIterator - opt query.IteratorOptions - n int -} - -func newBooleanLimitIterator(input query.BooleanIterator, opt query.IteratorOptions) *booleanLimitIterator { - return &booleanLimitIterator{ - input: input, - opt: opt, - } -} - -func (itr *booleanLimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } -func (itr *booleanLimitIterator) Close() error { return itr.input.Close() } - -func (itr *booleanLimitIterator) Next() (*query.BooleanPoint, error) { - // Check if we are beyond the limit. - if (itr.n - itr.opt.Offset) > itr.opt.Limit { - return nil, nil - } - - // Read the next point. - p, err := itr.input.Next() - if p == nil || err != nil { - return nil, err - } - - // Increment counter. - itr.n++ - - // Offsets are handled by a higher level iterator so return all points. - return p, nil -} - -// booleanCursor represents an object for iterating over a single boolean field. -type booleanCursor interface { - cursor - nextBoolean() (t int64, v bool) -} - -func newBooleanCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) booleanCursor { - if ascending { - return newBooleanAscendingCursor(seek, cacheValues, tsmKeyCursor) - } - return newBooleanDescendingCursor(seek, cacheValues, tsmKeyCursor) -} - -type booleanAscendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []BooleanValue - pos int - keyCursor *KeyCursor - } -} - -func newBooleanAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *booleanAscendingCursor { - c := &booleanAscendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *booleanAscendingCursor) peekCache() (t int64, v bool) { - if c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, false - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(BooleanValue).value -} - -// peekTSM returns the current time/value from tsm. -func (c *booleanAscendingCursor) peekTSM() (t int64, v bool) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, false - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *booleanAscendingCursor) close() error { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *booleanAscendingCursor) next() (int64, interface{}) { return c.nextBoolean() } - -// nextBoolean returns the next key/value for the cursor. -func (c *booleanAscendingCursor) nextBoolean() (int64, bool) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, false - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *booleanAscendingCursor) nextCache() { - if c.cache.pos >= len(c.cache.values) { - return - } - c.cache.pos++ -} - -// nextTSM returns the next value from the TSM files. -func (c *booleanAscendingCursor) nextTSM() { - c.tsm.pos++ - if c.tsm.pos >= len(c.tsm.values) { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = 0 - } -} - -type booleanDescendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []BooleanValue - pos int - keyCursor *KeyCursor - } -} - -func newBooleanDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *booleanDescendingCursor { - c := &booleanDescendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - if t, _ := c.peekCache(); t != seek { - c.cache.pos-- - } - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - if t, _ := c.peekTSM(); t != seek { - c.tsm.pos-- - } - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *booleanDescendingCursor) peekCache() (t int64, v bool) { - if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, false - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(BooleanValue).value -} - -// peekTSM returns the current time/value from tsm. -func (c *booleanDescendingCursor) peekTSM() (t int64, v bool) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, false - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *booleanDescendingCursor) close() error { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *booleanDescendingCursor) next() (int64, interface{}) { return c.nextBoolean() } - -// nextBoolean returns the next key/value for the cursor. -func (c *booleanDescendingCursor) nextBoolean() (int64, bool) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, false - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *booleanDescendingCursor) nextCache() { - if c.cache.pos < 0 { - return - } - c.cache.pos-- -} - -// nextTSM returns the next value from the TSM files. -func (c *booleanDescendingCursor) nextTSM() { - c.tsm.pos-- - if c.tsm.pos < 0 { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = len(c.tsm.values) - 1 - } -} - -var _ = fmt.Print diff --git a/tsdb/tsm1/iterator.gen.go.tmpl b/tsdb/tsm1/iterator.gen.go.tmpl deleted file mode 100644 index fbc73a2c75..0000000000 --- a/tsdb/tsm1/iterator.gen.go.tmpl +++ /dev/null @@ -1,574 +0,0 @@ -package tsm1 - -import ( - "sort" - "fmt" - "runtime" - "sync" - - "github.com/influxdata/platform/query" - "github.com/influxdata/platform/tsdb" - "github.com/influxdata/influxql" - "go.uber.org/zap" -) - -type cursor interface { - close() error - next() (t int64, v interface{}) -} - -// cursorAt provides a bufferred cursor interface. -// This required for literal value cursors which don't have a time value. -type cursorAt interface { - close() error - peek() (k int64, v interface{}) - nextAt(seek int64) interface{} -} - -type nilCursor struct {} -func (nilCursor) next() (int64, interface{}) { return tsdb.EOF, nil } -func (nilCursor) close() error { return nil } - -// bufCursor implements a bufferred cursor. -type bufCursor struct { - cur cursor - buf struct { - key int64 - value interface{} - filled bool - } - ascending bool -} - -// newBufCursor returns a bufferred wrapper for cur. -func newBufCursor(cur cursor, ascending bool) *bufCursor { - return &bufCursor{cur: cur, ascending: ascending} -} - -func (c *bufCursor) close() error { - if c.cur == nil { - return nil - } - - err := c.cur.close() - c.cur = nil - return err -} - -// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor. -func (c *bufCursor) next() (int64, interface{}) { - if c.buf.filled { - k, v := c.buf.key, c.buf.value - c.buf.filled = false - return k, v - } - return c.cur.next() -} - -// unread pushes k and v onto the buffer. -func (c *bufCursor) unread(k int64, v interface{}) { - c.buf.key, c.buf.value = k, v - c.buf.filled = true -} - -// peek reads next next key/value without removing them from the cursor. -func (c *bufCursor) peek() (k int64, v interface{}) { - k, v = c.next() - c.unread(k, v) - return -} - -// nextAt returns the next value where key is equal to seek. -// Skips over any keys that are less than seek. -// If the key doesn't exist then a nil value is returned instead. -func (c *bufCursor) nextAt(seek int64) interface{} { - for { - k, v := c.next() - if k != tsdb.EOF { - if k == seek { - return v - } else if c.ascending && k < seek { - continue - } else if !c.ascending && k > seek { - continue - } - c.unread(k, v) - } - - // Return "nil" value for type. - switch c.cur.(type) { - case floatCursor: - return (*float64)(nil) - case integerCursor: - return (*int64)(nil) - case unsignedCursor: - return (*uint64)(nil) - case stringCursor: - return (*string)(nil) - case booleanCursor: - return (*bool)(nil) - default: - panic("unreachable") - } - } -} - - -// statsBufferCopyIntervalN is the number of points that are read before -// copying the stats buffer to the iterator's stats field. This is used to -// amortize the cost of using a mutex when updating stats. -const statsBufferCopyIntervalN = 100 - -{{range .}} - -type {{.name}}FinalizerIterator struct { - query.{{.Name}}Iterator - logger *zap.Logger -} - -func new{{.Name}}FinalizerIterator(inner query.{{.Name}}Iterator, logger *zap.Logger) *{{.name}}FinalizerIterator { - itr := &{{.name}}FinalizerIterator{ {{.Name}}Iterator: inner, logger: logger} - runtime.SetFinalizer(itr, (*{{.name}}FinalizerIterator).closeGC) - return itr -} - -func (itr *{{.name}}FinalizerIterator) closeGC() { - go func() { - itr.logger.Error("{{.Name}}Iterator finalized by GC") - itr.Close() - }() -} - -func (itr *{{.name}}FinalizerIterator) Close() error { - runtime.SetFinalizer(itr, nil) - return itr.{{.Name}}Iterator.Close() -} - -type {{.name}}Iterator struct { - cur {{.name}}Cursor - aux []cursorAt - conds struct { - names []string - curs []cursorAt - } - opt query.IteratorOptions - - m map[string]interface{} // map used for condition evaluation - point query.{{.Name}}Point // reusable buffer - - statsLock sync.Mutex - stats query.IteratorStats - statsBuf query.IteratorStats -} - -func new{{.Name}}Iterator(name string, tags query.Tags, opt query.IteratorOptions, cur {{.name}}Cursor, aux []cursorAt, conds []cursorAt, condNames []string) *{{.name}}Iterator { - itr := &{{.name}}Iterator{ - cur: cur, - aux: aux, - opt: opt, - point: query.{{.Name}}Point{ - Name: name, - Tags: tags, - }, - statsBuf: query.IteratorStats{ - SeriesN: 1, - }, - } - itr.stats = itr.statsBuf - - if len(aux) > 0 { - itr.point.Aux = make([]interface{}, len(aux)) - } - - if opt.Condition != nil { - itr.m = make(map[string]interface{}, len(aux)+len(conds)) - } - itr.conds.names = condNames - itr.conds.curs = conds - - return itr -} - -// Next returns the next point from the iterator. -func (itr *{{.name}}Iterator) Next() (*query.{{.Name}}Point, error) { - for { - seek := tsdb.EOF - - if itr.cur != nil { - // Read from the main cursor if we have one. - itr.point.Time, itr.point.Value = itr.cur.next{{.Name}}() - seek = itr.point.Time - } else { - // Otherwise find lowest aux timestamp. - for i := range itr.aux { - if k, _ := itr.aux[i].peek(); k != tsdb.EOF { - if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) { - seek = k - } - } - } - itr.point.Time = seek - } - - // Exit if we have no more points or we are outside our time range. - if itr.point.Time == tsdb.EOF { - itr.copyStats() - return nil, nil - } else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime { - itr.copyStats() - return nil, nil - } else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime { - itr.copyStats() - return nil, nil - } - - // Read from each auxiliary cursor. - for i := range itr.opt.Aux { - itr.point.Aux[i] = itr.aux[i].nextAt(seek) - } - - // Read from condition field cursors. - for i := range itr.conds.curs { - itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek) - } - - // Evaluate condition, if one exists. Retry if it fails. - valuer := influxql.ValuerEval{ - Valuer: influxql.MultiValuer( - query.MathValuer{}, - influxql.MapValuer(itr.m), - ), - } - if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) { - continue - } - - // Track points returned. - itr.statsBuf.PointN++ - - // Copy buffer to stats periodically. - if itr.statsBuf.PointN % statsBufferCopyIntervalN == 0 { - itr.copyStats() - } - - return &itr.point, nil - } -} - -// copyStats copies from the itr stats buffer to the stats under lock. -func (itr *{{.name}}Iterator) copyStats() { - itr.statsLock.Lock() - itr.stats = itr.statsBuf - itr.statsLock.Unlock() -} - -// Stats returns stats on the points processed. -func (itr *{{.name}}Iterator) Stats() query.IteratorStats { - itr.statsLock.Lock() - stats := itr.stats - itr.statsLock.Unlock() - return stats -} - -// Close closes the iterator. -func (itr *{{.name}}Iterator) Close() error { - cursorsAt(itr.aux).close() - itr.aux = nil - cursorsAt(itr.conds.curs).close() - itr.conds.curs = nil - if itr.cur != nil { - err := itr.cur.close() - itr.cur = nil - return err - } - return nil -} - -// {{.name}}LimitIterator -type {{.name}}LimitIterator struct { - input query.{{.Name}}Iterator - opt query.IteratorOptions - n int -} - -func new{{.Name}}LimitIterator(input query.{{.Name}}Iterator, opt query.IteratorOptions) *{{.name}}LimitIterator { - return &{{.name}}LimitIterator{ - input: input, - opt: opt, - } -} - -func (itr *{{.name}}LimitIterator) Stats() query.IteratorStats { return itr.input.Stats() } -func (itr *{{.name}}LimitIterator) Close() error { return itr.input.Close() } - -func (itr *{{.name}}LimitIterator) Next() (*query.{{.Name}}Point, error) { - // Check if we are beyond the limit. - if (itr.n-itr.opt.Offset) > itr.opt.Limit { - return nil, nil - } - - // Read the next point. - p, err := itr.input.Next() - if p == nil || err != nil { - return nil, err - } - - // Increment counter. - itr.n++ - - // Offsets are handled by a higher level iterator so return all points. - return p, nil -} - -// {{.name}}Cursor represents an object for iterating over a single {{.name}} field. -type {{.name}}Cursor interface { - cursor - next{{.Name}}() (t int64, v {{.Type}}) -} - -func new{{.Name}}Cursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) {{.name}}Cursor { - if ascending { - return new{{.Name}}AscendingCursor(seek, cacheValues, tsmKeyCursor) - } - return new{{.Name}}DescendingCursor(seek, cacheValues, tsmKeyCursor) -} - -type {{.name}}AscendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []{{.Name}}Value - pos int - keyCursor *KeyCursor - } -} - -func new{{.Name}}AscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *{{.name}}AscendingCursor { - c := &{{.name}}AscendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *{{.name}}AscendingCursor) peekCache() (t int64, v {{.Type}}) { - if c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, {{.Nil}} - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.({{.ValueType}}).value -} - -// peekTSM returns the current time/value from tsm. -func (c *{{.name}}AscendingCursor) peekTSM() (t int64, v {{.Type}}) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, {{.Nil}} - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *{{.name}}AscendingCursor) close() (error) { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *{{.name}}AscendingCursor) next() (int64, interface{}) { return c.next{{.Name}}() } - -// next{{.Name}} returns the next key/value for the cursor. -func (c *{{.name}}AscendingCursor) next{{.Name}}() (int64, {{.Type}}) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, {{.Nil}} - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *{{.name}}AscendingCursor) nextCache() { - if c.cache.pos >= len(c.cache.values) { - return - } - c.cache.pos++ -} - -// nextTSM returns the next value from the TSM files. -func (c *{{.name}}AscendingCursor) nextTSM() { - c.tsm.pos++ - if c.tsm.pos >= len(c.tsm.values) { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = 0 - } -} - -type {{.name}}DescendingCursor struct { - cache struct { - values Values - pos int - } - - tsm struct { - values []{{.Name}}Value - pos int - keyCursor *KeyCursor - } -} - -func new{{.Name}}DescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *{{.name}}DescendingCursor { - c := &{{.name}}DescendingCursor{} - - c.cache.values = cacheValues - c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { - return c.cache.values[i].UnixNano() >= seek - }) - if t, _ := c.peekCache(); t != seek { - c.cache.pos-- - } - - c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.values) - c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool { - return c.tsm.values[i].UnixNano() >= seek - }) - if t, _ := c.peekTSM(); t != seek { - c.tsm.pos-- - } - - return c -} - -// peekCache returns the current time/value from the cache. -func (c *{{.name}}DescendingCursor) peekCache() (t int64, v {{.Type}}) { - if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) { - return tsdb.EOF, {{.Nil}} - } - - item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.({{.ValueType}}).value -} - -// peekTSM returns the current time/value from tsm. -func (c *{{.name}}DescendingCursor) peekTSM() (t int64, v {{.Type}}) { - if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) { - return tsdb.EOF, {{.Nil}} - } - - item := c.tsm.values[c.tsm.pos] - return item.UnixNano(), item.value -} - -// close closes the cursor and any dependent cursors. -func (c *{{.name}}DescendingCursor) close() (error) { - if c.tsm.keyCursor == nil { - return nil - } - - c.tsm.keyCursor.Close() - c.tsm.keyCursor = nil - c.cache.values = nil - c.tsm.values = nil - return nil -} - -// next returns the next key/value for the cursor. -func (c *{{.name}}DescendingCursor) next() (int64, interface{}) { return c.next{{.Name}}() } - -// next{{.Name}} returns the next key/value for the cursor. -func (c *{{.name}}DescendingCursor) next{{.Name}}() (int64, {{.Type}}) { - ckey, cvalue := c.peekCache() - tkey, tvalue := c.peekTSM() - - // No more data in cache or in TSM files. - if ckey == tsdb.EOF && tkey == tsdb.EOF { - return tsdb.EOF, {{.Nil}} - } - - // Both cache and tsm files have the same key, cache takes precedence. - if ckey == tkey { - c.nextCache() - c.nextTSM() - return ckey, cvalue - } - - // Buffered cache key precedes that in TSM file. - if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) { - c.nextCache() - return ckey, cvalue - } - - // Buffered TSM key precedes that in cache. - c.nextTSM() - return tkey, tvalue -} - -// nextCache returns the next value from the cache. -func (c *{{.name}}DescendingCursor) nextCache() { - if c.cache.pos < 0 { - return - } - c.cache.pos-- -} - -// nextTSM returns the next value from the TSM files. -func (c *{{.name}}DescendingCursor) nextTSM() { - c.tsm.pos-- - if c.tsm.pos < 0 { - c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.values) - if len(c.tsm.values) == 0 { - return - } - c.tsm.pos = len(c.tsm.values) - 1 - } -} - -{{end}} - -var _ = fmt.Print diff --git a/tsdb/tsm1/iterator.gen.go.tmpldata b/tsdb/tsm1/iterator.gen.go.tmpldata deleted file mode 100644 index 648898fbdb..0000000000 --- a/tsdb/tsm1/iterator.gen.go.tmpldata +++ /dev/null @@ -1,42 +0,0 @@ -[ - { - "Name":"Float", - "name":"float", - "Type":"float64", - "ValueType":"FloatValue", - "Nil":"0", - "Size":"8" - }, - { - "Name":"Integer", - "name":"integer", - "Type":"int64", - "ValueType":"IntegerValue", - "Nil":"0", - "Size":"8" - }, - { - "Name":"Unsigned", - "name":"unsigned", - "Type":"uint64", - "ValueType":"UnsignedValue", - "Nil":"0", - "Size":"8" - }, - { - "Name":"String", - "name":"string", - "Type":"string", - "ValueType":"StringValue", - "Nil":"\"\"", - "Size":"0" - }, - { - "Name":"Boolean", - "name":"boolean", - "Type":"bool", - "ValueType":"BooleanValue", - "Nil":"false", - "Size":"1" - } -] diff --git a/tsdb/tsm1/iterator.go b/tsdb/tsm1/iterator.go deleted file mode 100644 index 8f4e04d298..0000000000 --- a/tsdb/tsm1/iterator.go +++ /dev/null @@ -1,53 +0,0 @@ -package tsm1 - -import ( - "fmt" - - "github.com/influxdata/platform/query" - "github.com/influxdata/platform/tsdb" - "go.uber.org/zap" -) - -// literalValueCursor represents a cursor that always returns a single value. -// It doesn't not have a time value so it can only be used with nextAt(). -type literalValueCursor struct { - value interface{} -} - -func (c *literalValueCursor) close() error { return nil } -func (c *literalValueCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value } -func (c *literalValueCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value } -func (c *literalValueCursor) nextAt(seek int64) interface{} { return c.value } - -type cursorsAt []cursorAt - -func (c cursorsAt) close() { - for _, cur := range c { - cur.close() - } -} - -// newFinalizerIterator creates a new iterator that installs a runtime finalizer -// to ensure close is eventually called if the iterator is garbage collected. -// This additional guard attempts to protect against clients of CreateIterator not -// correctly closing them and leaking cursors. -func newFinalizerIterator(itr query.Iterator, log *zap.Logger) query.Iterator { - if itr == nil { - return nil - } - - switch inner := itr.(type) { - case query.FloatIterator: - return newFloatFinalizerIterator(inner, log) - case query.IntegerIterator: - return newIntegerFinalizerIterator(inner, log) - case query.UnsignedIterator: - return newUnsignedFinalizerIterator(inner, log) - case query.StringIterator: - return newStringFinalizerIterator(inner, log) - case query.BooleanIterator: - return newBooleanFinalizerIterator(inner, log) - default: - panic(fmt.Sprintf("unsupported finalizer iterator type: %T", itr)) - } -} diff --git a/tsdb/tsm1/iterator_test.go b/tsdb/tsm1/iterator_test.go deleted file mode 100644 index b4b0d256b2..0000000000 --- a/tsdb/tsm1/iterator_test.go +++ /dev/null @@ -1,161 +0,0 @@ -package tsm1 - -import ( - "os" - "runtime" - "testing" - "time" - - "github.com/influxdata/influxql" - "github.com/influxdata/platform/logger" - "github.com/influxdata/platform/query" -) - -func BenchmarkIntegerIterator_Next(b *testing.B) { - opt := query.IteratorOptions{ - Aux: []influxql.VarRef{{Val: "f1"}, {Val: "f1"}, {Val: "f1"}, {Val: "f1"}}, - } - aux := []cursorAt{ - &literalValueCursor{value: "foo bar"}, - &literalValueCursor{value: int64(1e3)}, - &literalValueCursor{value: float64(1e3)}, - &literalValueCursor{value: true}, - } - - cur := newIntegerIterator("m0", query.Tags{}, opt, &infiniteIntegerCursor{}, aux, nil, nil) - - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - cur.Next() - } -} - -type infiniteIntegerCursor struct{} - -func (*infiniteIntegerCursor) close() error { - return nil -} - -func (*infiniteIntegerCursor) next() (t int64, v interface{}) { - return 0, 0 -} - -func (*infiniteIntegerCursor) nextInteger() (t int64, v int64) { - return 0, 0 -} - -type testFinalizerIterator struct { - OnClose func() -} - -func (itr *testFinalizerIterator) Next() (*query.FloatPoint, error) { - return nil, nil -} - -func (itr *testFinalizerIterator) Close() error { - // Act as if this is a slow finalizer and ensure that it doesn't block - // the finalizer background thread. - itr.OnClose() - return nil -} - -func (itr *testFinalizerIterator) Stats() query.IteratorStats { - return query.IteratorStats{} -} - -func TestFinalizerIterator(t *testing.T) { - var ( - step1 = make(chan struct{}) - step2 = make(chan struct{}) - step3 = make(chan struct{}) - ) - - l := logger.New(os.Stderr) - done := make(chan struct{}) - func() { - itr := &testFinalizerIterator{ - OnClose: func() { - // Simulate a slow closing iterator by waiting for the done channel - // to be closed. The done channel is closed by a later finalizer. - close(step1) - <-done - close(step3) - }, - } - newFinalizerIterator(itr, l) - }() - - for i := 0; i < 100; i++ { - runtime.GC() - } - - timer := time.NewTimer(100 * time.Millisecond) - select { - case <-timer.C: - t.Fatal("The finalizer for the iterator did not run") - close(done) - case <-step1: - // The finalizer has successfully run, but should not have completed yet. - timer.Stop() - } - - select { - case <-step3: - t.Fatal("The finalizer should not have finished yet") - default: - } - - // Use a fake value that will be collected by the garbage collector and have - // the finalizer close the channel. This finalizer should run after the iterator's - // finalizer. - value := func() int { - foo := &struct { - value int - }{value: 1} - runtime.SetFinalizer(foo, func(value interface{}) { - close(done) - close(step2) - }) - return foo.value + 2 - }() - if value < 2 { - t.Log("This should never be output") - } - - for i := 0; i < 100; i++ { - runtime.GC() - } - - timer.Reset(100 * time.Millisecond) - select { - case <-timer.C: - t.Fatal("The second finalizer did not run") - case <-step2: - // The finalizer has successfully run and should have - // closed the done channel. - timer.Stop() - } - - // Wait for step3 to finish where the closed value should be set. - timer.Reset(100 * time.Millisecond) - select { - case <-timer.C: - t.Fatal("The iterator was not finalized") - case <-step3: - timer.Stop() - } -} - -func TestBufCursor_DoubleClose(t *testing.T) { - c := newBufCursor(nilCursor{}, true) - if err := c.close(); err != nil { - t.Fatalf("error closing: %v", err) - } - - // This shouldn't panic - if err := c.close(); err != nil { - t.Fatalf("error closing: %v", err) - } - -}