diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 9257cfb8ef..c137ab5554 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -5,6 +5,7 @@ import ( "io" "log" "os" + "sort" "sync" "time" @@ -205,3 +206,123 @@ func (e *DevEngine) replaceFiles(tsm, segments []string) { os.RemoveAll(f) } } + +type devTx struct { + engine *DevEngine +} + +// Cursor returns a cursor for all cached and TSM-based data. +func (t *devTx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor { + return &devCursor{ + cache: t.engine.Cache.Values(SeriesFieldKey(series, fields[0])), + ascending: ascending, + } +} + +// devCursor is a cursor that combines both TSM and cached data. +type devCursor struct { + cache Values + position int + cacheKeyBuf int64 + cacheValueBuf interface{} + + tsmKeyBuf int64 + tsmValueBuf interface{} + + ascending bool +} + +// SeekTo positions the cursor at the timestamp specified by seek and returns the +// timestamp and value. +func (c *devCursor) SeekTo(seek int64) (int64, interface{}) { + // Seek to position in cache index. + c.position = sort.Search(len(c.cache), func(i int) bool { + return c.cache[i].Time().UnixNano() >= seek + }) + + if len(c.cache) == 0 { + c.cacheKeyBuf = tsdb.EOF + } + + if c.ascending { + if c.position < len(c.cache) { + c.cacheKeyBuf = c.cache[c.position].Time().UnixNano() + c.cacheValueBuf = c.cache[c.position].Value() + } else { + c.cacheKeyBuf = tsdb.EOF + } + } else { + + } + + // TODO: Get the first block from tsm files for the given 'seek' + // Seek to position to tsm block. + c.tsmKeyBuf = tsdb.EOF + + return c.read() +} + +// Next returns the next value from the cursor. +func (c *devCursor) Next() (int64, interface{}) { + return c.read() +} + +// Ascending returns whether the cursor returns data in time-ascending order. +func (c *devCursor) Ascending() bool { return c.ascending } + +// read returns the next value for the cursor. +func (c *devCursor) read() (int64, interface{}) { + var key int64 + var value interface{} + + // Determine where the next datum should come from -- the cache or the TSM files. + + switch { + // No more data in cache or in TSM files. + case c.cacheKeyBuf == tsdb.EOF && c.tsmKeyBuf == tsdb.EOF: + key = tsdb.EOF + + // Both cache and tsm files have the same key, cache takes precedence. + case c.cacheKeyBuf == c.tsmKeyBuf: + key = c.cacheKeyBuf + value = c.cacheValueBuf + c.cacheKeyBuf, c.cacheValueBuf = c.nextCache() + c.tsmKeyBuf, c.tsmValueBuf = c.nextTSM() + + // Buffered cache key precedes that in TSM file. + case c.ascending && (c.cacheKeyBuf != tsdb.EOF && (c.cacheKeyBuf < c.tsmKeyBuf || c.tsmKeyBuf == tsdb.EOF)), + !c.ascending && (c.cacheKeyBuf != tsdb.EOF && (c.cacheKeyBuf > c.tsmKeyBuf || c.tsmKeyBuf == tsdb.EOF)): + key = c.cacheKeyBuf + value = c.cacheValueBuf + c.cacheKeyBuf, c.cacheValueBuf = c.nextCache() + + // Buffered TSM key precedes that in cache. + default: + key = c.tsmKeyBuf + value = c.tsmValueBuf + c.tsmKeyBuf, c.tsmValueBuf = c.nextTSM() + } + + return key, value +} + +// nextCache returns the next value from the cache. +func (c *devCursor) nextCache() (int64, interface{}) { + if c.ascending { + c.position++ + if c.position >= len(c.cache) { + return tsdb.EOF, nil + } + } else { + c.position-- + if c.position < 0 { + return tsdb.EOF, nil + } + } + return c.cache[c.position].UnixNano(), c.cache[c.position].Value() +} + +// nextTSM returns the next value from the TSM files. +func (c *devCursor) nextTSM() (int64, interface{}) { + return tsdb.EOF, nil +}