From 02dbe6dbd3356a89a01bdd32e02cd9b28f511687 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 16 Nov 2017 15:23:34 -0700 Subject: [PATCH] Fix KeyCursor not return remaing blocks If the first block that needs to be read was partially deleted such that the trailing end has no values, it was possible for the query cursor end early. This was caused by the KeyCursor.ReadFloatBlock returning no values instead of checking the remaing blocks. --- tsdb/engine/tsm1/file_store.gen.go | 45 ++++++++++++++++--------- tsdb/engine/tsm1/file_store.gen.go.tmpl | 9 +++-- tsdb/engine/tsm1/file_store.go | 30 ----------------- tsdb/engine/tsm1/file_store_test.go | 43 +++++++++++++++++++++++ 4 files changed, 79 insertions(+), 48 deletions(-) diff --git a/tsdb/engine/tsm1/file_store.gen.go b/tsdb/engine/tsm1/file_store.gen.go index bc8f3529bb..2fe4aa991f 100644 --- a/tsdb/engine/tsm1/file_store.gen.go +++ b/tsdb/engine/tsm1/file_store.gen.go @@ -8,6 +8,7 @@ package tsm1 // ReadFloatBlock reads the next block as a set of float values. func (c *KeyCursor) ReadFloatBlock(buf *[]FloatValue) ([]FloatValue, error) { +LOOP: // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -32,9 +33,11 @@ func (c *KeyCursor) ReadFloatBlock(buf *[]FloatValue) ([]FloatValue, error) { tombstones := first.r.TombstoneRange(c.key) values = c.filterFloatValues(tombstones, values) - // Check we have remaining values. - if len(values) == 0 { - return nil, nil + // If there are no values in this first block (all tombonstoned or previously read) and + // we have more potential blocks too search. Try again. + if len(values) == 0 && len(c.current) > 0 { + c.current = c.current[1:] + goto LOOP } // Only one block with this key and time range so return it @@ -185,6 +188,7 @@ func (c *KeyCursor) ReadFloatBlock(buf *[]FloatValue) ([]FloatValue, error) { // ReadIntegerBlock reads the next block as a set of integer values. func (c *KeyCursor) ReadIntegerBlock(buf *[]IntegerValue) ([]IntegerValue, error) { +LOOP: // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -209,9 +213,11 @@ func (c *KeyCursor) ReadIntegerBlock(buf *[]IntegerValue) ([]IntegerValue, error tombstones := first.r.TombstoneRange(c.key) values = c.filterIntegerValues(tombstones, values) - // Check we have remaining values. - if len(values) == 0 { - return nil, nil + // If there are no values in this first block (all tombonstoned or previously read) and + // we have more potential blocks too search. Try again. + if len(values) == 0 && len(c.current) > 0 { + c.current = c.current[1:] + goto LOOP } // Only one block with this key and time range so return it @@ -362,6 +368,7 @@ func (c *KeyCursor) ReadIntegerBlock(buf *[]IntegerValue) ([]IntegerValue, error // ReadUnsignedBlock reads the next block as a set of unsigned values. func (c *KeyCursor) ReadUnsignedBlock(buf *[]UnsignedValue) ([]UnsignedValue, error) { +LOOP: // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -386,9 +393,11 @@ func (c *KeyCursor) ReadUnsignedBlock(buf *[]UnsignedValue) ([]UnsignedValue, er tombstones := first.r.TombstoneRange(c.key) values = c.filterUnsignedValues(tombstones, values) - // Check we have remaining values. - if len(values) == 0 { - return nil, nil + // If there are no values in this first block (all tombonstoned or previously read) and + // we have more potential blocks too search. Try again. + if len(values) == 0 && len(c.current) > 0 { + c.current = c.current[1:] + goto LOOP } // Only one block with this key and time range so return it @@ -539,6 +548,7 @@ func (c *KeyCursor) ReadUnsignedBlock(buf *[]UnsignedValue) ([]UnsignedValue, er // ReadStringBlock reads the next block as a set of string values. func (c *KeyCursor) ReadStringBlock(buf *[]StringValue) ([]StringValue, error) { +LOOP: // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -563,9 +573,11 @@ func (c *KeyCursor) ReadStringBlock(buf *[]StringValue) ([]StringValue, error) { tombstones := first.r.TombstoneRange(c.key) values = c.filterStringValues(tombstones, values) - // Check we have remaining values. - if len(values) == 0 { - return nil, nil + // If there are no values in this first block (all tombonstoned or previously read) and + // we have more potential blocks too search. Try again. + if len(values) == 0 && len(c.current) > 0 { + c.current = c.current[1:] + goto LOOP } // Only one block with this key and time range so return it @@ -716,6 +728,7 @@ func (c *KeyCursor) ReadStringBlock(buf *[]StringValue) ([]StringValue, error) { // ReadBooleanBlock reads the next block as a set of boolean values. func (c *KeyCursor) ReadBooleanBlock(buf *[]BooleanValue) ([]BooleanValue, error) { +LOOP: // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -740,9 +753,11 @@ func (c *KeyCursor) ReadBooleanBlock(buf *[]BooleanValue) ([]BooleanValue, error tombstones := first.r.TombstoneRange(c.key) values = c.filterBooleanValues(tombstones, values) - // Check we have remaining values. - if len(values) == 0 { - return nil, nil + // If there are no values in this first block (all tombonstoned or previously read) and + // we have more potential blocks too search. Try again. + if len(values) == 0 && len(c.current) > 0 { + c.current = c.current[1:] + goto LOOP } // Only one block with this key and time range so return it diff --git a/tsdb/engine/tsm1/file_store.gen.go.tmpl b/tsdb/engine/tsm1/file_store.gen.go.tmpl index 734a325a80..727a641a5b 100644 --- a/tsdb/engine/tsm1/file_store.gen.go.tmpl +++ b/tsdb/engine/tsm1/file_store.gen.go.tmpl @@ -4,6 +4,7 @@ package tsm1 {{range .}} // Read{{.Name}}Block reads the next block as a set of {{.name}} values. func (c *KeyCursor) Read{{.Name}}Block(buf *[]{{.Name}}Value) ([]{{.Name}}Value, error) { +LOOP: // No matching blocks to decode if len(c.current) == 0 { return nil, nil @@ -28,9 +29,11 @@ func (c *KeyCursor) Read{{.Name}}Block(buf *[]{{.Name}}Value) ([]{{.Name}}Value, tombstones := first.r.TombstoneRange(c.key) values = c.filter{{.Name}}Values(tombstones, values) - // Check we have remaining values. - if len(values) == 0 { - return nil, nil + // If there are no values in this first block (all tombonstoned or previously read) and + // we have more potential blocks too search. Try again. + if len(values) == 0 && len(c.current) > 0 { + c.current = c.current[1:] + goto LOOP } // Only one block with this key and time range so return it diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index e873cae6a7..4b8e658274 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -1062,12 +1062,6 @@ type KeyCursor struct { // decrement through the size of seeks slice. pos int ascending bool - - // duplicates is a hint that there are overlapping blocks for this key in - // multiple files (e.g. points have been overwritten but not fully compacted) - // If this is true, we need to scan the duplicate blocks and dedup the points - // as query time until they are compacted. - duplicates bool } type location struct { @@ -1126,8 +1120,6 @@ func newKeyCursor(ctx context.Context, fs *FileStore, key []byte, t int64, ascen ascending: ascending, } - c.duplicates = c.hasOverlappingBlocks() - if ascending { sort.Sort(ascLocations(c.seeks)) } else { @@ -1195,12 +1187,6 @@ func (c *KeyCursor) seekAscending(t int64) { } c.current = append(c.current, e) - - // Exit if we don't have duplicates. - // Otherwise, keep looking for additional blocks containing this point. - if !c.duplicates { - return - } } } } @@ -1214,12 +1200,6 @@ func (c *KeyCursor) seekDescending(t int64) { c.pos = i } c.current = append(c.current, e) - - // Exit if we don't have duplicates. - // Otherwise, keep looking for additional blocks containing this point. - if !c.duplicates { - return - } } } } @@ -1260,11 +1240,6 @@ func (c *KeyCursor) nextAscending() { } c.current[0] = c.seeks[c.pos] - // We're done if there are no overlapping blocks. - if !c.duplicates { - return - } - // If we have ovelapping blocks, append all their values so we can dedup for i := c.pos + 1; i < len(c.seeks); i++ { if c.seeks[i].read() { @@ -1293,11 +1268,6 @@ func (c *KeyCursor) nextDescending() { } c.current[0] = c.seeks[c.pos] - // We're done if there are no overlapping blocks. - if !c.duplicates { - return - } - // If we have ovelapping blocks, append all their values so we can dedup for i := c.pos; i >= 0; i-- { if c.seeks[i].read() { diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index d0517cf56b..15923170ec 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -2082,6 +2082,49 @@ func TestKeyCursor_TombstoneRange(t *testing.T) { } } +func TestKeyCursor_TombstoneRange_PartialFirst(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + fs := tsm1.NewFileStore(dir) + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 0.0), tsm1.NewValue(1, 1.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 2.0)}}, + } + + files, err := newFiles(dir, data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + // Delete part of the block in the first file. + r := MustOpenTSMReader(files[0]) + r.DeleteRange([][]byte{[]byte("cpu")}, 1, 3) + + fs.Replace(nil, files) + + buf := make([]tsm1.FloatValue, 1000) + c := fs.KeyCursor(context.Background(), []byte("cpu"), 0, true) + expValues := []tsm1.Value{tsm1.NewValue(0, 0.0), tsm1.NewValue(2, 2.0)} + + for _, exp := range expValues { + values, err := c.ReadFloatBlock(&buf) + if err != nil { + t.Fatalf("unexpected error reading values: %v", err) + } + + if got, exp := len(values), 1; got != exp { + t.Fatalf("value length mismatch: got %v, exp %v", got, exp) + } + + if got, exp := values[0].String(), exp.String(); got != exp { + t.Fatalf("read value mismatch(%d): got %v, exp %v", 0, got, exp) + } + c.Next() + } +} + func TestKeyCursor_TombstoneRange_PartialFloat(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir)