From 267f34b94e8d4ab6987f0aace2a6954e83133f14 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Mon, 5 Oct 2015 19:57:49 -0400 Subject: [PATCH] Updates based on PR feedback --- models/points.go | 2 +- tsdb/engine.go | 2 +- tsdb/engine/tsm1/cursor.go | 42 ++++++++++++++++++++++++++++++++------ tsdb/engine/tsm1/tsm1.go | 4 ++++ 4 files changed, 42 insertions(+), 8 deletions(-) diff --git a/models/points.go b/models/points.go index d91ff13afb..592780389d 100644 --- a/models/points.go +++ b/models/points.go @@ -1146,7 +1146,7 @@ func (t Tags) HashKey() []byte { ek := escapeTag([]byte(k)) ev := escapeTag([]byte(v)) - if len(string(ev)) > 0 { + if len(ev) > 0 { escaped[string(ek)] = string(ev) } } diff --git a/tsdb/engine.go b/tsdb/engine.go index 97ca51d9dd..fb1b2108c5 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -71,7 +71,7 @@ func NewEngine(path string, walPath string, options EngineOptions) (Engine, erro return newEngineFuncs[options.EngineVersion](path, walPath, options), nil } - // Only bolt-based backends are currently supported so open it and check the format. + // Only bolt and tsm1 based storage engines are currently supported var format string if err := func() error { // if it's a dir then it's a tsm1 engine diff --git a/tsdb/engine/tsm1/cursor.go b/tsdb/engine/tsm1/cursor.go index 06fd0bbf8f..01ac2e3c7a 100644 --- a/tsdb/engine/tsm1/cursor.go +++ b/tsdb/engine/tsm1/cursor.go @@ -6,6 +6,10 @@ import ( "github.com/influxdb/influxdb/tsdb" ) +// combinedEngineCursor holds a cursor for the WAL and the index +// and will combine the two together. Any points in the WAL with +// identical timestamps from the index will be preferred over the +// index point type combinedEngineCursor struct { walCursor tsdb.Cursor engineCursor tsdb.Cursor @@ -24,20 +28,26 @@ func NewCombinedEngineCursor(wc, ec tsdb.Cursor, ascending bool) tsdb.Cursor { } } +// SeekTo will seek both the index and WAL cursor func (c *combinedEngineCursor) SeekTo(seek int64) (key int64, value interface{}) { c.walKeyBuf, c.walValueBuf = c.walCursor.SeekTo(seek) c.engineKeyBuf, c.engineValueBuf = c.engineCursor.SeekTo(seek) return c.read() } +// Next returns the next value in the cursor func (c *combinedEngineCursor) Next() (int64, interface{}) { return c.read() } +// Ascending returns true if the cursor is time ascending func (c *combinedEngineCursor) Ascending() bool { return c.ascending } +// read will return the buffer value that is next from either the +// WAL or index cursor and repopulate the buffer value with the +// appropriate cursor's next value func (c *combinedEngineCursor) read() (key int64, value interface{}) { if c.walKeyBuf == tsdb.EOF && c.engineKeyBuf == tsdb.EOF { return tsdb.EOF, nil @@ -84,6 +94,9 @@ func (c *combinedEngineCursor) read() (key int64, value interface{}) { return } +// multieFieldCursor wraps cursors for multiple fields on the same series +// key. Instead of returning a plain interface value in the call for Next(), +// it returns a map[string]interface{} for the field values type multiFieldCursor struct { fields []string cursors []tsdb.Cursor @@ -158,18 +171,31 @@ func (c *emptyCursor) Next() (int64, interface{}) { return tsdb.EOF, func (c *emptyCursor) SeekTo(key int64) (int64, interface{}) { return tsdb.EOF, nil } func (c *emptyCursor) Ascending() bool { return c.ascending } +// cursor is a cursor for the data in the index type cursor struct { - id uint64 - f *dataFile + // id for the series key and field + id uint64 + + // f is the current data file we're reading from + f *dataFile + + // filesPos is the position in the files index we're reading from filesPos int // the index in the files slice we're looking at - pos uint32 - vals Values + + // pos is the position in the current data file we're reading + pos uint32 + + // vals is the current decoded block of Values we're iterating from + vals Values ascending bool - blockPositions []uint32 // only used for descending queries + // blockPositions is used for descending queries to keep track + // of what positions in the current data file encoded blocks for + // the id exist at + blockPositions []uint32 - // time acending list of data files + // time acending slice of read only data files files []*dataFile } @@ -334,10 +360,13 @@ func (c *cursor) seekDescending(seek int64) (int64, interface{}) { return tsdb.EOF, nil } +// blockMinTime is the minimum time for the block func (c *cursor) blockMinTime(pos uint32) int64 { return int64(btou64(c.f.mmap[pos+12 : pos+20])) } +// setBlockPositions will read the positions of all +// blocks for the cursor id in the given data file func (c *cursor) setBlockPositions() { pos := c.pos @@ -431,6 +460,7 @@ func (c *cursor) blockLength(pos uint32) uint32 { return btou32(c.f.mmap[pos+8 : pos+12]) } +// decodeBlock will decod the block and set the vals func (c *cursor) decodeBlock(position uint32) { length := c.blockLength(position) block := c.f.mmap[position+blockHeaderSize : position+blockHeaderSize+length] diff --git a/tsdb/engine/tsm1/tsm1.go b/tsdb/engine/tsm1/tsm1.go index aac359c19d..59a0f3fe05 100644 --- a/tsdb/engine/tsm1/tsm1.go +++ b/tsdb/engine/tsm1/tsm1.go @@ -80,6 +80,8 @@ const ( // MAP_POPULATE is for the mmap syscall. For some reason this isn't defined in golang's syscall MAP_POPULATE = 0x8000 + // magicNumber is written as the first 4 bytes of a data file to + // identify the file as a tsm1 formatted file magicNumber uint32 = 0x16D116D1 ) @@ -1298,6 +1300,8 @@ func (e *Engine) writeNewFileExcludeDeletes(oldDF *dataFile) *dataFile { } func (e *Engine) nextFileName() string { + e.filesLock.Lock() + defer e.filesLock.Unlock() e.currentFileID++ return filepath.Join(e.path, fmt.Sprintf("%07d.%s", e.currentFileID, Format)) }