diff --git a/tsdb/tsm1/engine_delete_prefix.go b/tsdb/tsm1/engine_delete_prefix.go index b01085464e..3f4c77d859 100644 --- a/tsdb/tsm1/engine_delete_prefix.go +++ b/tsdb/tsm1/engine_delete_prefix.go @@ -11,13 +11,6 @@ import ( "github.com/influxdata/influxql" ) -// Predicate is something that can match on a series key. It also exports some other -// methods that can be used in order to more efficiently walk indexes. -type Predicate interface { - Matches(key []byte) bool - Measurement() []byte // if non-nil, specifies a specific measurement to match -} - // DeletePrefixRange removes all TSM data belonging to a bucket, and removes all index // and series file data associated with the bucket. The provided time range ensures // that only bucket data for that range is removed. @@ -71,7 +64,7 @@ func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate) possiblyDead.keys = make(map[string]struct{}) if err := e.FileStore.Apply(func(r TSMFile) error { - return r.DeletePrefix(name, min, max, func(key []byte) { + return r.DeletePrefix(name, min, max, pred, func(key []byte) { possiblyDead.Lock() possiblyDead.keys[string(key)] = struct{}{} possiblyDead.Unlock() diff --git a/tsdb/tsm1/file_store.go b/tsdb/tsm1/file_store.go index 454c0be161..07f1c68376 100644 --- a/tsdb/tsm1/file_store.go +++ b/tsdb/tsm1/file_store.go @@ -117,7 +117,7 @@ type TSMFile interface { // DeletePrefix removes the values for keys beginning with prefix. It calls dead with // any keys that became dead as a result of this call. - DeletePrefix(prefix []byte, min, max int64, dead func([]byte)) error + DeletePrefix(prefix []byte, min, max int64, pred Predicate, dead func([]byte)) error // HasTombstones returns true if file contains values that have been deleted. HasTombstones() bool diff --git a/tsdb/tsm1/predicate.go b/tsdb/tsm1/predicate.go new file mode 100644 index 0000000000..d80b619707 --- /dev/null +++ b/tsdb/tsm1/predicate.go @@ -0,0 +1,7 @@ +package tsm1 + +// Predicate is something that can match on a series key. +type Predicate interface { + Matches(key []byte) bool + Marshal() ([]byte, error) +} diff --git a/tsdb/tsm1/reader.go b/tsdb/tsm1/reader.go index 42a588dec4..a6e57ba8b6 100644 --- a/tsdb/tsm1/reader.go +++ b/tsdb/tsm1/reader.go @@ -108,7 +108,8 @@ func (t *TSMReader) applyTombstones() error { if err := t.tombstoner.Walk(func(ts Tombstone) error { // TODO(jeff): maybe we need to do batches of prefixes if ts.Prefix { - t.index.DeletePrefix(ts.Key, ts.Min, ts.Max, nil) + // TODO(jeff): pass the predicate + t.index.DeletePrefix(ts.Key, ts.Min, ts.Max, nil, nil) return nil } @@ -298,6 +299,20 @@ func (t *TSMReader) MaybeContainsValue(key []byte, ts int64) bool { return t.index.MaybeContainsValue(key, ts) } +// Delete deletes blocks indicated by keys. +func (t *TSMReader) Delete(keys [][]byte) error { + if !t.index.Delete(keys) { + return nil + } + if err := t.tombstoner.Add(keys); err != nil { + return err + } + if err := t.tombstoner.Flush(); err != nil { + return err + } + return nil +} + // DeleteRange removes the given points for keys between minTime and maxTime. The series // keys passed in must be sorted. func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error { @@ -315,25 +330,15 @@ func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error { // DeletePrefix removes the given points for keys beginning with prefix. It calls dead with // any keys that became dead as a result of this call. -func (t *TSMReader) DeletePrefix(prefix []byte, minTime, maxTime int64, dead func([]byte)) error { - if !t.index.DeletePrefix(prefix, minTime, maxTime, dead) { - return nil - } - if err := t.tombstoner.AddPrefixRange(prefix, minTime, maxTime); err != nil { - return err - } - if err := t.tombstoner.Flush(); err != nil { - return err - } - return nil -} +func (t *TSMReader) DeletePrefix(prefix []byte, minTime, maxTime int64, + pred Predicate, dead func([]byte)) error { -// Delete deletes blocks indicated by keys. -func (t *TSMReader) Delete(keys [][]byte) error { - if !t.index.Delete(keys) { + if !t.index.DeletePrefix(prefix, minTime, maxTime, pred, dead) { return nil } - if err := t.tombstoner.Add(keys); err != nil { + + // TODO(jeff): pass predicate into the tombstoner + if err := t.tombstoner.AddPrefixRange(prefix, minTime, maxTime); err != nil { return err } if err := t.tombstoner.Flush(); err != nil { diff --git a/tsdb/tsm1/reader_index.go b/tsdb/tsm1/reader_index.go index bfc6e73221..2598997899 100644 --- a/tsdb/tsm1/reader_index.go +++ b/tsdb/tsm1/reader_index.go @@ -25,7 +25,7 @@ type TSMIndex interface { // DeletePrefix removes keys that begin with the given prefix with data between minTime and // maxTime from the index. Returns true if there were any changes. It calls dead with any // keys that became dead as a result of this call. - DeletePrefix(prefix []byte, minTime, maxTime int64, dead func([]byte)) bool + DeletePrefix(prefix []byte, minTime, maxTime int64, pred Predicate, dead func([]byte)) bool // MaybeContainsKey returns true if the given key may exist in the index. This is faster than // Contains but, may return false positives. @@ -299,10 +299,10 @@ func (d *indirectIndex) coversEntries(offset uint32, key []byte, buf []TimeRange // create the merger with the other tombstone entries: the ones for the specific // key and the one we have proposed to add. merger := timeRangeMerger{ - sorted: d.tombstones[offset], - unsorted: buf, - single: TimeRange{Min: minTime, Max: maxTime}, - used: false, + fromMap: d.tombstones[offset], + fromPrefix: buf, + single: TimeRange{Min: minTime, Max: maxTime}, + used: false, } return buf, timeRangesCoverEntries(merger, entries) @@ -381,7 +381,7 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) bool Index: iter.Index(), Offset: offset, EntryOffset: entryOffset, - Tombstones: len(d.tombstones[offset]), + Tombstones: len(d.tombstones[offset]) + d.prefixTombstones.Count(key), }) } @@ -395,10 +395,12 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) bool defer d.mu.Unlock() for _, p := range pending { - // Check the existing tombstones. If the length did not/ change, then we know + key := keys[p.Key] + + // Check the existing tombstones. If the length did not change, then we know // that we don't need to double check coverage, since we only ever increase the // number of tombstones for a key. - if trs := d.tombstones[p.Offset]; p.Tombstones == len(trs) { + if trs := d.tombstones[p.Offset]; p.Tombstones == len(trs)+d.prefixTombstones.Count(key) { d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime) continue } @@ -421,7 +423,7 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) bool continue } - trbuf, ok = d.coversEntries(p.Offset, keys[p.Key], trbuf, entries, minTime, maxTime) + trbuf, ok = d.coversEntries(p.Offset, key, trbuf, entries, minTime, maxTime) if ok { delete(d.tombstones, p.Offset) iter.SetIndex(p.Index) @@ -443,7 +445,9 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) bool // DeletePrefix removes keys that begin with the given prefix with data between minTime and // maxTime from the index. Returns true if there were any changes. It calls dead with any // keys that became dead as a result of this call. -func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, dead func([]byte)) bool { +func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, + pred Predicate, dead func([]byte)) bool { + if dead == nil { dead = func([]byte) {} } @@ -461,6 +465,8 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, dead ok bool trbuf []TimeRange entries []IndexEntry + pending []pendingTombstone + keys [][]byte err error mustTrack bool ) @@ -484,6 +490,11 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, dead break } + // If we have a predicate, skip the key if it doesn't match. + if pred != nil && !pred.Matches(key) { + continue + } + // if we're not doing a partial delete, we don't need to read the entries and // can just delete the key and move on. if !partial { @@ -517,7 +528,8 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, dead } // Does adding the minTime and maxTime cover the entries? - trbuf, ok = d.coversEntries(iter.Offset(), iter.Key(&d.b), trbuf, entries, minTime, maxTime) + offset := iter.Offset() + trbuf, ok = d.coversEntries(offset, iter.Key(&d.b), trbuf, entries, minTime, maxTime) if ok { dead(key) iter.Delete() @@ -526,25 +538,94 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, dead // Otherwise, we have to track it in the prefix tombstones list. mustTrack = true + + // If we have a predicate, we must keep track of a pending tombstone entry for the key. + if pred != nil { + pending = append(pending, pendingTombstone{ + Key: len(keys), + Index: iter.Index(), + Offset: offset, + EntryOffset: entryOffset, + Tombstones: len(d.tombstones[offset]) + d.prefixTombstones.Count(key), + }) + keys = append(keys, key) + } } d.mu.RUnlock() // Check and abort if nothing needs to be done. - if !mustTrack && !iter.HasDeletes() { + if !mustTrack && len(pending) == 0 && !iter.HasDeletes() { return false } d.mu.Lock() defer d.mu.Unlock() - if mustTrack { - d.prefixTombstones.Append(prefix, TimeRange{Min: minTime, Max: maxTime}) + if pred == nil { + // If we don't have a predicate, we can add a single prefix tombstone entry. + if mustTrack { + d.prefixTombstones.Append(prefix, TimeRange{Min: minTime, Max: maxTime}) + } + + // Clean up any fully deleted keys. + if iter.HasDeletes() { + iter.Done() + } + return true } + // Otherwise, we must walk the pending deletes individually. + for _, p := range pending { + key := keys[p.Key] + + // Check the existing tombstones. If the length did not change, then we know + // that we don't need to double check coverage, since we only ever increase the + // number of tombstones for a key. + if trs := d.tombstones[p.Offset]; p.Tombstones == len(trs)+d.prefixTombstones.Count(key) { + d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime) + continue + } + + // Since the length changed, we have to do the expensive overlap check again. + // We re-read the entries again under the write lock because this should be + // rare and only during concurrent deletes to the same key. We could make + // a copy of the entries before getting here, but that penalizes the common + // no-concurrent case. + entries, err = readEntriesTimes(d.b.access(p.EntryOffset, 0), entries) + if err != nil { + // If we have an error reading the entries for a key, we should just pretend + // the whole key is deleted. Maybe a better idea is to report this up somehow + // but that's for another time. + delete(d.tombstones, p.Offset) + iter.SetIndex(p.Index) + if iter.Offset() == p.Offset { + dead(key) + iter.Delete() + } + continue + } + + // If it does cover, remove the key entirely. + trbuf, ok = d.coversEntries(p.Offset, key, trbuf, entries, minTime, maxTime) + if ok { + delete(d.tombstones, p.Offset) + iter.SetIndex(p.Index) + if iter.Offset() == p.Offset { + dead(key) + iter.Delete() + } + continue + } + + // Append the TimeRange into the tombstones. + trs := d.tombstones[p.Offset] + d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime) + } + + // Clean up any fully deleted keys. if iter.HasDeletes() { iter.Done() } - return true } diff --git a/tsdb/tsm1/reader_index_test.go b/tsdb/tsm1/reader_index_test.go index 1bb2fa8fc1..8d9d4eeab3 100644 --- a/tsdb/tsm1/reader_index_test.go +++ b/tsdb/tsm1/reader_index_test.go @@ -1,9 +1,12 @@ package tsm1 import ( + "bytes" "fmt" "math" + "math/rand" "reflect" + "sync" "sync/atomic" "testing" ) @@ -154,6 +157,8 @@ func TestIndirectIndex_DeleteRange(t *testing.T) { check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), false) } +// TODO(jeff): predicate tests + func TestIndirectIndex_DeletePrefix(t *testing.T) { check := func(t *testing.T, got, exp bool) { t.Helper() @@ -170,7 +175,7 @@ func TestIndirectIndex_DeletePrefix(t *testing.T) { index.Add([]byte("mem"), BlockInteger, 0, 10, 10, 20) ind := loadIndex(t, index) - ind.DeletePrefix([]byte("c"), 5, 15, nil) + ind.DeletePrefix([]byte("c"), 5, 15, nil, nil) check(t, ind.Contains([]byte("mem")), true) check(t, ind.Contains([]byte("cpu1")), true) @@ -186,7 +191,7 @@ func TestIndirectIndex_DeletePrefix(t *testing.T) { check(t, ind.MaybeContainsValue([]byte("cpu2"), 15), false) check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), true) - ind.DeletePrefix([]byte("cp"), 0, 5, nil) + ind.DeletePrefix([]byte("cp"), 0, 5, nil, nil) check(t, ind.Contains([]byte("mem")), true) check(t, ind.Contains([]byte("cpu1")), true) @@ -202,7 +207,7 @@ func TestIndirectIndex_DeletePrefix(t *testing.T) { check(t, ind.MaybeContainsValue([]byte("cpu2"), 15), false) check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), true) - ind.DeletePrefix([]byte("cpu"), 15, 20, nil) + ind.DeletePrefix([]byte("cpu"), 15, 20, nil, nil) check(t, ind.Contains([]byte("mem")), true) check(t, ind.Contains([]byte("cpu1")), false) @@ -231,8 +236,8 @@ func TestIndirectIndex_DeletePrefix_NoMatch(t *testing.T) { index.Add([]byte("cpu"), BlockInteger, 0, 10, 10, 20) ind := loadIndex(t, index) - ind.DeletePrefix([]byte("b"), 5, 5, nil) - ind.DeletePrefix([]byte("d"), 5, 5, nil) + ind.DeletePrefix([]byte("b"), 5, 5, nil, nil) + ind.DeletePrefix([]byte("d"), 5, 5, nil, nil) check(t, ind.Contains([]byte("cpu")), true) check(t, ind.MaybeContainsValue([]byte("cpu"), 5), true) @@ -261,19 +266,71 @@ func TestIndirectIndex_DeletePrefix_Dead(t *testing.T) { index.Add([]byte("dpu"), BlockInteger, 0, 10, 10, 20) ind := loadIndex(t, index) - ind.DeletePrefix([]byte("b"), 5, 5, dead) + ind.DeletePrefix([]byte("b"), 5, 5, nil, dead) check(t, keys, b()) - ind.DeletePrefix([]byte("c"), 0, 9, dead) + ind.DeletePrefix([]byte("c"), 0, 9, nil, dead) check(t, keys, b()) - ind.DeletePrefix([]byte("c"), 9, 10, dead) + ind.DeletePrefix([]byte("c"), 9, 10, nil, dead) check(t, keys, b("cpu")) - ind.DeletePrefix([]byte("d"), -50, 50, dead) + ind.DeletePrefix([]byte("d"), -50, 50, nil, dead) check(t, keys, b("cpu", "dpu")) } +func TestIndirectIndex_DeletePrefix_Dead_Fuzz(t *testing.T) { + key := bytes.Repeat([]byte("X"), 32) + check := func(t *testing.T, got, exp interface{}) { + t.Helper() + if !reflect.DeepEqual(exp, got) { + t.Fatalf("expected: %v but got: %v", exp, got) + } + } + + for i := 0; i < 5000; i++ { + // Create an index with the key in it + writer := NewIndexWriter() + writer.Add(key, BlockInteger, 0, 10, 10, 20) + ind := loadIndex(t, writer) + + // Keep track if dead is ever called. + happened := uint64(0) + dead := func([]byte) { atomic.AddUint64(&happened, 1) } + + // Build up a random set of operations to delete the key. + ops := make([]func(), 9) + for j := range ops { + n := int64(j) + if rand.Intn(2) == 0 { + kn := key[:rand.Intn(len(key))] + ops[j] = func() { ind.DeletePrefix(kn, n, n+1, nil, dead) } + } else { + ops[j] = func() { ind.DeleteRange([][]byte{key}, n, n+1) } + } + } + + // Since we will run the ops concurrently, this shuffle is unnecessary + // but it might provide more coverage of random orderings than the + // scheduler randomness alone. + rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] }) + + // Run the operations concurrently. The key should never be dead. + var wg sync.WaitGroup + for _, op := range ops { + op := op + wg.Add(1) + go func() { op(); wg.Done() }() + } + wg.Wait() + check(t, happened, uint64(0)) + + // Run the last delete operation. It should kill the key. + ind.DeletePrefix(key, 9, 10, nil, dead) + check(t, happened, uint64(1)) + } +} + // // indirectIndex benchmarks // @@ -549,7 +606,7 @@ func BenchmarkIndirectIndex_DeletePrefixFull(b *testing.B) { indirect, _ = getIndex(b, name) b.StartTimer() - indirect.DeletePrefix(prefix, 10, 50, nil) + indirect.DeletePrefix(prefix, 10, 50, nil, nil) } if faultBufferEnabled { @@ -574,7 +631,7 @@ func BenchmarkIndirectIndex_DeletePrefixFull_Covered(b *testing.B) { indirect, _ = getIndex(b, name) b.StartTimer() - indirect.DeletePrefix(prefix, 0, math.MaxInt64, nil) + indirect.DeletePrefix(prefix, 0, math.MaxInt64, nil, nil) } if faultBufferEnabled { diff --git a/tsdb/tsm1/reader_prefix_tree.go b/tsdb/tsm1/reader_prefix_tree.go index c8d5836814..55dcd0d97e 100644 --- a/tsdb/tsm1/reader_prefix_tree.go +++ b/tsdb/tsm1/reader_prefix_tree.go @@ -69,6 +69,27 @@ func (p *prefixTree) Search(key []byte, buf []TimeRange) []TimeRange { return buf } +func (p *prefixTree) Count(key []byte) int { + count := len(p.values) + + if len(key) > 0 { + if ch, ok := p.short[key[0]]; ok { + count += ch.Count(key[1:]) + } + } + + if len(key) >= prefixTreeKeySize { + var lookup prefixTreeKey + copy(lookup[:], key) + + if ch, ok := p.long[lookup]; ok { + count += ch.Count(key[prefixTreeKeySize:]) + } + } + + return count +} + func (p *prefixTree) checkOverlap(key []byte, ts int64) bool { for _, t := range p.values { if t.Min <= ts && ts <= t.Max { diff --git a/tsdb/tsm1/reader_test.go b/tsdb/tsm1/reader_test.go index f538dc9d19..33e32557ad 100644 --- a/tsdb/tsm1/reader_test.go +++ b/tsdb/tsm1/reader_test.go @@ -1581,7 +1581,7 @@ func TestTSMReader_DeletePrefix(t *testing.T) { r, err := NewTSMReader(f) fatalIfErr(t, "creating reader", err) - err = r.DeletePrefix([]byte("c"), 0, 5, nil) + err = r.DeletePrefix([]byte("c"), 0, 5, nil, nil) fatalIfErr(t, "deleting prefix", err) values, err := r.ReadAll([]byte("cpu")) diff --git a/tsdb/tsm1/reader_time_range.go b/tsdb/tsm1/reader_time_range.go index 282cfad6cb..f6c95beada 100644 --- a/tsdb/tsm1/reader_time_range.go +++ b/tsdb/tsm1/reader_time_range.go @@ -60,10 +60,10 @@ func timeRangesCoverEntries(merger timeRangeMerger, entries []IndexEntry) (cover // timeRangeMerger is a special purpose data structure to merge three sources of // TimeRanges so that we can check if they cover a slice of index entries. type timeRangeMerger struct { - sorted []TimeRange - unsorted []TimeRange - single TimeRange - used bool // if single has been used + fromMap []TimeRange + fromPrefix []TimeRange + single TimeRange + used bool // if single has been used } // Pop returns the next TimeRange in sorted order and a boolean indicating that @@ -72,14 +72,14 @@ func (t *timeRangeMerger) Pop() (out TimeRange, ok bool) { var where *[]TimeRange var what []TimeRange - if len(t.sorted) > 0 { - where, what = &t.sorted, t.sorted[1:] - out, ok = t.sorted[0], true + if len(t.fromMap) > 0 { + where, what = &t.fromMap, t.fromMap[1:] + out, ok = t.fromMap[0], true } - if len(t.unsorted) > 0 && (!ok || t.unsorted[0].Less(out)) { - where, what = &t.unsorted, t.unsorted[1:] - out, ok = t.unsorted[0], true + if len(t.fromPrefix) > 0 && (!ok || t.fromPrefix[0].Less(out)) { + where, what = &t.fromPrefix, t.fromPrefix[1:] + out, ok = t.fromPrefix[0], true } if !t.used && (!ok || t.single.Less(out)) { diff --git a/tsdb/tsm1/reader_time_range_test.go b/tsdb/tsm1/reader_time_range_test.go index 54bbc459c1..e84563e5d0 100644 --- a/tsdb/tsm1/reader_time_range_test.go +++ b/tsdb/tsm1/reader_time_range_test.go @@ -34,19 +34,19 @@ func TestTimeRangeMerger(t *testing.T) { } check(t, ranges(0, 1, 2, 3, 4, 5, 6), timeRangeMerger{ - sorted: ranges(0, 2, 6), - unsorted: ranges(1, 3, 5), - single: TimeRange{4, 4}, + fromMap: ranges(0, 2, 6), + fromPrefix: ranges(1, 3, 5), + single: TimeRange{4, 4}, }) check(t, ranges(0, 1, 2), timeRangeMerger{ - sorted: ranges(0, 1, 2), - used: true, + fromMap: ranges(0, 1, 2), + used: true, }) check(t, ranges(0, 1, 2), timeRangeMerger{ - unsorted: ranges(0, 1, 2), - used: true, + fromPrefix: ranges(0, 1, 2), + used: true, }) check(t, ranges(0), timeRangeMerger{ @@ -54,9 +54,9 @@ func TestTimeRangeMerger(t *testing.T) { }) check(t, ranges(0, 0, 0), timeRangeMerger{ - sorted: ranges(0), - unsorted: ranges(0), - single: TimeRange{0, 0}, + fromMap: ranges(0), + fromPrefix: ranges(0), + single: TimeRange{0, 0}, }) } @@ -78,7 +78,7 @@ func TestTimeRangeCoverEntries(t *testing.T) { check := func(t *testing.T, ranges []TimeRange, entries []IndexEntry, covers bool) { t.Helper() sort.Slice(ranges, func(i, j int) bool { return ranges[i].Less(ranges[j]) }) - got := timeRangesCoverEntries(timeRangeMerger{sorted: ranges, used: true}, entries) + got := timeRangesCoverEntries(timeRangeMerger{fromMap: ranges, used: true}, entries) if got != covers { t.Fatalf("bad covers:\nranges: %v\nentries: %v\ncovers: %v\ngot: %v", ranges, entries, covers, got)