Merge pull request #3829 from influxdb/pd-fix-missing-data-after-flush

Fix missing data in aggregates with bz1
pull/3836/head
Paul Dix 2015-08-25 16:27:03 -04:00
commit a4735624f8
2 changed files with 49 additions and 1 deletions

View File

@ -62,6 +62,7 @@ type WAL interface {
Cursor(key string) tsdb.Cursor
Open() error
Close() error
Flush() error
}
// NewEngine returns a new instance of Engine.
@ -613,7 +614,17 @@ type Cursor struct {
// Seek moves the cursor to a position and returns the closest key/value pair.
func (c *Cursor) Seek(seek []byte) (key, value []byte) {
// Move cursor to appropriate block and set to buffer.
_, v := c.cursor.Seek(seek)
k, v := c.cursor.Seek(seek)
if v == nil { // get the last block, it might have this time
_, v = c.cursor.Last()
} else if bytes.Compare(seek, k) == -1 { // the seek key is less than this block, go back one and check
_, v = c.cursor.Prev()
// if the previous block max time is less than the seek value, reset to where we were originally
if v == nil || bytes.Compare(seek, v[0:8]) > 0 {
_, v = c.cursor.Seek(seek)
}
}
c.setBuf(v)
// Read current block up to seek position.

View File

@ -240,6 +240,41 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
}
}
// Ensure that the engine properly seeks to a block when the seek value is in the middle.
func TestEngine_WriteIndex_SeekAgainstInBlockValue(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
// make sure we have data split across two blocks
dataSize := (bz1.DefaultBlockSize - 16) / 2
data := make([]byte, dataSize, dataSize)
// Write initial points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
append(u64tob(10), data...),
append(u64tob(20), data...),
append(u64tob(30), data...),
append(u64tob(40), data...),
},
}, nil, nil); err != nil {
t.Fatal(err)
}
// Start transaction.
tx := e.MustBegin(false)
defer tx.Rollback()
// Ensure that we can seek to a block in the middle
c := tx.Cursor("cpu")
if k, _ := c.Seek(u64tob(15)); btou64(k) != 20 {
t.Fatalf("expected to seek to time 20, but got %d", btou64(k))
}
// Ensure that we can seek to the block on the end
if k, _ := c.Seek(u64tob(35)); btou64(k) != 40 {
t.Fatalf("expected to seek to time 40, but got %d", btou64(k))
}
}
// Ensure the engine ignores writes without keys.
func TestEngine_WriteIndex_NoKeys(t *testing.T) {
e := OpenDefaultEngine()
@ -479,6 +514,8 @@ func (w *EnginePointsWriter) Close() error { return nil }
func (w *EnginePointsWriter) Cursor(key string) tsdb.Cursor { return &Cursor{} }
func (w *EnginePointsWriter) Flush() error { return nil }
// Cursor represents a mock that implements tsdb.Curosr.
type Cursor struct {
}