tsm1: basic predicate implementation at index layer

Only wires it up. No tests, no tombstone tracking, nothing.
pull/13371/head
Jeff Wendling 2019-04-12 13:36:17 -06:00 committed by Jeff Wendling
parent 7403fd8aa9
commit dcf797f111
10 changed files with 238 additions and 74 deletions

View File

@ -11,13 +11,6 @@ import (
"github.com/influxdata/influxql"
)
// Predicate is something that can match on a series key. It also exports some other
// methods that can be used in order to more efficiently walk indexes.
type Predicate interface {
Matches(key []byte) bool
Measurement() []byte // if non-nil, specifies a specific measurement to match
}
// DeletePrefixRange removes all TSM data belonging to a bucket, and removes all index
// and series file data associated with the bucket. The provided time range ensures
// that only bucket data for that range is removed.
@ -71,7 +64,7 @@ func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate)
possiblyDead.keys = make(map[string]struct{})
if err := e.FileStore.Apply(func(r TSMFile) error {
return r.DeletePrefix(name, min, max, func(key []byte) {
return r.DeletePrefix(name, min, max, pred, func(key []byte) {
possiblyDead.Lock()
possiblyDead.keys[string(key)] = struct{}{}
possiblyDead.Unlock()

View File

@ -117,7 +117,7 @@ type TSMFile interface {
// DeletePrefix removes the values for keys beginning with prefix. It calls dead with
// any keys that became dead as a result of this call.
DeletePrefix(prefix []byte, min, max int64, dead func([]byte)) error
DeletePrefix(prefix []byte, min, max int64, pred Predicate, dead func([]byte)) error
// HasTombstones returns true if file contains values that have been deleted.
HasTombstones() bool

7
tsdb/tsm1/predicate.go Normal file
View File

@ -0,0 +1,7 @@
package tsm1
// Predicate is something that can match on a series key.
//
// Code in this package may be handed a nil Predicate: indirectIndex.DeletePrefix
// only calls Matches when the predicate is non-nil, so nil behaves as "no
// filtering" there.
type Predicate interface {
// Matches reports whether the predicate matches the given series key.
// indirectIndex.DeletePrefix skips any key for which this returns false.
Matches(key []byte) bool
// Marshal returns a byte encoding of the predicate, or an error.
// NOTE(review): presumably intended for persisting the predicate alongside
// tombstones; the encoding format is not defined here — TODO confirm.
Marshal() ([]byte, error)
}

View File

@ -108,7 +108,8 @@ func (t *TSMReader) applyTombstones() error {
if err := t.tombstoner.Walk(func(ts Tombstone) error {
// TODO(jeff): maybe we need to do batches of prefixes
if ts.Prefix {
t.index.DeletePrefix(ts.Key, ts.Min, ts.Max, nil)
// TODO(jeff): pass the predicate
t.index.DeletePrefix(ts.Key, ts.Min, ts.Max, nil, nil)
return nil
}
@ -298,6 +299,20 @@ func (t *TSMReader) MaybeContainsValue(key []byte, ts int64) bool {
return t.index.MaybeContainsValue(key, ts)
}
// Delete deletes blocks indicated by keys.
//
// The keys are first removed from the in-memory index. When the index reports
// false (presumably meaning nothing changed), the tombstoner is left untouched
// and nil is returned. Otherwise the keys are added to the tombstoner and the
// tombstoner is flushed; the first error encountered is returned and a failed
// Add prevents the Flush from running.
func (t *TSMReader) Delete(keys [][]byte) error {
	touched := t.index.Delete(keys)
	if !touched {
		return nil
	}

	// Record the keys, then flush only if recording succeeded.
	err := t.tombstoner.Add(keys)
	if err == nil {
		err = t.tombstoner.Flush()
	}
	return err
}
// DeleteRange removes the given points for keys between minTime and maxTime. The series
// keys passed in must be sorted.
func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
@ -315,25 +330,15 @@ func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
// DeletePrefix removes the given points for keys beginning with prefix. It calls dead with
// any keys that became dead as a result of this call.
func (t *TSMReader) DeletePrefix(prefix []byte, minTime, maxTime int64, dead func([]byte)) error {
if !t.index.DeletePrefix(prefix, minTime, maxTime, dead) {
return nil
}
if err := t.tombstoner.AddPrefixRange(prefix, minTime, maxTime); err != nil {
return err
}
if err := t.tombstoner.Flush(); err != nil {
return err
}
return nil
}
func (t *TSMReader) DeletePrefix(prefix []byte, minTime, maxTime int64,
pred Predicate, dead func([]byte)) error {
// Delete deletes blocks indicated by keys.
func (t *TSMReader) Delete(keys [][]byte) error {
if !t.index.Delete(keys) {
if !t.index.DeletePrefix(prefix, minTime, maxTime, pred, dead) {
return nil
}
if err := t.tombstoner.Add(keys); err != nil {
// TODO(jeff): pass predicate into the tombstoner
if err := t.tombstoner.AddPrefixRange(prefix, minTime, maxTime); err != nil {
return err
}
if err := t.tombstoner.Flush(); err != nil {

View File

@ -25,7 +25,7 @@ type TSMIndex interface {
// DeletePrefix removes keys that begin with the given prefix with data between minTime and
// maxTime from the index. Returns true if there were any changes. It calls dead with any
// keys that became dead as a result of this call.
DeletePrefix(prefix []byte, minTime, maxTime int64, dead func([]byte)) bool
DeletePrefix(prefix []byte, minTime, maxTime int64, pred Predicate, dead func([]byte)) bool
// MaybeContainsKey returns true if the given key may exist in the index. This is faster than
// Contains, but may return false positives.
@ -299,10 +299,10 @@ func (d *indirectIndex) coversEntries(offset uint32, key []byte, buf []TimeRange
// create the merger with the other tombstone entries: the ones for the specific
// key and the one we have proposed to add.
merger := timeRangeMerger{
sorted: d.tombstones[offset],
unsorted: buf,
single: TimeRange{Min: minTime, Max: maxTime},
used: false,
fromMap: d.tombstones[offset],
fromPrefix: buf,
single: TimeRange{Min: minTime, Max: maxTime},
used: false,
}
return buf, timeRangesCoverEntries(merger, entries)
@ -381,7 +381,7 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) bool
Index: iter.Index(),
Offset: offset,
EntryOffset: entryOffset,
Tombstones: len(d.tombstones[offset]),
Tombstones: len(d.tombstones[offset]) + d.prefixTombstones.Count(key),
})
}
@ -395,10 +395,12 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) bool
defer d.mu.Unlock()
for _, p := range pending {
// Check the existing tombstones. If the length did not/ change, then we know
key := keys[p.Key]
// Check the existing tombstones. If the length did not change, then we know
// that we don't need to double check coverage, since we only ever increase the
// number of tombstones for a key.
if trs := d.tombstones[p.Offset]; p.Tombstones == len(trs) {
if trs := d.tombstones[p.Offset]; p.Tombstones == len(trs)+d.prefixTombstones.Count(key) {
d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime)
continue
}
@ -421,7 +423,7 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) bool
continue
}
trbuf, ok = d.coversEntries(p.Offset, keys[p.Key], trbuf, entries, minTime, maxTime)
trbuf, ok = d.coversEntries(p.Offset, key, trbuf, entries, minTime, maxTime)
if ok {
delete(d.tombstones, p.Offset)
iter.SetIndex(p.Index)
@ -443,7 +445,9 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) bool
// DeletePrefix removes keys that begin with the given prefix with data between minTime and
// maxTime from the index. Returns true if there were any changes. It calls dead with any
// keys that became dead as a result of this call.
func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, dead func([]byte)) bool {
func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64,
pred Predicate, dead func([]byte)) bool {
if dead == nil {
dead = func([]byte) {}
}
@ -461,6 +465,8 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, dead
ok bool
trbuf []TimeRange
entries []IndexEntry
pending []pendingTombstone
keys [][]byte
err error
mustTrack bool
)
@ -484,6 +490,11 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, dead
break
}
// If we have a predicate, skip the key if it doesn't match.
if pred != nil && !pred.Matches(key) {
continue
}
// if we're not doing a partial delete, we don't need to read the entries and
// can just delete the key and move on.
if !partial {
@ -517,7 +528,8 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, dead
}
// Does adding the minTime and maxTime cover the entries?
trbuf, ok = d.coversEntries(iter.Offset(), iter.Key(&d.b), trbuf, entries, minTime, maxTime)
offset := iter.Offset()
trbuf, ok = d.coversEntries(offset, iter.Key(&d.b), trbuf, entries, minTime, maxTime)
if ok {
dead(key)
iter.Delete()
@ -526,25 +538,94 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64, dead
// Otherwise, we have to track it in the prefix tombstones list.
mustTrack = true
// If we have a predicate, we must keep track of a pending tombstone entry for the key.
if pred != nil {
pending = append(pending, pendingTombstone{
Key: len(keys),
Index: iter.Index(),
Offset: offset,
EntryOffset: entryOffset,
Tombstones: len(d.tombstones[offset]) + d.prefixTombstones.Count(key),
})
keys = append(keys, key)
}
}
d.mu.RUnlock()
// Check and abort if nothing needs to be done.
if !mustTrack && !iter.HasDeletes() {
if !mustTrack && len(pending) == 0 && !iter.HasDeletes() {
return false
}
d.mu.Lock()
defer d.mu.Unlock()
if mustTrack {
d.prefixTombstones.Append(prefix, TimeRange{Min: minTime, Max: maxTime})
if pred == nil {
// If we don't have a predicate, we can add a single prefix tombstone entry.
if mustTrack {
d.prefixTombstones.Append(prefix, TimeRange{Min: minTime, Max: maxTime})
}
// Clean up any fully deleted keys.
if iter.HasDeletes() {
iter.Done()
}
return true
}
// Otherwise, we must walk the pending deletes individually.
for _, p := range pending {
key := keys[p.Key]
// Check the existing tombstones. If the length did not change, then we know
// that we don't need to double check coverage, since we only ever increase the
// number of tombstones for a key.
if trs := d.tombstones[p.Offset]; p.Tombstones == len(trs)+d.prefixTombstones.Count(key) {
d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime)
continue
}
// Since the length changed, we have to do the expensive overlap check again.
// We re-read the entries again under the write lock because this should be
// rare and only during concurrent deletes to the same key. We could make
// a copy of the entries before getting here, but that penalizes the common
// no-concurrent case.
entries, err = readEntriesTimes(d.b.access(p.EntryOffset, 0), entries)
if err != nil {
// If we have an error reading the entries for a key, we should just pretend
// the whole key is deleted. Maybe a better idea is to report this up somehow
// but that's for another time.
delete(d.tombstones, p.Offset)
iter.SetIndex(p.Index)
if iter.Offset() == p.Offset {
dead(key)
iter.Delete()
}
continue
}
// If it does cover, remove the key entirely.
trbuf, ok = d.coversEntries(p.Offset, key, trbuf, entries, minTime, maxTime)
if ok {
delete(d.tombstones, p.Offset)
iter.SetIndex(p.Index)
if iter.Offset() == p.Offset {
dead(key)
iter.Delete()
}
continue
}
// Append the TimeRange into the tombstones.
trs := d.tombstones[p.Offset]
d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime)
}
// Clean up any fully deleted keys.
if iter.HasDeletes() {
iter.Done()
}
return true
}

View File

@ -1,9 +1,12 @@
package tsm1
import (
"bytes"
"fmt"
"math"
"math/rand"
"reflect"
"sync"
"sync/atomic"
"testing"
)
@ -154,6 +157,8 @@ func TestIndirectIndex_DeleteRange(t *testing.T) {
check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), false)
}
// TODO(jeff): predicate tests
func TestIndirectIndex_DeletePrefix(t *testing.T) {
check := func(t *testing.T, got, exp bool) {
t.Helper()
@ -170,7 +175,7 @@ func TestIndirectIndex_DeletePrefix(t *testing.T) {
index.Add([]byte("mem"), BlockInteger, 0, 10, 10, 20)
ind := loadIndex(t, index)
ind.DeletePrefix([]byte("c"), 5, 15, nil)
ind.DeletePrefix([]byte("c"), 5, 15, nil, nil)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), true)
@ -186,7 +191,7 @@ func TestIndirectIndex_DeletePrefix(t *testing.T) {
check(t, ind.MaybeContainsValue([]byte("cpu2"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), true)
ind.DeletePrefix([]byte("cp"), 0, 5, nil)
ind.DeletePrefix([]byte("cp"), 0, 5, nil, nil)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), true)
@ -202,7 +207,7 @@ func TestIndirectIndex_DeletePrefix(t *testing.T) {
check(t, ind.MaybeContainsValue([]byte("cpu2"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), true)
ind.DeletePrefix([]byte("cpu"), 15, 20, nil)
ind.DeletePrefix([]byte("cpu"), 15, 20, nil, nil)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), false)
@ -231,8 +236,8 @@ func TestIndirectIndex_DeletePrefix_NoMatch(t *testing.T) {
index.Add([]byte("cpu"), BlockInteger, 0, 10, 10, 20)
ind := loadIndex(t, index)
ind.DeletePrefix([]byte("b"), 5, 5, nil)
ind.DeletePrefix([]byte("d"), 5, 5, nil)
ind.DeletePrefix([]byte("b"), 5, 5, nil, nil)
ind.DeletePrefix([]byte("d"), 5, 5, nil, nil)
check(t, ind.Contains([]byte("cpu")), true)
check(t, ind.MaybeContainsValue([]byte("cpu"), 5), true)
@ -261,19 +266,71 @@ func TestIndirectIndex_DeletePrefix_Dead(t *testing.T) {
index.Add([]byte("dpu"), BlockInteger, 0, 10, 10, 20)
ind := loadIndex(t, index)
ind.DeletePrefix([]byte("b"), 5, 5, dead)
ind.DeletePrefix([]byte("b"), 5, 5, nil, dead)
check(t, keys, b())
ind.DeletePrefix([]byte("c"), 0, 9, dead)
ind.DeletePrefix([]byte("c"), 0, 9, nil, dead)
check(t, keys, b())
ind.DeletePrefix([]byte("c"), 9, 10, dead)
ind.DeletePrefix([]byte("c"), 9, 10, nil, dead)
check(t, keys, b("cpu"))
ind.DeletePrefix([]byte("d"), -50, 50, dead)
ind.DeletePrefix([]byte("d"), -50, 50, nil, dead)
check(t, keys, b("cpu", "dpu"))
}
// TestIndirectIndex_DeletePrefix_Dead_Fuzz races DeletePrefix and DeleteRange
// calls against an index holding a single key. For each of 5000 fresh indexes it
// runs nine concurrent deletes, one per range [n, n+1] for n in 0..8, and asserts
// that the dead callback was never invoked by them. It then issues a final
// DeletePrefix over [9, 10] and asserts that dead was invoked exactly once.
func TestIndirectIndex_DeletePrefix_Dead_Fuzz(t *testing.T) {
// A 32-byte key, so that prefixes of many different lengths can be drawn from it.
key := bytes.Repeat([]byte("X"), 32)
// check fails the test immediately when got and exp are not deeply equal.
check := func(t *testing.T, got, exp interface{}) {
t.Helper()
if !reflect.DeepEqual(exp, got) {
t.Fatalf("expected: %v but got: %v", exp, got)
}
}
for i := 0; i < 5000; i++ {
// Create an index with the key in it
writer := NewIndexWriter()
writer.Add(key, BlockInteger, 0, 10, 10, 20)
ind := loadIndex(t, writer)
// Keep track if dead is ever called.
// The counter is bumped atomically because dead may run on several goroutines.
happened := uint64(0)
dead := func([]byte) { atomic.AddUint64(&happened, 1) }
// Build up a random set of operations to delete the key.
ops := make([]func(), 9)
for j := range ops {
// n is a per-iteration copy so each closure deletes its own [n, n+1] range.
n := int64(j)
if rand.Intn(2) == 0 {
// kn is a random prefix of key that is always shorter than key and may
// be empty; the delete is issued with a nil predicate.
kn := key[:rand.Intn(len(key))]
ops[j] = func() { ind.DeletePrefix(kn, n, n+1, nil, dead) }
} else {
// DeleteRange takes no dead callback, so it can never bump happened.
ops[j] = func() { ind.DeleteRange([][]byte{key}, n, n+1) }
}
}
// Since we will run the ops concurrently, this shuffle is unnecessary
// but it might provide more coverage of random orderings than the
// scheduler randomness alone.
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
// Run the operations concurrently. The key should never be dead.
var wg sync.WaitGroup
for _, op := range ops {
// Shadow the loop variable so every goroutine captures its own op.
op := op
wg.Add(1)
go func() { op(); wg.Done() }()
}
// All goroutines have finished once Wait returns, so the plain read of
// happened below does not race with the atomic adds.
wg.Wait()
check(t, happened, uint64(0))
// Run the last delete operation. It should kill the key.
ind.DeletePrefix(key, 9, 10, nil, dead)
check(t, happened, uint64(1))
}
}
//
// indirectIndex benchmarks
//
@ -549,7 +606,7 @@ func BenchmarkIndirectIndex_DeletePrefixFull(b *testing.B) {
indirect, _ = getIndex(b, name)
b.StartTimer()
indirect.DeletePrefix(prefix, 10, 50, nil)
indirect.DeletePrefix(prefix, 10, 50, nil, nil)
}
if faultBufferEnabled {
@ -574,7 +631,7 @@ func BenchmarkIndirectIndex_DeletePrefixFull_Covered(b *testing.B) {
indirect, _ = getIndex(b, name)
b.StartTimer()
indirect.DeletePrefix(prefix, 0, math.MaxInt64, nil)
indirect.DeletePrefix(prefix, 0, math.MaxInt64, nil, nil)
}
if faultBufferEnabled {

View File

@ -69,6 +69,27 @@ func (p *prefixTree) Search(key []byte, buf []TimeRange) []TimeRange {
return buf
}
// Count returns how many values are stored on the nodes that key selects. It
// adds the values held directly on this node to the counts of up to two
// children: the short child indexed by the first byte of key (visited with that
// byte stripped), and the long child indexed by the first prefixTreeKeySize
// bytes of key (visited with those bytes stripped). A child that does not exist
// contributes nothing.
func (p *prefixTree) Count(key []byte) int {
	total := len(p.values)

	// Descend one byte at a time through the short children.
	if len(key) != 0 {
		if child, found := p.short[key[0]]; found {
			total += child.Count(key[1:])
		}
	}

	// The long children are only reachable with a full-width chunk of key.
	if len(key) < prefixTreeKeySize {
		return total
	}

	var chunk prefixTreeKey
	copy(chunk[:], key)
	child, found := p.long[chunk]
	if !found {
		return total
	}
	return total + child.Count(key[prefixTreeKeySize:])
}
func (p *prefixTree) checkOverlap(key []byte, ts int64) bool {
for _, t := range p.values {
if t.Min <= ts && ts <= t.Max {

View File

@ -1581,7 +1581,7 @@ func TestTSMReader_DeletePrefix(t *testing.T) {
r, err := NewTSMReader(f)
fatalIfErr(t, "creating reader", err)
err = r.DeletePrefix([]byte("c"), 0, 5, nil)
err = r.DeletePrefix([]byte("c"), 0, 5, nil, nil)
fatalIfErr(t, "deleting prefix", err)
values, err := r.ReadAll([]byte("cpu"))

View File

@ -60,10 +60,10 @@ func timeRangesCoverEntries(merger timeRangeMerger, entries []IndexEntry) (cover
// timeRangeMerger is a special purpose data structure to merge three sources of
// TimeRanges so that we can check if they cover a slice of index entries.
type timeRangeMerger struct {
sorted []TimeRange
unsorted []TimeRange
single TimeRange
used bool // if single has been used
fromMap []TimeRange
fromPrefix []TimeRange
single TimeRange
used bool // if single has been used
}
// Pop returns the next TimeRange in sorted order and a boolean indicating that
@ -72,14 +72,14 @@ func (t *timeRangeMerger) Pop() (out TimeRange, ok bool) {
var where *[]TimeRange
var what []TimeRange
if len(t.sorted) > 0 {
where, what = &t.sorted, t.sorted[1:]
out, ok = t.sorted[0], true
if len(t.fromMap) > 0 {
where, what = &t.fromMap, t.fromMap[1:]
out, ok = t.fromMap[0], true
}
if len(t.unsorted) > 0 && (!ok || t.unsorted[0].Less(out)) {
where, what = &t.unsorted, t.unsorted[1:]
out, ok = t.unsorted[0], true
if len(t.fromPrefix) > 0 && (!ok || t.fromPrefix[0].Less(out)) {
where, what = &t.fromPrefix, t.fromPrefix[1:]
out, ok = t.fromPrefix[0], true
}
if !t.used && (!ok || t.single.Less(out)) {

View File

@ -34,19 +34,19 @@ func TestTimeRangeMerger(t *testing.T) {
}
check(t, ranges(0, 1, 2, 3, 4, 5, 6), timeRangeMerger{
sorted: ranges(0, 2, 6),
unsorted: ranges(1, 3, 5),
single: TimeRange{4, 4},
fromMap: ranges(0, 2, 6),
fromPrefix: ranges(1, 3, 5),
single: TimeRange{4, 4},
})
check(t, ranges(0, 1, 2), timeRangeMerger{
sorted: ranges(0, 1, 2),
used: true,
fromMap: ranges(0, 1, 2),
used: true,
})
check(t, ranges(0, 1, 2), timeRangeMerger{
unsorted: ranges(0, 1, 2),
used: true,
fromPrefix: ranges(0, 1, 2),
used: true,
})
check(t, ranges(0), timeRangeMerger{
@ -54,9 +54,9 @@ func TestTimeRangeMerger(t *testing.T) {
})
check(t, ranges(0, 0, 0), timeRangeMerger{
sorted: ranges(0),
unsorted: ranges(0),
single: TimeRange{0, 0},
fromMap: ranges(0),
fromPrefix: ranges(0),
single: TimeRange{0, 0},
})
}
@ -78,7 +78,7 @@ func TestTimeRangeCoverEntries(t *testing.T) {
check := func(t *testing.T, ranges []TimeRange, entries []IndexEntry, covers bool) {
t.Helper()
sort.Slice(ranges, func(i, j int) bool { return ranges[i].Less(ranges[j]) })
got := timeRangesCoverEntries(timeRangeMerger{sorted: ranges, used: true}, entries)
got := timeRangesCoverEntries(timeRangeMerger{fromMap: ranges, used: true}, entries)
if got != covers {
t.Fatalf("bad covers:\nranges: %v\nentries: %v\ncovers: %v\ngot: %v",
ranges, entries, covers, got)