Updates based on PR feedback

pull/4308/head
Paul Dix 2015-10-05 19:57:49 -04:00
parent 26a93ec23e
commit 267f34b94e
4 changed files with 42 additions and 8 deletions

View File

@ -1146,7 +1146,7 @@ func (t Tags) HashKey() []byte {
ek := escapeTag([]byte(k))
ev := escapeTag([]byte(v))
if len(string(ev)) > 0 {
if len(ev) > 0 {
escaped[string(ek)] = string(ev)
}
}

View File

@ -71,7 +71,7 @@ func NewEngine(path string, walPath string, options EngineOptions) (Engine, erro
return newEngineFuncs[options.EngineVersion](path, walPath, options), nil
}
// Only bolt-based backends are currently supported so open it and check the format.
// Only bolt and tsm1 based storage engines are currently supported
var format string
if err := func() error {
// if it's a dir then it's a tsm1 engine

View File

@ -6,6 +6,10 @@ import (
"github.com/influxdb/influxdb/tsdb"
)
// combinedEngineCursor holds a cursor for the WAL and the index
// and will combine the two together. Any points in the WAL with
// identical timestamps from the index will be preferred over the
// index point
type combinedEngineCursor struct {
walCursor tsdb.Cursor
engineCursor tsdb.Cursor
@ -24,20 +28,26 @@ func NewCombinedEngineCursor(wc, ec tsdb.Cursor, ascending bool) tsdb.Cursor {
}
}
// SeekTo will seek both the index and WAL cursor
func (c *combinedEngineCursor) SeekTo(seek int64) (key int64, value interface{}) {
c.walKeyBuf, c.walValueBuf = c.walCursor.SeekTo(seek)
c.engineKeyBuf, c.engineValueBuf = c.engineCursor.SeekTo(seek)
return c.read()
}
// Next returns the next value in the cursor
func (c *combinedEngineCursor) Next() (int64, interface{}) {
return c.read()
}
// Ascending returns true if the cursor is time ascending
func (c *combinedEngineCursor) Ascending() bool {
return c.ascending
}
// read will return the buffer value that is next from either the
// WAL or index cursor and repopulate the buffer value with the
// appropriate cursor's next value
func (c *combinedEngineCursor) read() (key int64, value interface{}) {
if c.walKeyBuf == tsdb.EOF && c.engineKeyBuf == tsdb.EOF {
return tsdb.EOF, nil
@ -84,6 +94,9 @@ func (c *combinedEngineCursor) read() (key int64, value interface{}) {
return
}
// multieFieldCursor wraps cursors for multiple fields on the same series
// key. Instead of returning a plain interface value in the call for Next(),
// it returns a map[string]interface{} for the field values
type multiFieldCursor struct {
fields []string
cursors []tsdb.Cursor
@ -158,18 +171,31 @@ func (c *emptyCursor) Next() (int64, interface{}) { return tsdb.EOF,
func (c *emptyCursor) SeekTo(key int64) (int64, interface{}) { return tsdb.EOF, nil }
func (c *emptyCursor) Ascending() bool { return c.ascending }
// cursor is a cursor for the data in the index
type cursor struct {
id uint64
f *dataFile
// id for the series key and field
id uint64
// f is the current data file we're reading from
f *dataFile
// filesPos is the position in the files index we're reading from
filesPos int // the index in the files slice we're looking at
pos uint32
vals Values
// pos is the position in the current data file we're reading
pos uint32
// vals is the current decoded block of Values we're iterating from
vals Values
ascending bool
blockPositions []uint32 // only used for descending queries
// blockPositions is used for descending queries to keep track
// of what positions in the current data file encoded blocks for
// the id exist at
blockPositions []uint32
// time acending list of data files
// time acending slice of read only data files
files []*dataFile
}
@ -334,10 +360,13 @@ func (c *cursor) seekDescending(seek int64) (int64, interface{}) {
return tsdb.EOF, nil
}
// blockMinTime is the minimum time for the block
func (c *cursor) blockMinTime(pos uint32) int64 {
return int64(btou64(c.f.mmap[pos+12 : pos+20]))
}
// setBlockPositions will read the positions of all
// blocks for the cursor id in the given data file
func (c *cursor) setBlockPositions() {
pos := c.pos
@ -431,6 +460,7 @@ func (c *cursor) blockLength(pos uint32) uint32 {
return btou32(c.f.mmap[pos+8 : pos+12])
}
// decodeBlock will decod the block and set the vals
func (c *cursor) decodeBlock(position uint32) {
length := c.blockLength(position)
block := c.f.mmap[position+blockHeaderSize : position+blockHeaderSize+length]

View File

@ -80,6 +80,8 @@ const (
// MAP_POPULATE is for the mmap syscall. For some reason this isn't defined in golang's syscall
MAP_POPULATE = 0x8000
// magicNumber is written as the first 4 bytes of a data file to
// identify the file as a tsm1 formatted file
magicNumber uint32 = 0x16D116D1
)
@ -1298,6 +1300,8 @@ func (e *Engine) writeNewFileExcludeDeletes(oldDF *dataFile) *dataFile {
}
func (e *Engine) nextFileName() string {
e.filesLock.Lock()
defer e.filesLock.Unlock()
e.currentFileID++
return filepath.Join(e.path, fmt.Sprintf("%07d.%s", e.currentFileID, Format))
}