Fix descending cache cursor
parent
b73b21f062
commit
2d79d7e35f
|
@ -434,26 +434,36 @@ type devCursor struct {
|
|||
// SeekTo positions the cursor at the timestamp specified by seek and returns the
|
||||
// timestamp and value.
|
||||
func (c *devCursor) SeekTo(seek int64) (int64, interface{}) {
|
||||
// Seek to position in cache index.
|
||||
c.cachePos = sort.Search(len(c.cache), func(i int) bool {
|
||||
if c.ascending {
|
||||
// Seek to position in cache.
|
||||
c.cacheKeyBuf, c.cacheValueBuf = func() (int64, interface{}) {
|
||||
// Seek to position in cache index.
|
||||
c.cachePos = sort.Search(len(c.cache), func(i int) bool {
|
||||
return c.cache[i].Time().UnixNano() >= seek
|
||||
})
|
||||
|
||||
if c.cachePos < len(c.cache) {
|
||||
v := c.cache[c.cachePos]
|
||||
if v.UnixNano() == seek || c.ascending {
|
||||
// Exact seek found or, if ascending, next one is good.
|
||||
return v.UnixNano(), v.Value()
|
||||
}
|
||||
// Nothing available if descending.
|
||||
return tsdb.EOF, nil
|
||||
}
|
||||
return c.cache[i].Time().UnixNano() <= seek
|
||||
})
|
||||
|
||||
if len(c.cache) == 0 {
|
||||
c.cacheKeyBuf = tsdb.EOF
|
||||
}
|
||||
// Ascending cursor, no match in the cache.
|
||||
if c.ascending {
|
||||
return tsdb.EOF, nil
|
||||
}
|
||||
|
||||
if c.cachePos < len(c.cache) {
|
||||
c.cacheKeyBuf = c.cache[c.cachePos].Time().UnixNano()
|
||||
c.cacheValueBuf = c.cache[c.cachePos].Value()
|
||||
} else {
|
||||
c.cacheKeyBuf = tsdb.EOF
|
||||
}
|
||||
// Descending cursor, go to previous value in cache, and return if it exists.
|
||||
c.cachePos--
|
||||
if c.cachePos < 0 {
|
||||
return tsdb.EOF, nil
|
||||
}
|
||||
return c.cache[c.cachePos].UnixNano(), c.cache[c.cachePos].Value()
|
||||
}()
|
||||
|
||||
// TODO: Get the first block from tsm files for the given 'seek'
|
||||
// Seek to position to tsm block.
|
||||
if c.ascending {
|
||||
c.tsmValues, _ = c.tsm.Scan(SeriesFieldKey(c.series, c.fields[0]), time.Unix(0, seek-1), c.ascending)
|
||||
|
@ -525,11 +535,19 @@ func (c *devCursor) read() (int64, interface{}) {
|
|||
|
||||
// nextCache returns the next value from the cache.
|
||||
func (c *devCursor) nextCache() (int64, interface{}) {
|
||||
c.cachePos++
|
||||
if c.cachePos >= len(c.cache) {
|
||||
return tsdb.EOF, nil
|
||||
if c.ascending {
|
||||
c.cachePos++
|
||||
if c.cachePos >= len(c.cache) {
|
||||
return tsdb.EOF, nil
|
||||
}
|
||||
return c.cache[c.cachePos].UnixNano(), c.cache[c.cachePos].Value()
|
||||
} else {
|
||||
c.cachePos--
|
||||
if c.cachePos < 0 {
|
||||
return tsdb.EOF, nil
|
||||
}
|
||||
return c.cache[c.cachePos].UnixNano(), c.cache[c.cachePos].Value()
|
||||
}
|
||||
return c.cache[c.cachePos].UnixNano(), c.cache[c.cachePos].Value()
|
||||
}
|
||||
|
||||
// nextTSM returns the next value from the TSM files.
|
||||
|
|
|
@ -131,7 +131,6 @@ func TestDevEngine_QueryTSM_Ascending(t *testing.T) {
|
|||
|
||||
// Ensure an engine containing cached values responds correctly to queries.
|
||||
func TestDevEngine_QueryCache_Descending(t *testing.T) {
|
||||
t.Skip("fixme")
|
||||
// Generate temporary file.
|
||||
f, _ := ioutil.TempFile("", "tsm1dev")
|
||||
f.Close()
|
||||
|
|
Loading…
Reference in New Issue