Integrate cache query with tsm1dev engine

pull/4932/head
Philip O'Toole 2015-11-30 14:18:19 -08:00
parent 7da3fc1aeb
commit 59674fda21
1 changed files with 121 additions and 0 deletions

View File

@ -5,6 +5,7 @@ import (
"io"
"log"
"os"
"sort"
"sync"
"time"
@ -205,3 +206,123 @@ func (e *DevEngine) replaceFiles(tsm, segments []string) {
os.RemoveAll(f)
}
}
type devTx struct {
engine *DevEngine
}
// Cursor returns a cursor for all cached and TSM-based data.
func (t *devTx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
return &devCursor{
cache: t.engine.Cache.Values(SeriesFieldKey(series, fields[0])),
ascending: ascending,
}
}
// devCursor is a cursor that combines both TSM and cached data.
type devCursor struct {
cache Values
position int
cacheKeyBuf int64
cacheValueBuf interface{}
tsmKeyBuf int64
tsmValueBuf interface{}
ascending bool
}
// SeekTo positions the cursor at the timestamp specified by seek and returns the
// timestamp and value.
func (c *devCursor) SeekTo(seek int64) (int64, interface{}) {
// Seek to position in cache index.
c.position = sort.Search(len(c.cache), func(i int) bool {
return c.cache[i].Time().UnixNano() >= seek
})
if len(c.cache) == 0 {
c.cacheKeyBuf = tsdb.EOF
}
if c.ascending {
if c.position < len(c.cache) {
c.cacheKeyBuf = c.cache[c.position].Time().UnixNano()
c.cacheValueBuf = c.cache[c.position].Value()
} else {
c.cacheKeyBuf = tsdb.EOF
}
} else {
}
// TODO: Get the first block from tsm files for the given 'seek'
// Seek to position to tsm block.
c.tsmKeyBuf = tsdb.EOF
return c.read()
}
// Next returns the next value from the cursor.
func (c *devCursor) Next() (int64, interface{}) {
return c.read()
}
// Ascending returns whether the cursor returns data in time-ascending order.
func (c *devCursor) Ascending() bool { return c.ascending }
// read returns the next value for the cursor.
func (c *devCursor) read() (int64, interface{}) {
var key int64
var value interface{}
// Determine where the next datum should come from -- the cache or the TSM files.
switch {
// No more data in cache or in TSM files.
case c.cacheKeyBuf == tsdb.EOF && c.tsmKeyBuf == tsdb.EOF:
key = tsdb.EOF
// Both cache and tsm files have the same key, cache takes precedence.
case c.cacheKeyBuf == c.tsmKeyBuf:
key = c.cacheKeyBuf
value = c.cacheValueBuf
c.cacheKeyBuf, c.cacheValueBuf = c.nextCache()
c.tsmKeyBuf, c.tsmValueBuf = c.nextTSM()
// Buffered cache key precedes that in TSM file.
case c.ascending && (c.cacheKeyBuf != tsdb.EOF && (c.cacheKeyBuf < c.tsmKeyBuf || c.tsmKeyBuf == tsdb.EOF)),
!c.ascending && (c.cacheKeyBuf != tsdb.EOF && (c.cacheKeyBuf > c.tsmKeyBuf || c.tsmKeyBuf == tsdb.EOF)):
key = c.cacheKeyBuf
value = c.cacheValueBuf
c.cacheKeyBuf, c.cacheValueBuf = c.nextCache()
// Buffered TSM key precedes that in cache.
default:
key = c.tsmKeyBuf
value = c.tsmValueBuf
c.tsmKeyBuf, c.tsmValueBuf = c.nextTSM()
}
return key, value
}
// nextCache returns the next value from the cache.
func (c *devCursor) nextCache() (int64, interface{}) {
if c.ascending {
c.position++
if c.position >= len(c.cache) {
return tsdb.EOF, nil
}
} else {
c.position--
if c.position < 0 {
return tsdb.EOF, nil
}
}
return c.cache[c.position].UnixNano(), c.cache[c.position].Value()
}
// nextTSM returns the next value from the TSM files.
func (c *devCursor) nextTSM() (int64, interface{}) {
return tsdb.EOF, nil
}