tsm1: fix staticcheck and refactor closure out

pull/10616/head
Jeff Wendling 2018-12-29 14:54:10 -07:00
parent 1ffcd77342
commit 9cdefa8e4f
2 changed files with 85 additions and 64 deletions

View File

@ -774,10 +774,39 @@ func (d *indirectIndex) Delete(keys [][]byte) {
d.mu.Unlock()
}
// insertTimeRange adds a time range described by the minTime and maxTime into the
// ts, copying the values into the buffer, reallocating the buffer if necessary.
func insertTimeRange(ts, buf []TimeRange, minTime, maxTime int64) []TimeRange {
n := sort.Search(len(ts), func(i int) bool {
if ts[i].Min == minTime {
return ts[i].Max >= maxTime
}
return ts[i].Min > minTime
})
buf = buf[:0]
if cap(buf) < len(ts)+1 {
buf = make([]TimeRange, 0, len(ts)+1)
}
buf = append(buf, ts[:n]...)
buf = append(buf, TimeRange{minTime, maxTime})
buf = append(buf, ts[n:]...)
return buf
}
// tombstoneEntry is a type that describes a pending insertion of a tombstone.
type tombstoneEntry struct {
Index int
KeyOffset uint32
EntryOffset uint32
Ranges []TimeRange
}
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
// If we're deleting everything, we won't need to worry about partial deletes.
if minTime == math.MinInt64 && maxTime == math.MaxInt64 {
if minTime <= d.minTime && maxTime >= d.maxTime {
d.Delete(keys)
return
}
@ -788,27 +817,6 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
return
}
// insertTombstone inserts a time range for the minTime and
// maxTime into the sorted list of tombstones. It reuses the
// tsc buffer across calls.
var tsc []TimeRange
insertTombstone := func(ts []TimeRange) []TimeRange {
n := sort.Search(len(ts), func(i int) bool {
if ts[i].Min == minTime {
return ts[i].Max >= maxTime
}
return ts[i].Min > minTime
})
if cap(tsc) < len(ts)+1 {
tsc = make([]TimeRange, 0, len(ts)+1)
}
tsc = append(tsc[:0], ts[:n]...)
tsc = append(tsc, TimeRange{minTime, maxTime})
tsc = append(tsc, ts[n:]...)
return tsc
}
// General outline:
// Under the read lock, determine the set of actions we need to
// take and on what keys to take them. Then, under the write
@ -816,19 +824,9 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
// during the read lock to make double checking under the
// write lock cheap.
// tombstones maps the index of the key to the desired list
// of sorted tombstones after the delete. As a special case,
// if the Ranges field is nil, that means that the key
// should be deleted.
type tombstoneEntry struct {
Index int
KeyOffset uint32
EntryOffset uint32
Ranges []TimeRange
}
d.mu.RLock()
iter := d.ro.Iterator()
var trBuf []TimeRange
var entries []IndexEntry
var tombstones []tombstoneEntry
var err error
@ -864,19 +862,19 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
// Get the sorted list of tombstones with our new range and check
// to see if they fully cover the key's entries.
ts := insertTombstone(d.tombstones[iter.Offset()])
if timeRangesCoverEntries(ts, entries) {
trBuf = insertTimeRange(d.tombstones[iter.Offset()], trBuf, minTime, maxTime)
if timeRangesCoverEntries(trBuf, entries) {
iter.Delete()
continue
}
// We're adding a tombstone. Store a copy because `insertTombstone` reuses
// the same slice across calls.
// We're adding a tombstone. Store a copy because `insertTimeRange` reuses
// the same buffer across calls to avoid allocations.
tombstones = append(tombstones, tombstoneEntry{
Index: iter.Index(),
KeyOffset: iter.Offset(),
EntryOffset: entryOffset,
Ranges: append([]TimeRange(nil), ts...),
Ranges: append([]TimeRange(nil), trBuf...),
})
}
@ -894,8 +892,8 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
// change, then we know that we don't need to double
// check coverage, since we only ever increase the
// number of tombstones for a key.
dts := d.tombstones[tsEntry.KeyOffset]
if len(dts)+1 == len(tsEntry.Ranges) {
trs := d.tombstones[tsEntry.KeyOffset]
if len(trs)+1 == len(tsEntry.Ranges) {
d.tombstones[tsEntry.KeyOffset] = tsEntry.Ranges
continue
}
@ -918,18 +916,19 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
continue
}
ts := insertTombstone(dts)
if timeRangesCoverEntries(ts, entries) {
trBuf = insertTimeRange(trs, trBuf, minTime, maxTime)
if timeRangesCoverEntries(trBuf, entries) {
delete(d.tombstones, tsEntry.KeyOffset)
iter.SetIndex(tsEntry.Index)
if iter.Offset() == tsEntry.KeyOffset {
iter.Delete()
}
} else {
// Store a copy because `insertTombstone` reuses the same slice
// across calls.
d.tombstones[tsEntry.KeyOffset] = append([]TimeRange(nil), ts...)
continue
}
// Store a copy because `insertTombstone` reuses the same slice
// across calls.
d.tombstones[tsEntry.KeyOffset] = append([]TimeRange(nil), trBuf...)
}
iter.Done()
@ -1047,7 +1046,7 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
return fmt.Errorf("indirectIndex: too large to open")
}
var minTime, maxTime int64 = math.MaxInt64, 0
var minTime, maxTime int64 = math.MaxInt64, math.MinInt64
// To create our "indirect" index, we need to find the location of all the keys in
// the raw byte slice. The keys are listed once each (in sorted order). Following

View File

@ -1930,6 +1930,11 @@ func getIndex(tb testing.TB, name string) (*indirectIndex, *indexCacheInfo) {
keys, blocks := sizes[0], sizes[1]
writer := NewIndexWriter()
// add a ballast key that starts at -1 so that we don't trigger optimizations
// when deleting [0, MaxInt]
writer.Add([]byte("ballast"), BlockFloat64, -1, 1, 0, 100)
for i := 0; i < keys; i++ {
key := []byte(fmt.Sprintf("cpu-%08d", i))
info.allKeys = append(info.allKeys, key)
@ -1977,8 +1982,10 @@ func BenchmarkIndirectIndex_Entries(b *testing.B) {
indirect.ReadEntries([]byte("cpu-00000001"), nil)
}
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkIndirectIndex_ReadEntries(b *testing.B) {
@ -1992,8 +1999,10 @@ func BenchmarkIndirectIndex_ReadEntries(b *testing.B) {
entries, _ = indirect.ReadEntries([]byte("cpu-00000001"), entries)
}
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkBlockIterator_Next(b *testing.B) {
@ -2008,8 +2017,10 @@ func BenchmarkBlockIterator_Next(b *testing.B) {
}
}
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkIndirectIndex_DeleteRangeLast(b *testing.B) {
@ -2024,18 +2035,21 @@ func BenchmarkIndirectIndex_DeleteRangeLast(b *testing.B) {
indirect.DeleteRange(keys, 10, 50)
}
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) {
run := func(b *testing.B, name string) {
indirect, info := getIndex(b, name)
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var info *indexCacheInfo
indirect, info = getIndex(b, name)
b.StartTimer()
@ -2048,8 +2062,10 @@ func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) {
}
}
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
@ -2058,12 +2074,13 @@ func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) {
func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) {
run := func(b *testing.B, name string) {
indirect, info := getIndex(b, name)
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var info *indexCacheInfo
indirect, info = getIndex(b, name)
b.StartTimer()
@ -2076,8 +2093,10 @@ func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) {
}
}
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
@ -2086,12 +2105,13 @@ func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) {
func BenchmarkIndirectIndex_Delete(b *testing.B) {
run := func(b *testing.B, name string) {
indirect, info := getIndex(b, name)
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var info *indexCacheInfo
indirect, info = getIndex(b, name)
b.StartTimer()
@ -2104,8 +2124,10 @@ func BenchmarkIndirectIndex_Delete(b *testing.B) {
}
}
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })