diff --git a/tsdb/tsm1/reader.go b/tsdb/tsm1/reader.go index 6a25430ec0..cd5baee4b6 100644 --- a/tsdb/tsm1/reader.go +++ b/tsdb/tsm1/reader.go @@ -774,10 +774,39 @@ func (d *indirectIndex) Delete(keys [][]byte) { d.mu.Unlock() } +// insertTimeRange adds a time range described by the minTime and maxTime into the +// ts, copying the values into the buffer, reallocating the buffer if necessary. +func insertTimeRange(ts, buf []TimeRange, minTime, maxTime int64) []TimeRange { + n := sort.Search(len(ts), func(i int) bool { + if ts[i].Min == minTime { + return ts[i].Max >= maxTime + } + return ts[i].Min > minTime + }) + + buf = buf[:0] + if cap(buf) < len(ts)+1 { + buf = make([]TimeRange, 0, len(ts)+1) + } + + buf = append(buf, ts[:n]...) + buf = append(buf, TimeRange{minTime, maxTime}) + buf = append(buf, ts[n:]...) + return buf +} + +// tombstoneEntry is a type that describes a pending insertion of a tombstone. +type tombstoneEntry struct { + Index int + KeyOffset uint32 + EntryOffset uint32 + Ranges []TimeRange +} + // DeleteRange removes the given keys with data between minTime and maxTime from the index. func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { // If we're deleting everything, we won't need to worry about partial deletes. - if minTime == math.MinInt64 && maxTime == math.MaxInt64 { + if minTime <= d.minTime && maxTime >= d.maxTime { d.Delete(keys) return } @@ -788,27 +817,6 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { return } - // insertTombstone inserts a time range for the minTime and - // maxTime into the sorted list of tombstones. It reuses the - // tsc buffer across calls. 
- var tsc []TimeRange - insertTombstone := func(ts []TimeRange) []TimeRange { - n := sort.Search(len(ts), func(i int) bool { - if ts[i].Min == minTime { - return ts[i].Max >= maxTime - } - return ts[i].Min > minTime - }) - - if cap(tsc) < len(ts)+1 { - tsc = make([]TimeRange, 0, len(ts)+1) - } - tsc = append(tsc[:0], ts[:n]...) - tsc = append(tsc, TimeRange{minTime, maxTime}) - tsc = append(tsc, ts[n:]...) - return tsc - } - // General outline: // Under the read lock, determine the set of actions we need to // take and on what keys to take them. Then, under the write @@ -816,19 +824,9 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { // during the read lock to make double checking under the // write lock cheap. - // tombstones maps the index of the key to the desired list - // of sorted tombstones after the delete. As a special case, - // if the Ranges field is nil, that means that the key - // should be deleted. - type tombstoneEntry struct { - Index int - KeyOffset uint32 - EntryOffset uint32 - Ranges []TimeRange - } - d.mu.RLock() iter := d.ro.Iterator() + var trBuf []TimeRange var entries []IndexEntry var tombstones []tombstoneEntry var err error @@ -864,19 +862,19 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { // Get the sorted list of tombstones with our new range and check // to see if they fully cover the key's entries. - ts := insertTombstone(d.tombstones[iter.Offset()]) - if timeRangesCoverEntries(ts, entries) { + trBuf = insertTimeRange(d.tombstones[iter.Offset()], trBuf, minTime, maxTime) + if timeRangesCoverEntries(trBuf, entries) { iter.Delete() continue } - // We're adding a tombstone. Store a copy because `insertTombstone` reuses - // the same slice across calls. + // We're adding a tombstone. Store a copy because `insertTimeRange` reuses + // the same buffer across calls to avoid allocations. 
tombstones = append(tombstones, tombstoneEntry{ Index: iter.Index(), KeyOffset: iter.Offset(), EntryOffset: entryOffset, - Ranges: append([]TimeRange(nil), ts...), + Ranges: append([]TimeRange(nil), trBuf...), }) } @@ -894,8 +892,8 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { // change, then we know that we don't need to double // check coverage, since we only ever increase the // number of tombstones for a key. - dts := d.tombstones[tsEntry.KeyOffset] - if len(dts)+1 == len(tsEntry.Ranges) { + trs := d.tombstones[tsEntry.KeyOffset] + if len(trs)+1 == len(tsEntry.Ranges) { d.tombstones[tsEntry.KeyOffset] = tsEntry.Ranges continue } @@ -918,18 +916,19 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { continue } - ts := insertTombstone(dts) - if timeRangesCoverEntries(ts, entries) { + trBuf = insertTimeRange(trs, trBuf, minTime, maxTime) + if timeRangesCoverEntries(trBuf, entries) { delete(d.tombstones, tsEntry.KeyOffset) iter.SetIndex(tsEntry.Index) if iter.Offset() == tsEntry.KeyOffset { iter.Delete() } - } else { - // Store a copy because `insertTombstone` reuses the same slice - // across calls. - d.tombstones[tsEntry.KeyOffset] = append([]TimeRange(nil), ts...) + continue } + + // Store a copy because `insertTimeRange` reuses the same buffer + // across calls to avoid allocations. + d.tombstones[tsEntry.KeyOffset] = append([]TimeRange(nil), trBuf...) } iter.Done() @@ -1047,7 +1046,7 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error { return fmt.Errorf("indirectIndex: too large to open") } - var minTime, maxTime int64 = math.MaxInt64, 0 + var minTime, maxTime int64 = math.MaxInt64, math.MinInt64 // To create our "indirect" index, we need to find the location of all the keys in // the raw byte slice. The keys are listed once each (in sorted order). 
diff --git a/tsdb/tsm1/reader_test.go b/tsdb/tsm1/reader_test.go index 49f9df6514..382adac688 100644 --- a/tsdb/tsm1/reader_test.go +++ b/tsdb/tsm1/reader_test.go @@ -1930,6 +1930,11 @@ func getIndex(tb testing.TB, name string) (*indirectIndex, *indexCacheInfo) { keys, blocks := sizes[0], sizes[1] writer := NewIndexWriter() + + // add a ballast key that starts at -1 so that we don't trigger optimizations + // when deleting [0, MaxInt] + writer.Add([]byte("ballast"), BlockFloat64, -1, 1, 0, 100) + for i := 0; i < keys; i++ { key := []byte(fmt.Sprintf("cpu-%08d", i)) info.allKeys = append(info.allKeys, key) @@ -1977,8 +1982,10 @@ func BenchmarkIndirectIndex_Entries(b *testing.B) { indirect.ReadEntries([]byte("cpu-00000001"), nil) } - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } } func BenchmarkIndirectIndex_ReadEntries(b *testing.B) { @@ -1992,8 +1999,10 @@ func BenchmarkIndirectIndex_ReadEntries(b *testing.B) { entries, _ = indirect.ReadEntries([]byte("cpu-00000001"), entries) } - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } } func BenchmarkBlockIterator_Next(b *testing.B) { @@ -2008,8 +2017,10 @@ func BenchmarkBlockIterator_Next(b *testing.B) { } } - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } } func BenchmarkIndirectIndex_DeleteRangeLast(b *testing.B) { @@ -2024,18 +2035,21 @@ func BenchmarkIndirectIndex_DeleteRangeLast(b *testing.B) { indirect.DeleteRange(keys, 10, 50) } - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) + if 
faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } } func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) { run := func(b *testing.B, name string) { - indirect, info := getIndex(b, name) + indirect, _ := getIndex(b, name) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer() + var info *indexCacheInfo indirect, info = getIndex(b, name) b.StartTimer() @@ -2048,8 +2062,10 @@ func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) { } } - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } } b.Run("Large", func(b *testing.B) { run(b, "large") }) @@ -2058,12 +2074,13 @@ func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) { func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) { run := func(b *testing.B, name string) { - indirect, info := getIndex(b, name) + indirect, _ := getIndex(b, name) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer() + var info *indexCacheInfo indirect, info = getIndex(b, name) b.StartTimer() @@ -2076,8 +2093,10 @@ func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) { } } - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } } b.Run("Large", func(b *testing.B) { run(b, "large") }) @@ -2086,12 +2105,13 @@ func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) { func BenchmarkIndirectIndex_Delete(b *testing.B) { run := func(b *testing.B, name string) { - indirect, info := getIndex(b, name) + indirect, _ := getIndex(b, name) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer() + var info *indexCacheInfo indirect, info = getIndex(b, name) b.StartTimer() @@ -2104,8 +2124,10 
@@ func BenchmarkIndirectIndex_Delete(b *testing.B) { } } - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } } b.Run("Large", func(b *testing.B) { run(b, "large") })