diff --git a/tsdb/engine/bz1/bz1.go b/tsdb/engine/bz1/bz1.go index 25d528d16e..47fa51034d 100644 --- a/tsdb/engine/bz1/bz1.go +++ b/tsdb/engine/bz1/bz1.go @@ -62,6 +62,7 @@ type WAL interface { Cursor(key string) tsdb.Cursor Open() error Close() error + Flush() error } // NewEngine returns a new instance of Engine. @@ -613,7 +614,17 @@ type Cursor struct { // Seek moves the cursor to a position and returns the closest key/value pair. func (c *Cursor) Seek(seek []byte) (key, value []byte) { // Move cursor to appropriate block and set to buffer. - _, v := c.cursor.Seek(seek) + k, v := c.cursor.Seek(seek) + if v == nil { // get the last block, it might have this time + _, v = c.cursor.Last() + } else if bytes.Compare(seek, k) == -1 { // the seek key is less than this block, go back one and check + _, v = c.cursor.Prev() + + // if the previous block max time is less than the seek value, reset to where we were originally + if v == nil || bytes.Compare(seek, v[0:8]) > 0 { + _, v = c.cursor.Seek(seek) + } + } c.setBuf(v) // Read current block up to seek position. diff --git a/tsdb/engine/bz1/bz1_test.go b/tsdb/engine/bz1/bz1_test.go index 44bd1a6977..aeebadbac0 100644 --- a/tsdb/engine/bz1/bz1_test.go +++ b/tsdb/engine/bz1/bz1_test.go @@ -240,6 +240,41 @@ func TestEngine_WriteIndex_Insert(t *testing.T) { } } +// Ensure that the engine properly seeks to a block when the seek value is in the middle. +func TestEngine_WriteIndex_SeekAgainstInBlockValue(t *testing.T) { + e := OpenDefaultEngine() + defer e.Close() + + // make sure we have data split across two blocks + dataSize := (bz1.DefaultBlockSize - 16) / 2 + data := make([]byte, dataSize, dataSize) + // Write initial points to index. + if err := e.WriteIndex(map[string][][]byte{ + "cpu": [][]byte{ + append(u64tob(10), data...), + append(u64tob(20), data...), + append(u64tob(30), data...), + append(u64tob(40), data...), + }, + }, nil, nil); err != nil { + t.Fatal(err) + } + + // Start transaction. + tx := e.MustBegin(false) + defer tx.Rollback() + + // Ensure that we can seek to a block in the middle + c := tx.Cursor("cpu") + if k, _ := c.Seek(u64tob(15)); btou64(k) != 20 { + t.Fatalf("expected to seek to time 20, but got %d", btou64(k)) + } + // Ensure that we can seek to the block on the end + if k, _ := c.Seek(u64tob(35)); btou64(k) != 40 { + t.Fatalf("expected to seek to time 40, but got %d", btou64(k)) + } +} + // Ensure the engine ignores writes without keys. func TestEngine_WriteIndex_NoKeys(t *testing.T) { e := OpenDefaultEngine() @@ -479,6 +514,8 @@ func (w *EnginePointsWriter) Close() error { return nil } func (w *EnginePointsWriter) Cursor(key string) tsdb.Cursor { return &Cursor{} } +func (w *EnginePointsWriter) Flush() error { return nil } + // Cursor represents a mock that implements tsdb.Curosr. type Cursor struct { }