tsm1: fix multiple issues with DeleteRange
1. Correctly acquires locks
2. Seeks for discontiguous key ranges (like delete ["aaa", "zzz"])
3. Is precise about deleting a key when it contains no data

```
name                               old time/op    new time/op    delta
IndirectIndex_UnmarshalBinary-8      67.3ms ± 1%    63.2ms ±15%     ~     (p=0.463 n=7+8)
IndirectIndex_Entries-8              9.14µs ± 1%    9.01µs ± 0%   -1.40%  (p=0.004 n=8+7)
IndirectIndex_ReadEntries-8          5.83µs ± 1%    5.68µs ± 2%   -2.62%  (p=0.000 n=8+8)
IndirectIndex_DeleteRangeLast-8       283ns ± 2%     191ns ± 1%  -32.37%  (p=0.000 n=8+7)
IndirectIndex_DeleteRangeFull-8       612ms ± 1%     361ms ± 1%  -41.02%  (p=0.000 n=8+8)
IndirectIndex_Delete-8               49.0ms ± 1%    49.8ms ± 1%   +1.80%  (p=0.001 n=7+8)

name                               old alloc/op   new alloc/op   delta
IndirectIndex_UnmarshalBinary-8      11.6MB ± 0%    11.6MB ± 0%     ~     (all samples are equal)
IndirectIndex_Entries-8              32.8kB ± 0%    32.8kB ± 0%     ~     (all samples are equal)
IndirectIndex_ReadEntries-8           0.00B ±NaN%    0.00B ±NaN%    ~     (all samples are equal)
IndirectIndex_DeleteRangeLast-8       64.0B ± 0%      0.0B ±NaN% -100.00%  (p=0.000 n=8+8)
IndirectIndex_DeleteRangeFull-8       168MB ± 0%     162MB ± 0%   -3.71%  (p=0.000 n=8+8)
IndirectIndex_Delete-8               3.94kB ± 0%    3.94kB ± 0%     ~     (all samples are equal)

name                               old allocs/op  new allocs/op  delta
IndirectIndex_UnmarshalBinary-8        35.0 ± 0%      35.0 ± 0%     ~     (all samples are equal)
IndirectIndex_Entries-8                1.00 ± 0%      1.00 ± 0%     ~     (all samples are equal)
IndirectIndex_ReadEntries-8            0.00 ±NaN%     0.00 ±NaN%    ~     (all samples are equal)
IndirectIndex_DeleteRangeLast-8        2.00 ± 0%      0.00 ±NaN% -100.00%  (p=0.000 n=8+8)
IndirectIndex_DeleteRangeFull-8       1.04M ± 0%     0.52M ± 0%  -49.77%  (p=0.000 n=8+8)
IndirectIndex_Delete-8                  123 ± 0%       123 ± 0%     ~     (all samples are equal)
```

pull/10616/head

parent aed17cfedd
commit 91e820a9d8
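The table above reads like `benchstat` output comparing runs of each benchmark before and after the change. Fix 2 is where most of the DeleteRange speedup comes from: instead of rescanning from the front, the new loop first tries the index slot right after the previous match (callers usually pass contiguous runs of keys) and only falls back to a binary-search seek when the run breaks. A minimal standalone sketch of that access pattern — a toy string index, not the tsm1 code:

```go
package main

import (
	"fmt"
	"sort"
)

// Toy illustration: walk a sorted list of keys to delete against a sorted
// index. Try the next slot first, and fall back to a binary-search seek
// only when the run of keys is discontiguous (e.g. delete ["aaa", "zzz"]).
func main() {
	index := []string{"aaa", "bbb", "ccc", "mmm", "zzz"} // sorted index keys
	toDelete := []string{"aaa", "bbb", "zzz"}            // sorted keys to delete

	seeks := 0
	i := -1 // position of the previous match
	for _, key := range toDelete {
		// Cheap attempt: does the key sit right after the previous match?
		if next := i + 1; next < len(index) && index[next] == key {
			i = next
		} else {
			// Discontiguous: binary search for the key's position.
			seeks++
			i = sort.SearchStrings(index, key)
			if i >= len(index) || index[i] != key {
				continue // key not in the index
			}
		}
		fmt.Printf("delete %q at index %d\n", key, i)
	}
	fmt.Println("seeks needed:", seeks) // 1: only the jump to "zzz"
}
```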
```diff
@@ -4,6 +4,7 @@ import (
 	"bufio"
 	"bytes"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"io"
 	"math"
```
```diff
@@ -984,17 +985,7 @@ func (d *indirectIndex) Delete(keys [][]byte) {
 
 // DeleteRange removes the given keys with data between minTime and maxTime from the index.
 func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
-	// No keys, nothing to do
-	if len(keys) == 0 {
-		return
-	}
-
-	if !bytesutil.IsSorted(keys) {
-		bytesutil.Sort(keys)
-	}
-
-	// If we're deleting the max time range, just use tombstoning to remove the
-	// key from the offsets slice
+	// If we're deleting everything, we won't need to worry about partial deletes.
 	if minTime == math.MinInt64 && maxTime == math.MaxInt64 {
 		d.Delete(keys)
 		return
```
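This hunk leaves only the fast path: a delete spanning the whole int64 time domain needs no per-block tombstone bookkeeping and can drop the keys outright via Delete. A tiny sketch of the sentinel check, assuming callers pass math.MinInt64/math.MaxInt64 to mean "delete everything":

```go
package main

import (
	"fmt"
	"math"
)

// fullRange reports whether a delete covers the entire int64 time domain.
// The sentinel comparison is exact, mirroring the fast path in the hunk
// above: anything narrower goes down the partial-delete path instead.
func fullRange(minTime, maxTime int64) bool {
	return minTime == math.MinInt64 && maxTime == math.MaxInt64
}

func main() {
	fmt.Println(fullRange(math.MinInt64, math.MaxInt64)) // true: whole-key delete
	fmt.Println(fullRange(0, math.MaxInt64))             // false: partial delete
}
```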
```diff
@@ -1006,36 +997,96 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
 		return
 	}
 
-	fullKeys := make([][]byte, 0, len(keys))
-	tombstones := map[uint32][]TimeRange{}
-	var ie []IndexEntry
+	// insertTombstone inserts a time range for the minTime and
+	// maxTime into the sorted list of tombstones. It reuses the
+	// tsc buffer across calls.
+	var tsc []TimeRange
+	insertTombstone := func(ts []TimeRange) []TimeRange {
+		n := sort.Search(len(ts), func(i int) bool {
+			if ts[i].Min == minTime {
+				return ts[i].Max >= maxTime
+			}
+			return ts[i].Min > minTime
+		})
 
-	for i := d.searchOffset(keys[0]); len(keys) > 0 && i < d.KeyCount(); i++ {
-		offset := d.offsets[i]
-		k, entries := d.readEntriesAt(offset, &ie)
+		if cap(tsc) < len(ts)+1 {
+			tsc = make([]TimeRange, 0, len(ts)+1)
+		}
+		tsc = append(tsc[:0], ts[:n]...)
+		tsc = append(tsc, TimeRange{minTime, maxTime})
+		tsc = append(tsc, ts[n:]...)
+		return tsc
+	}
 
-		// Skip any keys that don't exist. These are less than the current key.
-		for len(keys) > 0 && bytes.Compare(keys[0], k) < 0 {
-			keys = keys[1:]
+	// General outline:
+	// Under the read lock, determine the set of actions we need to
+	// take and on what keys to take them. Then, under the write
+	// lock, perform those actions. We keep track of some state
+	// during the read lock to make double checking under the
+	// write lock cheap.
+
+	// tombstones maps the index of the key to the desired list
+	// of sorted tombstones after the delete. As a special case,
+	// if the Ranges field is nil, that means that the key
+	// should be deleted.
+	type tombstoneEntry struct {
+		KeyOffset   uint32
+		EntryOffset uint32
+		Ranges      []TimeRange
+	}
+
+	d.mu.RLock()
+	var index = -2 // ensures we do the seek attempt on the first loop through
+	var entries []IndexEntry
+	var tombstones []tombstoneEntry
+	var err error
+	var found bool
+
+	for _, key := range keys {
+		// TODO(jeff): this is a very strange loop. these primitives do not compose well.
+
+		// often, the passed in keys are contiguous. check the next entry to see if it matches
+		// the key to avoid a seek.
+		found, index = false, index+1
+		if index >= 0 && index < len(d.offsets) {
+			goto attempt
 		}
 
-		// No more keys to delete, we're done.
-		if len(keys) == 0 {
-			break
+	seek: // seeks the index to the appropriate key. if not found, continues
+		if index = d.searchOffset(key); index >= len(d.offsets) {
+			continue
 		}
+		found = true
 
+	attempt: // loads the key from disk and compares it to the current key
+		keyOffset := d.offsets[index]
+		n, indexKey := readKey(d.b[keyOffset:])
+		entryOffset := keyOffset + n
+
+		// if we haven't done an exact find, check the comparision. if it
+		// doesn't match, go try a seek.
+		if !found && !bytes.Equal(key, indexKey) {
+			goto seek
+		}
+
-		// If the current key is greater than the index one, continue to the next
-		// index key.
-		if len(keys) > 0 && bytes.Compare(keys[0], k) > 0 {
+		// end very strange loop.
+
+		tsEntry := tombstoneEntry{
+			KeyOffset:   keyOffset,
+			EntryOffset: entryOffset,
+		}
+
+		// read the entries for the key so that we can determine the time ranges.
+		entries, err = readEntriesTimes(d.b[entryOffset:], entries)
+		if err != nil {
+			// If we have an error reading the entries for a key, we should just pretend
+			// the whole key is deleted. Maybe a better idea is to report this up somehow
+			// but that's for another time.
+			tombstones = append(tombstones, tsEntry)
+			continue
+		}
+
-		// If multiple tombstones are saved for the same key
 		if len(entries) == 0 {
 			continue
 		}
 
-		// Is the time range passed outside of the time range we've have stored for this key?
+		// Is the time range passed outside of the time range we have stored for this key?
 		min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
 		if minTime > max || maxTime < min {
 			continue
```
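The `insertTombstone` closure keeps each key's tombstone list sorted by doing a `sort.Search` insert into a scratch buffer that is reused across calls, so the hot path only allocates when the buffer must grow. A standalone sketch of the same idea with a toy `TimeRange` — names mirror the diff, data invented:

```go
package main

import (
	"fmt"
	"sort"
)

type TimeRange struct {
	Min, Max int64
}

// insertSorted returns ranges with r inserted in (Min, Max) order, writing
// into buf to avoid allocating on every call, the way the insertTombstone
// closure in the diff reuses its tsc buffer.
func insertSorted(ranges []TimeRange, r TimeRange, buf []TimeRange) []TimeRange {
	n := sort.Search(len(ranges), func(i int) bool {
		if ranges[i].Min == r.Min {
			return ranges[i].Max >= r.Max
		}
		return ranges[i].Min > r.Min
	})

	if cap(buf) < len(ranges)+1 {
		buf = make([]TimeRange, 0, len(ranges)+1)
	}
	buf = append(buf[:0], ranges[:n]...)
	buf = append(buf, r)
	buf = append(buf, ranges[n:]...)
	return buf
}

func main() {
	ts := []TimeRange{{0, 5}, {20, 30}}
	var buf []TimeRange
	out := insertSorted(ts, TimeRange{10, 15}, buf)
	fmt.Println(out) // [{0 5} {10 15} {20 30}]
}
```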
```diff
@@ -1043,81 +1094,94 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
 
 		// Does the range passed in cover every value for the key?
 		if minTime <= min && maxTime >= max {
-			fullKeys = append(fullKeys, keys[0])
-			keys = keys[1:]
+			tombstones = append(tombstones, tsEntry)
 			continue
 		}
 
-		d.mu.RLock()
-		existing := d.tombstones[offset]
-		d.mu.RUnlock()
-
-		// Append the new tombonstes to the existing ones
-		newTs := append(existing, append(tombstones[offset], TimeRange{minTime, maxTime})...)
-		fn := func(i, j int) bool {
-			a, b := newTs[i], newTs[j]
-			if a.Min == b.Min {
-				return a.Max <= b.Max
-			}
-			return a.Min < b.Min
-		}
-
-		// Sort the updated tombstones if necessary
-		if len(newTs) > 1 && !sort.SliceIsSorted(newTs, fn) {
-			sort.Slice(newTs, fn)
-		}
-
-		tombstones[offset] = newTs
-
-		// We need to see if all the tombstones end up deleting the entire series. This
-		// could happen if their is one tombstore with min,max time spanning all the block
-		// time ranges or from multiple smaller tombstones the delete segments. To detect
-		// this cases, we use a window starting at the first tombstone and grow it be each
-		// tombstone that is immediately adjacent to the current window or if it overlaps.
-		// If there are any gaps, we abort.
-		minTs, maxTs := newTs[0].Min, newTs[0].Max
-		for j := 1; j < len(newTs); j++ {
-			prevTs := newTs[j-1]
-			ts := newTs[j]
-
-			// Make sure all the tombstone line up for a continuous range. We don't
-			// want to have two small deletes on each edges end up causing us to
-			// remove the full key.
-			if prevTs.Max != ts.Min-1 && !prevTs.Overlaps(ts.Min, ts.Max) {
-				minTs, maxTs = int64(math.MaxInt64), int64(math.MinInt64)
-				break
-			}
-
-			if ts.Min < minTs {
-				minTs = ts.Min
-			}
-			if ts.Max > maxTs {
-				maxTs = ts.Max
-			}
-		}
-
-		// If we have a fully deleted series, delete it all of it.
-		if minTs <= min && maxTs >= max {
-			fullKeys = append(fullKeys, keys[0])
-			keys = keys[1:]
+		// Get the sorted list of tombstones with our new range and check
+		// to see if they fully cover the key's entries.
+		ts := insertTombstone(d.tombstones[keyOffset])
+		if timeRangesCoverEntries(ts, entries) {
+			tombstones = append(tombstones, tsEntry)
 			continue
 		}
-	}
 
-	// Delete all the keys that fully deleted in bulk
-	if len(fullKeys) > 0 {
-		d.Delete(fullKeys)
+		// We're adding a tombstone. Store a copy because `insertTombstone` reuses
+		// the same slice across calls.
+		tsEntry.Ranges = append([]TimeRange(nil), ts...)
+		tombstones = append(tombstones, tsEntry)
 	}
+	d.mu.RUnlock()
 
 	// Nothing in tombstones means no deletes are going to happen.
 	if len(tombstones) == 0 {
 		return
 	}
 
 	d.mu.Lock()
-	for k, v := range tombstones {
-		d.tombstones[k] = v
+	defer d.mu.Unlock()
+
+	// Keep track of which entries will be deleted.
+	toDelete := make(map[uint32]struct{}, len(tombstones))
+	for _, tsEntry := range tombstones {
+		// If the desired list is nil, the key should be fully deleted.
+		if tsEntry.Ranges == nil {
+			delete(d.tombstones, tsEntry.KeyOffset)
+			toDelete[tsEntry.KeyOffset] = struct{}{}
+			continue
+		}
+
+		// Check the existing tombstones. If the length did not
+		// change, then we know that we don't need to double
+		// check coverage, since we only ever increase the
+		// number of tombstones for a key.
+		dts := d.tombstones[tsEntry.KeyOffset]
+		if len(dts) == len(tsEntry.Ranges)-1 {
+			d.tombstones[tsEntry.KeyOffset] = tsEntry.Ranges
+			continue
+		}
+
+		// Since the length changed, we have to do the expensive overlap check again.
+		// We re-read the entries again under the write lock because this should be
+		// rare and only during concurrent deletes to the same key. We could make
+		// a copy of the entries before getting here, but that penalizes the common
+		// no-concurrent case.
+		entries, err = readEntriesTimes(d.b[tsEntry.EntryOffset:], entries)
+		if err != nil {
+			// If we have an error reading the entries for a key, we should just pretend
+			// the whole key is deleted. Maybe a better idea is to report this up somehow
+			// but that's for another time.
+			delete(d.tombstones, tsEntry.KeyOffset)
+			toDelete[tsEntry.KeyOffset] = struct{}{}
+			continue
+		}
+
+		ts := insertTombstone(dts)
+		if timeRangesCoverEntries(ts, entries) {
+			delete(d.tombstones, tsEntry.KeyOffset)
+			toDelete[tsEntry.KeyOffset] = struct{}{}
+		} else {
+			d.tombstones[tsEntry.KeyOffset] = append([]TimeRange(nil), ts...)
+		}
 	}
-	d.mu.Unlock()
+
+	// If we have nothing to fully delete, we're done.
+	if len(toDelete) == 0 {
+		return
+	}
+
+	// Filter the offsets slice removing entries that are in toDelete.
+	var j int
+	for i, offset := range d.offsets {
+		if _, ok := toDelete[offset]; ok {
+			continue
+		}
+		if i != j {
+			d.offsets[j] = offset
+		}
+		j++
+	}
+	d.offsets = d.offsets[:j]
 }
 
 // TombstoneRange returns ranges of time that are deleted for the given key.
```
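The closing loop compacts `d.offsets` with the standard in-place filter idiom: one write cursor, elements copied down only when they actually move, a single trim at the end, and no allocation. The same idiom in isolation — toy data, with `toDelete` keyed by offset as in the diff:

```go
package main

import "fmt"

func main() {
	offsets := []uint32{10, 20, 30, 40, 50}
	toDelete := map[uint32]struct{}{20: {}, 40: {}}

	// In-place filter: j is the write cursor; kept elements are copied
	// down only when they have to move, and the slice is trimmed once.
	var j int
	for i, offset := range offsets {
		if _, ok := toDelete[offset]; ok {
			continue
		}
		if i != j {
			offsets[j] = offset
		}
		j++
	}
	offsets = offsets[:j]

	fmt.Println(offsets) // [10 30 50]
}
```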
```diff
@@ -1632,3 +1696,62 @@ func readEntries(b []byte, entries *indexEntries) (n int, err error) {
 
 	return
 }
+
+// readEntriesTimes is a helper function to read entries at the provided buffer but
+// only reading in the min and max times.
+func readEntriesTimes(b []byte, entries []IndexEntry) ([]IndexEntry, error) {
+	if len(b) < indexTypeSize+indexCountSize {
+		return nil, errors.New("readEntries: data too short for headers")
+	}
+
+	count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize]))
+	if cap(entries) < count {
+		entries = make([]IndexEntry, count)
+	} else {
+		entries = entries[:count]
+	}
+	b = b[indexTypeSize+indexCountSize:]
+
+	for i := range entries {
+		if len(b) < indexEntrySize {
+			return nil, errors.New("readEntries: stream too short for entry")
+		}
+		entries[i].MinTime = int64(binary.BigEndian.Uint64(b[0:8]))
+		entries[i].MaxTime = int64(binary.BigEndian.Uint64(b[8:16]))
+		b = b[indexEntrySize:]
+	}
+
+	return entries, nil
+}
+
+// timeRangesCoverEntries returns true if the time ranges fully cover the entries.
+func timeRangesCoverEntries(ts []TimeRange, entries []IndexEntry) (covers bool) {
+	mustCover := entries[0].MinTime
+	for len(entries) > 0 && len(ts) > 0 {
+		switch {
+		// If the tombstone does not include mustCover, we
+		// know we do not fully cover every entry.
+		case ts[0].Min > mustCover:
+			return false
+
+		// Otherwise, if the tombstone covers the rest of
+		// the entry, consume it and bump mustCover to the
+		// start of the next entry.
+		case ts[0].Max >= entries[0].MaxTime:
+			entries = entries[1:]
+			if len(entries) > 0 {
+				mustCover = entries[0].MinTime
+			}
+
+		// Otherwise, we're still inside of an entry, and
+		// so the tombstone must adjoin the current tombstone.
+		default:
+			if ts[0].Max >= mustCover {
+				mustCover = ts[0].Max + 1
+			}
+			ts = ts[1:]
+		}
+	}
+
+	return len(entries) == 0
+}
```
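`timeRangesCoverEntries` is an interval-coverage sweep over two sorted lists: the tombstones must cover every entry's [MinTime, MaxTime] with no gaps, where ranges that adjoin exactly (`prev.Max+1 == next.Min`) count as continuous. A self-contained demo of the same sweep — types copied from the diff, sample data invented:

```go
package main

import "fmt"

type TimeRange struct{ Min, Max int64 }
type IndexEntry struct{ MinTime, MaxTime int64 }

// coverEntries is the sweep from the diff: ranges and entries are sorted,
// and coverage fails as soon as a range starts past the next uncovered time.
func coverEntries(ts []TimeRange, entries []IndexEntry) bool {
	mustCover := entries[0].MinTime
	for len(entries) > 0 && len(ts) > 0 {
		switch {
		case ts[0].Min > mustCover:
			return false // gap before the next uncovered point
		case ts[0].Max >= entries[0].MaxTime:
			entries = entries[1:] // entry fully covered; move to the next
			if len(entries) > 0 {
				mustCover = entries[0].MinTime
			}
		default:
			if ts[0].Max >= mustCover {
				mustCover = ts[0].Max + 1 // adjoining ranges stay continuous
			}
			ts = ts[1:]
		}
	}
	return len(entries) == 0
}

func main() {
	entries := []IndexEntry{{0, 10}, {20, 30}}

	// Two tombstones that adjoin at t=5/6 and span both entries: covered.
	fmt.Println(coverEntries([]TimeRange{{0, 5}, {6, 30}}, entries)) // true

	// A one-point gap at t=6: not covered.
	fmt.Println(coverEntries([]TimeRange{{0, 5}, {7, 30}}, entries)) // false
}
```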
```diff
@@ -1995,6 +1995,25 @@ func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) {
 	}
 }
 
+func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) {
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		b.StopTimer()
+		indirect := getIndex(b)
+		b.StartTimer()
+
+		for i := 0; i < len(indexAllKeys); i += 4096 {
+			n := i + 4096
+			if n > len(indexAllKeys) {
+				n = len(indexAllKeys)
+			}
+			indirect.DeleteRange(indexAllKeys[i:n], 0, math.MaxInt64)
+		}
+	}
+}
+
 func BenchmarkIndirectIndex_Delete(b *testing.B) {
 	b.ReportAllocs()
 	b.ResetTimer()
 
```
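The new benchmark wraps the per-iteration `getIndex(b)` rebuild in `StopTimer`/`StartTimer` because `DeleteRange` is destructive: each of the `b.N` iterations needs a fresh index, but constructing it must not count against the measurement. The same pattern in a toy benchmark, with all names invented:

```go
package bench

import "testing"

// buildFixture stands in for an expensive per-iteration setup such as
// getIndex(b) in the diff; its cost must be excluded from the timing.
func buildFixture() []int {
	s := make([]int, 1<<16)
	for i := range s {
		s[i] = i
	}
	return s
}

func BenchmarkDestructiveOp(b *testing.B) {
	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		b.StopTimer()
		fixture := buildFixture() // rebuilt every iteration, off the clock
		b.StartTimer()

		// The measured destructive operation consumes the fixture,
		// the way DeleteRange consumes the index it is given.
		for len(fixture) > 0 {
			fixture = fixture[:len(fixture)-1]
		}
	}
}
```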