Fix compactions aborting early

If a series received many individual deletes that ended up removing
every value in a block, and the tombstone timestamps were not
contiguous, it was possible for the TSMKeyIterator to incorrectly
return false from Next. This caused the compaction to drop any
remaining data in the file.

Normally, if all the data for a key is deleted via tombstones, we remove
the whole key from the TSM index. In this case, we're not able to
determine that the key is fully deleted until the block is decoded and
the tombstones are applied.

This changes the TSMKeyIterator to detect this condition and continue
to the next key instead of aborting.
pull/9185/head
Jason Wilder 2017-11-30 14:38:09 -07:00
parent 48217793ac
commit b6096414c2
2 changed files with 90 additions and 0 deletions
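
For concreteness, a minimal sketch of the failing pattern, assembled from the same test helpers the new test below uses (MustTSMReader, DeleteRange, and the `#!~#` series key separator); this block is illustrative and not part of the diff:

	dir := MustTempDir()
	defer os.RemoveAll(dir)

	// One series, two values, then two separate single-point deletes that
	// together remove every value in the block. Neither tombstone alone
	// covers the block's full time range, so the key is not dropped from
	// the TSM index; the empty block only shows up at decode time.
	points := map[string][]tsm1.Value{
		"cpu,host=A#!~#value": []tsm1.Value{tsm1.NewValue(50, int64(1)), tsm1.NewValue(60, int64(1))},
	}
	r := MustTSMReader(dir, 1, points)
	if err := r.DeleteRange([][]byte{[]byte("cpu,host=A#!~#value")}, 50, 50); err != nil {
		t.Fatal(err)
	}
	if err := r.DeleteRange([][]byte{[]byte("cpu,host=A#!~#value")}, 60, 60); err != nil {
		t.Fatal(err)
	}
	// Before this fix, tsmKeyIterator.Next() merged zero values for this
	// key and returned false, silently dropping every key sorted after it.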

@@ -1280,6 +1280,7 @@ func (k *tsmKeyIterator) hasMergedValues() bool {
// Next returns true if there are any values remaining in the iterator.
func (k *tsmKeyIterator) Next() bool {
RETRY:
	// Any merged blocks pending?
	if len(k.merged) > 0 {
		k.merged = k.merged[1:]
@@ -1414,6 +1415,12 @@ func (k *tsmKeyIterator) Next() bool {
	k.merge()

	// After merging all the values for this key, we might not have any
	// (e.g. they were all deleted through many tombstones). In this case,
	// move on to the next key instead of ending iteration.
	if len(k.merged) == 0 {
		goto RETRY
	}

	return len(k.merged) > 0
}
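
The goto keeps the retry inside a single Next call rather than recursing, so a long run of fully deleted keys cannot grow the stack. For context, a minimal sketch of how a compaction-style consumer drains the iterator, assuming `iter` was built with tsm1.NewTSMKeyIterator (as in the test below) and `w` is a tsm1.TSMWriter; error handling is abbreviated and this loop is not part of the diff:

	// With this change, Next only returns true when a merged block is
	// pending, so the loop no longer terminates early while fully
	// tombstoned keys still precede live ones.
	for iter.Next() {
		key, minTime, maxTime, block, err := iter.Read()
		if err != nil {
			return err
		}
		// Each block returned here is non-empty: fully deleted keys were
		// already skipped inside Next via the RETRY path.
		if err := w.WriteBlock(key, minTime, maxTime, block); err != nil {
			return err
		}
	}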

@@ -1022,6 +1022,89 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
	}
}

// Tests that a key whose values are fully removed by multiple
// single-point tombstones is skipped during iteration of TSM files.
func TestTSMKeyIterator_SingleDeletes(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)

	v1 := tsm1.NewValue(10, int64(1))
	v2 := tsm1.NewValue(20, int64(1))
	v3 := tsm1.NewValue(30, int64(1))
	v4 := tsm1.NewValue(40, int64(1))
	v5 := tsm1.NewValue(50, int64(1))
	v6 := tsm1.NewValue(60, int64(1))

	points1 := map[string][]tsm1.Value{
		"cpu,host=0#!~#value": []tsm1.Value{v1, v2},
		"cpu,host=A#!~#value": []tsm1.Value{v5, v6},
		"cpu,host=B#!~#value": []tsm1.Value{v3, v4},
		"cpu,host=C#!~#value": []tsm1.Value{v1, v2},
		"cpu,host=D#!~#value": []tsm1.Value{v1, v2},
	}
	r1 := MustTSMReader(dir, 1, points1)

	if e := r1.DeleteRange([][]byte{[]byte("cpu,host=A#!~#value")}, 50, 50); e != nil {
		t.Fatal(e)
	}
	if e := r1.DeleteRange([][]byte{[]byte("cpu,host=A#!~#value")}, 60, 60); e != nil {
		t.Fatal(e)
	}
	if e := r1.DeleteRange([][]byte{[]byte("cpu,host=C#!~#value")}, 10, 10); e != nil {
		t.Fatal(e)
	}
	if e := r1.DeleteRange([][]byte{[]byte("cpu,host=C#!~#value")}, 60, 60); e != nil {
		t.Fatal(e)
	}
	if e := r1.DeleteRange([][]byte{[]byte("cpu,host=C#!~#value")}, 20, 20); e != nil {
		t.Fatal(e)
	}
	iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1)
	if err != nil {
		t.Fatalf("unexpected error creating TSMKeyIterator: %v", err)
	}
	var readValues int
	var data = []struct {
		key   string
		value tsm1.Value
	}{
		{"cpu,host=0#!~#value", v1},
		{"cpu,host=B#!~#value", v3},
		{"cpu,host=D#!~#value", v1},
	}
	for iter.Next() {
		key, _, _, block, err := iter.Read()
		if err != nil {
			t.Fatalf("unexpected error read: %v", err)
		}

		values, err := tsm1.DecodeBlock(block, nil)
		if err != nil {
			t.Fatalf("unexpected error decode: %v", err)
		}

		if exp, got := data[0].key, string(key); exp != got {
			t.Fatalf("key mismatch: exp %v, got %v", exp, got)
		}

		if exp, got := 2, len(values); exp != got {
			t.Fatalf("values length mismatch: exp %v, got %v", exp, got)
		}

		readValues++
		assertValueEqual(t, values[0], data[0].value)
		data = data[1:]
	}
	if exp, got := 3, readValues; exp != got {
		t.Fatalf("failed to read expected values: exp %v, got %v", exp, got)
	}
}
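
The key layout in this test is deliberate: cpu,host=A is fully deleted by two point tombstones and cpu,host=C by three (the t=60 delete matches nothing), while hosts 0, B, and D keep both of their values. Without the RETRY change, iteration would stop at cpu,host=A, only cpu,host=0 would be read, and the expected count of 3 would fail.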

// Tests that the TSMKeyIterator will abort if the interrupt channel is closed
func TestTSMKeyIterator_Abort(t *testing.T) {
	dir := MustTempDir()