diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 0fa48fd870..acc888bc2c 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -1280,6 +1280,7 @@ func (k *tsmKeyIterator) hasMergedValues() bool { // Next returns true if there are any values remaining in the iterator. func (k *tsmKeyIterator) Next() bool { +RETRY: // Any merged blocks pending? if len(k.merged) > 0 { k.merged = k.merged[1:] @@ -1414,6 +1415,12 @@ func (k *tsmKeyIterator) Next() bool { k.merge() + // After merging all the values for this key, we might not have any. (e.g. they were all deleted + // through many tombstones). In this case, move on to the next key instead of ending iteration. + if len(k.merged) == 0 { + goto RETRY + } + return len(k.merged) > 0 } diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index d6c591d5bf..e3608f9837 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -1022,6 +1022,89 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) { } } +// Tests that deleted keys are not seen during iteration with +// TSM files. +func TestTSMKeyIterator_SingleDeletes(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(10, int64(1)) + v2 := tsm1.NewValue(20, int64(1)) + v3 := tsm1.NewValue(30, int64(1)) + v4 := tsm1.NewValue(40, int64(1)) + v5 := tsm1.NewValue(50, int64(1)) + v6 := tsm1.NewValue(60, int64(1)) + + points1 := map[string][]tsm1.Value{ + "cpu,host=0#!~#value": []tsm1.Value{v1, v2}, + "cpu,host=A#!~#value": []tsm1.Value{v5, v6}, + "cpu,host=B#!~#value": []tsm1.Value{v3, v4}, + "cpu,host=C#!~#value": []tsm1.Value{v1, v2}, + "cpu,host=D#!~#value": []tsm1.Value{v1, v2}, + } + + r1 := MustTSMReader(dir, 1, points1) + + if e := r1.DeleteRange([][]byte{[]byte("cpu,host=A#!~#value")}, 50, 50); nil != e { + t.Fatal(e) + } + if e := r1.DeleteRange([][]byte{[]byte("cpu,host=A#!~#value")}, 60, 60); nil != e { + t.Fatal(e) + } + if e := r1.DeleteRange([][]byte{[]byte("cpu,host=C#!~#value")}, 10, 10); nil != e { + t.Fatal(e) + } + if e := r1.DeleteRange([][]byte{[]byte("cpu,host=C#!~#value")}, 60, 60); nil != e { + t.Fatal(e) + } + if e := r1.DeleteRange([][]byte{[]byte("cpu,host=C#!~#value")}, 20, 20); nil != e { + t.Fatal(e) + } + + iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + var readValues int + var data = []struct { + key string + value tsm1.Value + }{ + {"cpu,host=0#!~#value", v1}, + {"cpu,host=B#!~#value", v3}, + {"cpu,host=D#!~#value", v1}, + } + + for iter.Next() { + key, _, _, block, err := iter.Read() + if err != nil { + t.Fatalf("unexpected error read: %v", err) + } + + values, err := tsm1.DecodeBlock(block, nil) + if err != nil { + t.Fatalf("unexpected error decode: %v", err) + } + + if exp, got := string(key), data[0].key; exp != got { + t.Fatalf("key mismatch: got %v, exp %v", exp, got) + } + + if exp, got := len(values), 2; exp != got { + t.Fatalf("values length mismatch: exp %v, got %v", exp, got) + } + readValues++ + + assertValueEqual(t, values[0], data[0].value) + data = data[1:] + } + + if exp, got := 3, readValues; exp != got { + t.Fatalf("failed to read expected values: exp %v, got %v", exp, got) + } +} + // Tests that the TSMKeyIterator will abort if the interrupt channel is closed func TestTSMKeyIterator_Abort(t *testing.T) { dir := MustTempDir()