diff --git a/tsdb/tsm1/file_store.go b/tsdb/tsm1/file_store.go index 60adcfbe35..3bbaaf9233 100644 --- a/tsdb/tsm1/file_store.go +++ b/tsdb/tsm1/file_store.go @@ -109,6 +109,9 @@ type TSMFile interface { // DeleteRange removes the values for keys between timestamps min and max. DeleteRange(keys [][]byte, min, max int64) error + // DeletePrefix removes the values for keys beginning with prefix. + DeletePrefix(prefix []byte, min, max int64) error + // HasTombstones returns true if file contains values that have been deleted. HasTombstones() bool diff --git a/tsdb/tsm1/reader.go b/tsdb/tsm1/reader.go index fa8b7de095..195263d4ec 100644 --- a/tsdb/tsm1/reader.go +++ b/tsdb/tsm1/reader.go @@ -109,6 +109,12 @@ func (t *TSMReader) applyTombstones() error { batch := make([][]byte, 0, 4096) if err := t.tombstoner.Walk(func(ts Tombstone) error { + // TODO(jeff): maybe we need to do batches of prefixes + if ts.Prefix { + t.index.DeletePrefix(ts.Key, ts.Min, ts.Max) + return nil + } + cur = ts if len(batch) > 0 { if prev.Min != cur.Min || prev.Max != cur.Max { @@ -310,6 +316,20 @@ func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error { return batch.Commit() } +// DeletePrefix removes the given points for keys beginning with prefix. +func (t *TSMReader) DeletePrefix(prefix []byte, minTime, maxTime int64) error { + if err := t.tombstoner.AddPrefixRange(prefix, minTime, maxTime); err != nil { + return err + } + + if err := t.tombstoner.Flush(); err != nil { + return err + } + + t.index.DeletePrefix(prefix, minTime, maxTime) + return nil +} + // Delete deletes blocks indicated by keys. func (t *TSMReader) Delete(keys [][]byte) error { if err := t.tombstoner.Add(keys); err != nil { diff --git a/tsdb/tsm1/reader_index.go b/tsdb/tsm1/reader_index.go index 0a88fc0017..9febb7a19a 100644 --- a/tsdb/tsm1/reader_index.go +++ b/tsdb/tsm1/reader_index.go @@ -453,9 +453,15 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64) { // next until after we've checked the key, so keep a "first" flag. first := true iter := d.ro.Iterator() - iter.Seek(prefix, &d.b) for { - if (!first && !iter.Next()) || !bytes.HasPrefix(iter.Key(&d.b), prefix) { + if first { + if _, ok := iter.Seek(prefix, &d.b); !ok { + break + } + } else if !iter.Next() { + break + } + if !bytes.HasPrefix(iter.Key(&d.b), prefix) { break } first = false diff --git a/tsdb/tsm1/reader_index_test.go b/tsdb/tsm1/reader_index_test.go index 918d80bb93..6f88f0a712 100644 --- a/tsdb/tsm1/reader_index_test.go +++ b/tsdb/tsm1/reader_index_test.go @@ -218,6 +218,25 @@ func TestIndirectIndex_DeletePrefix(t *testing.T) { check(t, ind.ContainsValue([]byte("cpu2"), 16), false) } +func TestIndirectIndex_DeletePrefix_NoMatch(t *testing.T) { + check := func(t *testing.T, got, exp bool) { + t.Helper() + if exp != got { + t.Fatalf("expected: %v but got: %v", exp, got) + } + } + + index := NewIndexWriter() + index.Add([]byte("cpu"), BlockInteger, 0, 10, 10, 20) + ind := loadIndex(t, index) + + ind.DeletePrefix([]byte("b"), 5, 5) + ind.DeletePrefix([]byte("d"), 5, 5) + + check(t, ind.Contains([]byte("cpu")), true) + check(t, ind.ContainsValue([]byte("cpu"), 5), true) +} + // // indirectIndex benchmarks // diff --git a/tsdb/tsm1/reader_test.go b/tsdb/tsm1/reader_test.go index 1c1f4fe725..5b0122f210 100644 --- a/tsdb/tsm1/reader_test.go +++ b/tsdb/tsm1/reader_test.go @@ -1182,7 +1182,6 @@ func TestTSMReader_UnmarshalBinary_BlockCountOverflow(t *testing.T) { defer r.Close() } - func TestCompacted_NotFull(t *testing.T) { dir := mustTempDir() defer os.RemoveAll(dir) @@ -1551,3 +1550,71 @@ func TestTSMReader_References(t *testing.T) { t.Fatalf("unexpected error removing reader: %v", err) } } + +func TestTSMReader_DeletePrefix(t *testing.T) { + dir := mustTempDir() + defer os.RemoveAll(dir) + f := mustTempFile(dir) + + // create data in a tsm file + w, err := NewTSMWriter(f) + fatalIfErr(t, "creating writer", err) + + err = w.Write([]byte("cpu"), []Value{ + NewValue(0, int64(1)), + NewValue(5, int64(2)), + NewValue(10, int64(3)), + NewValue(15, int64(4)), + }) + fatalIfErr(t, "writing", err) + + err = w.WriteIndex() + fatalIfErr(t, "writing index", err) + + err = w.Close() + fatalIfErr(t, "closing", err) + + // open the tsm file and delete the prefix + f, err = os.Open(f.Name()) + fatalIfErr(t, "opening", err) + + r, err := NewTSMReader(f) + fatalIfErr(t, "creating reader", err) + + err = r.DeletePrefix([]byte("c"), 0, 5) + fatalIfErr(t, "deleting prefix", err) + + values, err := r.ReadAll([]byte("cpu")) + fatalIfErr(t, "reading values", err) + if got, exp := len(values), 2; got != exp { + t.Fatalf("wrong number of values: %d but wanted: %d", got, exp) + } + if got, exp := values[0], NewValue(10, int64(3)); got != exp { + t.Fatalf("wrong value: %q but wanted %q", got, exp) + } + if got, exp := values[1], NewValue(15, int64(4)); got != exp { + t.Fatalf("wrong value: %q but wanted %q", got, exp) + } + + err = r.Close() + fatalIfErr(t, "closing reader", err) + + // open the tsm file and check that the deletes still happened + f, err = os.Open(f.Name()) + fatalIfErr(t, "opening", err) + + r, err = NewTSMReader(f) + fatalIfErr(t, "creating reader", err) + + values, err = r.ReadAll([]byte("cpu")) + fatalIfErr(t, "reading values", err) + if got, exp := len(values), 2; got != exp { + t.Fatalf("wrong number of values: %d but wanted: %d", got, exp) + } + if got, exp := values[0], NewValue(10, int64(3)); got != exp { + t.Fatalf("wrong value: %q but wanted %q", got, exp) + } + if got, exp := values[1], NewValue(15, int64(4)); got != exp { + t.Fatalf("wrong value: %q but wanted %q", got, exp) + } +} diff --git a/tsdb/tsm1/tombstone.go b/tsdb/tsm1/tombstone.go index 7711c65edd..f57f06f42e 100644 --- a/tsdb/tsm1/tombstone.go +++ b/tsdb/tsm1/tombstone.go @@ -143,10 +143,6 @@ func (t *Tombstoner) AddPrefix(key []byte) error { // AddPrefixRange adds a prefix-based tombstone key with an explicit range. func (t *Tombstoner) AddPrefixRange(key []byte, min, max int64) error { - if t.FilterFn != nil && !t.FilterFn(key) { - return nil - } - t.mu.Lock() defer t.mu.Unlock()