tsm1: keep first 8 bytes of each key in memory
Since most keys will share the first 8 bytes, we collapse them into a slice containing partial sums of the counts. We can then binary search into that slice to find the associated prefix for a given offset index. Compressing in this way causes the overhead to be negligible and reduces disk misses by about 30% in these benchmarks (500k series across 100 orgs). name old time/op new time/op delta IndirectIndex_UnmarshalBinary-8 67.5ms ± 1% 64.6ms ± 1% -4.33% (p=0.000 n=8+7) IndirectIndex_Entries-8 9.41µs ± 2% 9.39µs ± 1% ~ (p=0.959 n=8+8) IndirectIndex_ReadEntries-8 5.99µs ± 1% 6.07µs ± 1% +1.29% (p=0.001 n=8+8) IndirectIndex_DeleteRangeLast-8 369ns ± 2% 566ns ± 1% +53.37% (p=0.000 n=8+8) IndirectIndex_DeleteRangeFull-8 368ms ± 9% 369ms ± 2% ~ (p=0.232 n=8+7) IndirectIndex_DeleteRangeFull_Covered-8 600ms ± 1% 618ms ± 0% +3.03% (p=0.000 n=8+7) IndirectIndex_Delete-8 50.0ms ± 1% 47.6ms ± 9% ~ (p=0.463 n=7+8) name old alloc/op new alloc/op delta IndirectIndex_UnmarshalBinary-8 11.6MB ± 0% 11.7MB ± 0% +0.02% (p=0.000 n=8+7) IndirectIndex_Entries-8 32.8kB ± 0% 32.8kB ± 0% ~ (all samples are equal) IndirectIndex_ReadEntries-8 0.00B ±NaN% 0.00B ±NaN% ~ (all samples are equal) IndirectIndex_DeleteRangeLast-8 0.00B ±NaN% 0.00B ±NaN% ~ (all samples are equal) IndirectIndex_DeleteRangeFull-8 162MB ± 0% 162MB ± 0% ~ (p=0.382 n=8+8) IndirectIndex_DeleteRangeFull_Covered-8 82.4MB ± 0% 82.4MB ± 0% ~ (p=0.776 n=8+8) IndirectIndex_Delete-8 4.01kB ± 0% 4.01kB ± 0% ~ (all samples are equal) name old allocs/op new allocs/op delta IndirectIndex_UnmarshalBinary-8 35.0 ± 0% 42.0 ± 0% +20.00% (p=0.000 n=8+8) IndirectIndex_Entries-8 1.00 ± 0% 1.00 ± 0% ~ (all samples are equal) IndirectIndex_ReadEntries-8 0.00 ±NaN% 0.00 ±NaN% ~ (all samples are equal) IndirectIndex_DeleteRangeLast-8 0.00 ±NaN% 0.00 ±NaN% ~ (all samples are equal) IndirectIndex_DeleteRangeFull-8 522k ± 0% 522k ± 0% ~ (p=0.382 n=8+8) IndirectIndex_DeleteRangeFull_Covered-8 3.31k ± 0% 3.31k ± 0% ~ (p=0.457 n=8+8) 
IndirectIndex_Delete-8 123 ± 0% 123 ± 0% ~ (all samples are equal) name old speed new speed delta IndirectIndex_DeleteRangeFull-8 24.7MB/s ±10% 17.8MB/s ± 2% -28.18% (p=0.000 n=8+7) IndirectIndex_DeleteRangeFull_Covered-8 14.2MB/s ± 1% 9.6MB/s ± 0% -32.30% (p=0.000 n=8+7) IndirectIndex_Delete-8 171MB/s ± 1% 126MB/s ±10% -26.35% (p=0.000 n=7+8) IndirectIndex_DeleteRangeLast went from 17 page faults, or ~180GB/sec at 369ns/op to zero page faults. So even though it got 50% slower, it was actually I/O bound and no longer is.pull/10616/head
parent
0becfc6239
commit
f860305124
|
@ -14,6 +14,7 @@ import (
|
|||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/influxdata/platform/pkg/bytesutil"
|
||||
"github.com/influxdata/platform/pkg/file"
|
||||
|
@ -748,6 +749,9 @@ type indirectIndex struct {
|
|||
// key.
|
||||
offsets []uint32
|
||||
|
||||
// 8 byte prefixes of keys to avoid hitting the mmap when searching.
|
||||
prefixes []prefixEntry
|
||||
|
||||
// minKey, maxKey are the minimum and maximum keys (lexicographically sorted) contained in the
|
||||
// file
|
||||
minKey, maxKey []byte
|
||||
|
@ -762,6 +766,25 @@ type indirectIndex struct {
|
|||
tombstones map[uint32][]TimeRange
|
||||
}
|
||||
|
||||
type prefixEntry struct {
|
||||
pre prefix
|
||||
total int // partial sums
|
||||
}
|
||||
|
||||
func searchPrefixesIndex(prefixes []prefixEntry, n int) int {
|
||||
return sort.Search(len(prefixes), func(i int) bool {
|
||||
return prefixes[i].total > n
|
||||
})
|
||||
}
|
||||
|
||||
func searchPrefixes(prefixes []prefixEntry, n int) (prefix, bool) {
|
||||
i := searchPrefixesIndex(prefixes, n)
|
||||
if i < len(prefixes) {
|
||||
return prefixes[i].pre, true
|
||||
}
|
||||
return prefix{}, false
|
||||
}
|
||||
|
||||
// TimeRange holds a min and max timestamp.
|
||||
type TimeRange struct {
|
||||
Min, Max int64
|
||||
|
@ -786,8 +809,16 @@ func (d *indirectIndex) Seek(key []byte) int {
|
|||
|
||||
// searchOffset searches the offsets slice for key and returns the position in
|
||||
// offsets where key would exist.
|
||||
func (d *indirectIndex) searchOffset(key []byte) int {
|
||||
func (d *indirectIndex) searchOffset(key []byte) (index int) {
|
||||
pre := keyPrefix(key)
|
||||
return sort.Search(len(d.offsets), func(i int) bool {
|
||||
if prei, ok := searchPrefixes(d.prefixes, i); ok {
|
||||
if cmp := comparePrefix(prei, pre); cmp == -1 {
|
||||
return false
|
||||
} else if cmp == 1 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
_, k := readKey(d.b.access(d.offsets[i], 0))
|
||||
return bytes.Compare(k, key) >= 0
|
||||
})
|
||||
|
@ -951,7 +982,17 @@ func (d *indirectIndex) Delete(keys [][]byte) {
|
|||
j := d.searchOffset(keys[0])
|
||||
i := j
|
||||
|
||||
pi := searchPrefixesIndex(d.prefixes, j)
|
||||
ptotal := d.prefixes[pi].total
|
||||
psub := 0
|
||||
|
||||
for ; i < len(d.offsets) && len(keys) > 0; i++ {
|
||||
for i >= ptotal {
|
||||
d.prefixes[pi].total -= psub
|
||||
pi++
|
||||
ptotal = d.prefixes[pi].total
|
||||
}
|
||||
|
||||
offset := d.offsets[i]
|
||||
_, indexKey := readKey(d.b.access(offset, 0))
|
||||
|
||||
|
@ -967,6 +1008,7 @@ func (d *indirectIndex) Delete(keys [][]byte) {
|
|||
// continue, deleting the key.
|
||||
if bytes.Equal(keys[0], indexKey) {
|
||||
keys = keys[1:]
|
||||
psub++
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -980,6 +1022,10 @@ func (d *indirectIndex) Delete(keys [][]byte) {
|
|||
if i != j {
|
||||
copy(d.offsets[j:], d.offsets[i:])
|
||||
d.offsets = d.offsets[:len(d.offsets)-(i-j)]
|
||||
|
||||
for ; pi < len(d.prefixes); pi++ {
|
||||
d.prefixes[pi].total -= psub
|
||||
}
|
||||
}
|
||||
|
||||
d.mu.Unlock()
|
||||
|
@ -1173,17 +1219,32 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
|
|||
}
|
||||
|
||||
// Filter the offsets slice removing entries that are in toDelete.
|
||||
var j int
|
||||
j := 0
|
||||
pi := 0
|
||||
ptotal := d.prefixes[pi].total
|
||||
psub := 0
|
||||
|
||||
for i, offset := range d.offsets {
|
||||
for i >= ptotal {
|
||||
d.prefixes[pi].total -= psub
|
||||
pi++
|
||||
ptotal = d.prefixes[pi].total
|
||||
}
|
||||
|
||||
if _, ok := toDelete[offset]; ok {
|
||||
psub++
|
||||
continue
|
||||
}
|
||||
|
||||
if i != j {
|
||||
d.offsets[j] = offset
|
||||
}
|
||||
|
||||
j++
|
||||
}
|
||||
|
||||
d.offsets = d.offsets[:j]
|
||||
d.prefixes[len(d.prefixes)-1].total -= psub
|
||||
}
|
||||
|
||||
// TombstoneRange returns ranges of time that are deleted for the given key.
|
||||
|
@ -1261,6 +1322,33 @@ func (d *indirectIndex) MarshalBinary() ([]byte, error) {
|
|||
return d.b.b, nil
|
||||
}
|
||||
|
||||
// prefix holds the leading 8 bytes of a key. It is an alias rather than a
// distinct type, so it is interchangeable with a plain [8]byte.
type prefix = [8]byte
|
||||
|
||||
// comparePrefix is like bytes.Compare but for a prefix.
|
||||
func comparePrefix(a, b prefix) int {
|
||||
return compare64(binary.BigEndian.Uint64(a[:]), binary.BigEndian.Uint64(b[:]))
|
||||
}
|
||||
|
||||
// compare64 is the uint64 analogue of bytes.Compare: it returns -1 when
// a < b, 1 when a > b, and 0 when the two values are equal.
func compare64(a, b uint64) int {
	switch {
	case a < b:
		return -1
	case a > b:
		return 1
	default:
		return 0
	}
}
|
||||
|
||||
// keyPrefix returns a prefix that can be used with compare
|
||||
// to sort the same way the bytes would.
|
||||
func keyPrefix(key []byte) (pre prefix) {
|
||||
if len(key) >= 8 {
|
||||
return *(*[8]byte)(unsafe.Pointer(&key[0]))
|
||||
}
|
||||
copy(pre[:], key)
|
||||
return pre
|
||||
}
|
||||
|
||||
// UnmarshalBinary populates an index from an encoded byte slice
|
||||
// representation of an index.
|
||||
func (d *indirectIndex) UnmarshalBinary(b []byte) error {
|
||||
|
@ -1287,16 +1375,31 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
|
|||
// field.
|
||||
var i uint32
|
||||
var offsets []uint32
|
||||
var pentry prefixEntry
|
||||
var prefixes []prefixEntry
|
||||
|
||||
iMax := uint32(len(b))
|
||||
for i < iMax {
|
||||
offsets = append(offsets, i)
|
||||
|
||||
// Skip to the start of the values
|
||||
// key length value (2) + type (1) + length of key
|
||||
if i+2+indexTypeSize >= iMax {
|
||||
if i+2 >= iMax {
|
||||
return fmt.Errorf("indirectIndex: not enough data for key length value")
|
||||
}
|
||||
i += 1 + indexCountSize + uint32(binary.BigEndian.Uint16(b[i:i+2]))
|
||||
keyLength := uint32(binary.BigEndian.Uint16(b[i : i+2]))
|
||||
i += 2
|
||||
|
||||
if i+keyLength+indexTypeSize >= iMax {
|
||||
return fmt.Errorf("indirectIndex: not enough data for key and type")
|
||||
}
|
||||
pre := keyPrefix(b[i : i+keyLength])
|
||||
if pre != pentry.pre && pentry.total > 0 {
|
||||
prefixes = append(prefixes, pentry)
|
||||
}
|
||||
pentry.total++
|
||||
pentry.pre = pre
|
||||
i += keyLength + indexTypeSize
|
||||
|
||||
// count of index entries
|
||||
if i+indexCountSize >= iMax {
|
||||
|
@ -1331,6 +1434,8 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
|
|||
i += indexEntrySize
|
||||
}
|
||||
|
||||
prefixes = append(prefixes, pentry)
|
||||
|
||||
firstOfs := offsets[0]
|
||||
_, key := readKey(b[firstOfs:])
|
||||
d.minKey = key
|
||||
|
@ -1342,6 +1447,7 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
|
|||
d.minTime = minTime
|
||||
d.maxTime = maxTime
|
||||
d.offsets = offsets
|
||||
d.prefixes = prefixes
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1878,6 +1878,7 @@ const (
|
|||
var (
|
||||
globalIndex *indirectIndex
|
||||
indexOffsets []uint32
|
||||
indexPrefixes []prefixEntry
|
||||
indexAllKeys [][]byte
|
||||
indexBytes []byte
|
||||
)
|
||||
|
@ -1895,6 +1896,7 @@ func resetFaults(indirect *indirectIndex) {
|
|||
func getIndex(tb testing.TB) *indirectIndex {
|
||||
if globalIndex != nil {
|
||||
globalIndex.offsets = append([]uint32(nil), indexOffsets...)
|
||||
globalIndex.prefixes = append([]prefixEntry(nil), indexPrefixes...)
|
||||
globalIndex.tombstones = make(map[uint32][]TimeRange)
|
||||
resetFaults(globalIndex)
|
||||
return globalIndex
|
||||
|
@ -1902,6 +1904,7 @@ func getIndex(tb testing.TB) *indirectIndex {
|
|||
|
||||
globalIndex, indexBytes = mustMakeIndex(tb, indexKeyCount, indexBlockCount)
|
||||
indexOffsets = append([]uint32(nil), globalIndex.offsets...)
|
||||
indexPrefixes = append([]prefixEntry(nil), globalIndex.prefixes...)
|
||||
|
||||
for i := 0; i < indexKeyCount; i++ {
|
||||
indexAllKeys = append(indexAllKeys, []byte(fmt.Sprintf("cpu-%08d", i)))
|
||||
|
|
Loading…
Reference in New Issue