From 2d79d7e35f69e9d44560d0511a1789b584da1092 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 3 Dec 2015 14:08:22 -0800 Subject: [PATCH] Fix descending cache cursor --- tsdb/engine/tsm1/engine.go | 56 ++++++++++++++++++++++----------- tsdb/engine/tsm1/engine_test.go | 1 - 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 9015ea4d61..4b97f197a7 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -434,26 +434,36 @@ type devCursor struct { // SeekTo positions the cursor at the timestamp specified by seek and returns the // timestamp and value. func (c *devCursor) SeekTo(seek int64) (int64, interface{}) { - // Seek to position in cache index. - c.cachePos = sort.Search(len(c.cache), func(i int) bool { - if c.ascending { + // Seek to position in cache. + c.cacheKeyBuf, c.cacheValueBuf = func() (int64, interface{}) { + // Seek to position in cache index. + c.cachePos = sort.Search(len(c.cache), func(i int) bool { return c.cache[i].Time().UnixNano() >= seek + }) + + if c.cachePos < len(c.cache) { + v := c.cache[c.cachePos] + if v.UnixNano() == seek || c.ascending { + // Exact seek found or, if ascending, next one is good. + return v.UnixNano(), v.Value() + } + // Nothing available if descending. + return tsdb.EOF, nil } - return c.cache[i].Time().UnixNano() <= seek - }) - if len(c.cache) == 0 { - c.cacheKeyBuf = tsdb.EOF - } + // Ascending cursor, no match in the cache. + if c.ascending { + return tsdb.EOF, nil + } - if c.cachePos < len(c.cache) { - c.cacheKeyBuf = c.cache[c.cachePos].Time().UnixNano() - c.cacheValueBuf = c.cache[c.cachePos].Value() - } else { - c.cacheKeyBuf = tsdb.EOF - } + // Descending cursor, go to previous value in cache, and return if it exists. + c.cachePos-- + if c.cachePos < 0 { + return tsdb.EOF, nil + } + return c.cache[c.cachePos].UnixNano(), c.cache[c.cachePos].Value() + }() - // TODO: Get the first block from tsm files for the given 'seek' // Seek to position to tsm block. if c.ascending { c.tsmValues, _ = c.tsm.Scan(SeriesFieldKey(c.series, c.fields[0]), time.Unix(0, seek-1), c.ascending) @@ -525,11 +535,19 @@ func (c *devCursor) read() (int64, interface{}) { // nextCache returns the next value from the cache. func (c *devCursor) nextCache() (int64, interface{}) { - c.cachePos++ - if c.cachePos >= len(c.cache) { - return tsdb.EOF, nil + if c.ascending { + c.cachePos++ + if c.cachePos >= len(c.cache) { + return tsdb.EOF, nil + } + return c.cache[c.cachePos].UnixNano(), c.cache[c.cachePos].Value() + } else { + c.cachePos-- + if c.cachePos < 0 { + return tsdb.EOF, nil + } + return c.cache[c.cachePos].UnixNano(), c.cache[c.cachePos].Value() } - return c.cache[c.cachePos].UnixNano(), c.cache[c.cachePos].Value() } // nextTSM returns the next value from the TSM files. diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 63460fba80..1c14e9f0f7 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -131,7 +131,6 @@ func TestDevEngine_QueryTSM_Ascending(t *testing.T) { // Ensure an engine containing cached values responds correctly to queries. func TestDevEngine_QueryCache_Descending(t *testing.T) { - t.Skip("fixme") // Generate temporary file. f, _ := ioutil.TempFile("", "tsm1dev") f.Close()