tsm1: introduce readerOffsets to manage the offsets slice

It exposes an API that will clean up the bodies of many methods and
provide a safe abstraction around iteration that will be able to
handle reads with concurrent deletes.

Benchmarks are flat.
pull/10616/head
Jeff Wendling 2018-12-18 19:20:08 -07:00
parent f860305124
commit 6f5c94f3f7
4 changed files with 469 additions and 103 deletions

View File

@ -14,7 +14,6 @@ import (
"sort"
"sync"
"sync/atomic"
"unsafe"
"github.com/influxdata/platform/pkg/bytesutil"
"github.com/influxdata/platform/pkg/file"
@ -745,12 +744,9 @@ type indirectIndex struct {
// slice reference
b faultBuffer
// offsets contains the positions in b for each key. It points to the 2 byte length of
// key.
offsets []uint32
// 8 byte prefixes of keys to avoid hitting the mmap when searching.
prefixes []prefixEntry
// ro contains the positions in b for each key as well as the first bytes of each key
// to avoid disk seeks.
ro readerOffsets
// minKey, maxKey are the minimum and maximum (lexicographically sorted) contained in the
// file
@ -766,25 +762,6 @@ type indirectIndex struct {
tombstones map[uint32][]TimeRange
}
type prefixEntry struct {
pre prefix
total int // partial sums
}
func searchPrefixesIndex(prefixes []prefixEntry, n int) int {
return sort.Search(len(prefixes), func(i int) bool {
return prefixes[i].total > n
})
}
func searchPrefixes(prefixes []prefixEntry, n int) (prefix, bool) {
i := searchPrefixesIndex(prefixes, n)
if i < len(prefixes) {
return prefixes[i].pre, true
}
return prefix{}, false
}
// TimeRange holds a min and max timestamp.
type TimeRange struct {
Min, Max int64
@ -807,19 +784,33 @@ func (d *indirectIndex) Seek(key []byte) int {
return d.searchOffset(key)
}
func searchPrefixesIndex(prefixes []prefixEntry, n int) int {
return sort.Search(len(prefixes), func(i int) bool {
return prefixes[i].total > n
})
}
func searchPrefixes(prefixes []prefixEntry, n int) (prefix, bool) {
i := searchPrefixesIndex(prefixes, n)
if i < len(prefixes) {
return prefixes[i].pre, true
}
return prefix{}, false
}
// searchOffset searches the offsets slice for key and returns the position in
// offsets where key would exist.
func (d *indirectIndex) searchOffset(key []byte) (index int) {
pre := keyPrefix(key)
return sort.Search(len(d.offsets), func(i int) bool {
if prei, ok := searchPrefixes(d.prefixes, i); ok {
return sort.Search(len(d.ro.offsets), func(i int) bool {
if prei, ok := searchPrefixes(d.ro.prefixes, i); ok {
if cmp := comparePrefix(prei, pre); cmp == -1 {
return false
} else if cmp == 1 {
return true
}
}
_, k := readKey(d.b.access(d.offsets[i], 0))
_, k := readKey(d.b.access(d.ro.offsets[i], 0))
return bytes.Compare(k, key) >= 0
})
}
@ -833,8 +824,8 @@ func (d *indirectIndex) search(key []byte) uint32 {
// We use a binary search across our indirect offsets (pointers to all the keys
// in the index slice). We then check if we have found the right index.
if i := d.searchOffset(key); i < len(d.offsets) {
offset := d.offsets[i]
if i := d.searchOffset(key); i < len(d.ro.offsets) {
offset := d.ro.offsets[i]
_, k := readKey(d.b.access(offset, 0))
// The search may have returned an i == 0 which could indicate that the value
@ -919,11 +910,11 @@ func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []Ind
d.mu.RLock()
defer d.mu.RUnlock()
if idx < 0 || idx >= len(d.offsets) {
if idx < 0 || idx >= len(d.ro.offsets) {
return nil, 0, nil
}
offset := d.offsets[idx]
offset := d.ro.offsets[idx]
n, key := readKey(d.b.access(offset, 0))
typ := d.b.access(offset+n, 1)[0]
@ -945,11 +936,11 @@ func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []Ind
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
d.mu.RLock()
if idx < 0 || idx >= len(d.offsets) {
if idx < 0 || idx >= len(d.ro.offsets) {
d.mu.RUnlock()
return nil, 0
}
offset := d.offsets[idx]
offset := d.ro.offsets[idx]
n, key := readKey(d.b.access(offset, 0))
offset = offset + uint32(n)
typ := d.b.access(offset, 1)[0]
@ -960,7 +951,7 @@ func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
// KeyCount returns the count of unique keys in the index.
func (d *indirectIndex) KeyCount() int {
d.mu.RLock()
n := len(d.offsets)
n := len(d.ro.offsets)
d.mu.RUnlock()
return n
}
@ -982,18 +973,18 @@ func (d *indirectIndex) Delete(keys [][]byte) {
j := d.searchOffset(keys[0])
i := j
pi := searchPrefixesIndex(d.prefixes, j)
ptotal := d.prefixes[pi].total
pi := searchPrefixesIndex(d.ro.prefixes, j)
ptotal := d.ro.prefixes[pi].total
psub := 0
for ; i < len(d.offsets) && len(keys) > 0; i++ {
for ; i < len(d.ro.offsets) && len(keys) > 0; i++ {
for i >= ptotal {
d.prefixes[pi].total -= psub
d.ro.prefixes[pi].total -= psub
pi++
ptotal = d.prefixes[pi].total
ptotal = d.ro.prefixes[pi].total
}
offset := d.offsets[i]
offset := d.ro.offsets[i]
_, indexKey := readKey(d.b.access(offset, 0))
// pop keys to delete while they're smaller than the indexKey.
@ -1013,18 +1004,18 @@ func (d *indirectIndex) Delete(keys [][]byte) {
}
if i != j {
d.offsets[j] = d.offsets[i]
d.ro.offsets[j] = d.ro.offsets[i]
}
j++
}
// if we deleted any keys, copy the suffix and reslice.
if i != j {
copy(d.offsets[j:], d.offsets[i:])
d.offsets = d.offsets[:len(d.offsets)-(i-j)]
copy(d.ro.offsets[j:], d.ro.offsets[i:])
d.ro.offsets = d.ro.offsets[:len(d.ro.offsets)-(i-j)]
for ; pi < len(d.prefixes); pi++ {
d.prefixes[pi].total -= psub
for ; pi < len(d.ro.prefixes); pi++ {
d.ro.prefixes[pi].total -= psub
}
}
@ -1096,18 +1087,18 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
// often, the passed in keys are contiguous. check the next entry to see if it matches
// the key to avoid a seek.
found, index = false, index+1
if index >= 0 && index < len(d.offsets) {
if index >= 0 && index < len(d.ro.offsets) {
goto attempt
}
seek: // seeks the index to the appropriate key. if not found, continues
if index = d.searchOffset(key); index >= len(d.offsets) {
if index = d.searchOffset(key); index >= len(d.ro.offsets) {
continue
}
found = true
attempt: // loads the key from disk and compares it to the current key
keyOffset := d.offsets[index]
keyOffset := d.ro.offsets[index]
n, indexKey := readKey(d.b.access(keyOffset, 0))
entryOffset := keyOffset + n
@ -1221,14 +1212,14 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
// Filter the offsets slice removing entries that are in toDelete.
j := 0
pi := 0
ptotal := d.prefixes[pi].total
ptotal := d.ro.prefixes[pi].total
psub := 0
for i, offset := range d.offsets {
for i, offset := range d.ro.offsets {
for i >= ptotal {
d.prefixes[pi].total -= psub
d.ro.prefixes[pi].total -= psub
pi++
ptotal = d.prefixes[pi].total
ptotal = d.ro.prefixes[pi].total
}
if _, ok := toDelete[offset]; ok {
@ -1237,14 +1228,14 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
}
if i != j {
d.offsets[j] = offset
d.ro.offsets[j] = offset
}
j++
}
d.offsets = d.offsets[:j]
d.prefixes[len(d.prefixes)-1].total -= psub
d.ro.offsets = d.ro.offsets[:j]
d.ro.prefixes[len(d.ro.prefixes)-1].total -= psub
}
// TombstoneRange returns ranges of time that are deleted for the given key.
@ -1322,33 +1313,6 @@ func (d *indirectIndex) MarshalBinary() ([]byte, error) {
return d.b.b, nil
}
type prefix = [8]byte
// comparePrefix is like bytes.Compare but for a prefix.
func comparePrefix(a, b prefix) int {
return compare64(binary.BigEndian.Uint64(a[:]), binary.BigEndian.Uint64(b[:]))
}
// compare64 is like bytes.Compare but for uint64s.
func compare64(a, b uint64) int {
if a == b {
return 0
} else if a < b {
return -1
}
return 1
}
// keyPrefix returns a prefix that can be used with compare
// to sort the same way the bytes would.
func keyPrefix(key []byte) (pre prefix) {
if len(key) >= 8 {
return *(*[8]byte)(unsafe.Pointer(&key[0]))
}
copy(pre[:], key)
return pre
}
// UnmarshalBinary populates an index from an encoded byte slice
// representation of an index.
func (d *indirectIndex) UnmarshalBinary(b []byte) error {
@ -1374,13 +1338,11 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
// basically skips across the slice keeping track of the counter when we are at a key
// field.
var i uint32
var offsets []uint32
var pentry prefixEntry
var prefixes []prefixEntry
var ro readerOffsets
iMax := uint32(len(b))
for i < iMax {
offsets = append(offsets, i)
offset := i // save for when we add to the data structure
// Skip to the start of the values
// key length value (2) + type (1) + length of key
@ -1393,12 +1355,7 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
if i+keyLength+indexTypeSize >= iMax {
return fmt.Errorf("indirectIndex: not enough data for key and type")
}
pre := keyPrefix(b[i : i+keyLength])
if pre != pentry.pre && pentry.total > 0 {
prefixes = append(prefixes, pentry)
}
pentry.total++
pentry.pre = pre
ro.AddKey(offset, b[i:i+keyLength])
i += keyLength + indexTypeSize
// count of index entries
@ -1434,20 +1391,19 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
i += indexEntrySize
}
prefixes = append(prefixes, pentry)
ro.Done()
firstOfs := offsets[0]
firstOfs := ro.offsets[0]
_, key := readKey(b[firstOfs:])
d.minKey = key
lastOfs := offsets[len(offsets)-1]
lastOfs := ro.offsets[len(ro.offsets)-1]
_, key = readKey(b[lastOfs:])
d.maxKey = key
d.minTime = minTime
d.maxTime = maxTime
d.offsets = offsets
d.prefixes = prefixes
d.ro = ro
return nil
}

243
tsdb/tsm1/reader_offsets.go Normal file
View File

@ -0,0 +1,243 @@
package tsm1
import (
"bytes"
"encoding/binary"
"unsafe"
)
// readerOffsets keeps track of offsets of keys for an indirectIndex, together
// with a table of key prefixes that lets searches compare prefixes before
// touching the key bytes themselves.
type readerOffsets struct {
// offsets holds the offset given to each AddKey call, in call order.
offsets []uint32
// prefixes holds one entry per run of consecutively added keys that share a
// prefix. Each entry's total is the cumulative key count through that run.
prefixes []prefixEntry
// entry is the run AddKey is currently accumulating; Done appends it to
// prefixes.
entry prefixEntry
}
// prefixEntry keeps a prefix along with a prefix sum of the total number of
// keys with the given prefix. Because total is cumulative, the entry covering
// the nth key is the first one whose total is greater than n.
type prefixEntry struct {
pre prefix
total int // partial sums: number of keys up to and including this prefix
}
// prefix is an 8 byte prefix of a key that sorts the same way the key does
// when compared with comparePrefix. keyPrefix builds one from a key, leaving
// trailing bytes zero when the key is shorter than 8 bytes.
type prefix [8]byte
// comparePrefix orders two prefixes the same way bytes.Compare would order
// their bytes: it returns -1 when a sorts before b, 1 when a sorts after b,
// and 0 when they are equal. Both prefixes are decoded as big-endian uint64
// values so that one integer comparison matches lexicographic byte order.
func comparePrefix(a, b prefix) int {
	lhs := binary.BigEndian.Uint64(a[:])
	rhs := binary.BigEndian.Uint64(b[:])
	switch {
	case lhs < rhs:
		return -1
	case lhs > rhs:
		return 1
	default:
		return 0
	}
}
// keyPrefix returns the first 8 bytes of key as a prefix that comparePrefix
// orders the same way the key bytes would sort. Keys shorter than 8 bytes are
// copied into a zeroed prefix, so the missing trailing bytes are zero.
func keyPrefix(key []byte) (pre prefix) {
	if len(key) < len(pre) {
		copy(pre[:], key)
		return pre
	}
	// key has at least 8 bytes: load them in one shot without a copy loop.
	return *(*[8]byte)(unsafe.Pointer(&key[0]))
}
// searchPrefix returns the index of the prefixEntry for the nth offset: the
// first entry whose cumulative total is greater than n. It returns
// len(r.prefixes) when no entry has a total greater than n.
func (r *readerOffsets) searchPrefix(n int) int {
	lo, hi := 0, len(r.prefixes)
	for lo < hi {
		// midpoint computed through uint so lo+hi cannot overflow into a
		// negative index.
		mid := int(uint(lo+hi) >> 1)
		if r.prefixes[mid].total > n {
			hi = mid
			continue
		}
		lo = mid + 1
	}
	return lo
}
// AddKey tracks the key in the readerOffsets at the given offset. The offset
// is appended to offsets. When the key's prefix differs from the prefix of the
// previously added key, the previous run is appended to prefixes before the
// new key is counted.
func (r *readerOffsets) AddKey(offset uint32, key []byte) {
	r.offsets = append(r.offsets, offset)

	next := keyPrefix(key)
	started := r.entry.total != 0 // false only before the very first key
	if started && next != r.entry.pre {
		r.prefixes = append(r.prefixes, r.entry)
	}
	r.entry.total++
	r.entry.pre = next
}
// Done signals that we are done adding keys. It appends the run that AddKey
// was still accumulating to prefixes, so it should be called once after the
// last AddKey.
func (r *readerOffsets) Done() {
r.prefixes = append(r.prefixes, r.entry)
}
// Iterator returns an iterator that can walk and seek around the keys cheaply.
// The iterator starts at index 0 and keeps a pointer to r, so deletions made
// through it modify r in place.
func (r *readerOffsets) Iterator() readerOffsetsIterator {
return readerOffsetsIterator{r: r, first: true}
}
// readerOffsetsIterator iterates over the keys in readerOffsets. Entries
// flagged with Delete are compacted out of the offsets slice as the iterator
// advances; Done must be called to finish that compaction.
type readerOffsetsIterator struct {
r *readerOffsets
first bool // is this the first call to next?
del bool // should the next call delete?
oi int // index into offsets (read position)
od int // undeleted index into offsets (write position for kept entries)
pi int // index into prefixes
psub int // number of deleted entries
ks rOIKeyState // current key state
}
// rOIKeyState keeps track of cached information for the current key. The zero
// value means nothing is cached.
type rOIKeyState struct {
length uint32 // key length in the low bits; high bit means set
key []byte // cached key bytes; nil means not loaded yet
}
// newROIKeyState constructs a rOIKeyState for the given key, precaching it.
// A nil key yields the zero state (nothing cached); any non-nil key, even an
// empty one, is stored along with its length flagged as set.
func newROIKeyState(key []byte) (ks rOIKeyState) {
	if key == nil {
		return ks
	}
	return rOIKeyState{
		length: uint32(len(key)) | 1<<31,
		key:    key,
	}
}
// setIndex points the iterator at offset index i and prefix index pi. Both the
// read and write positions are set to i, and the cached key state is replaced
// with one built from key (cleared when key is nil).
func (ri *readerOffsetsIterator) setIndex(i, pi int, key []byte) {
	ri.oi = i
	ri.od = i
	ri.pi = pi
	ri.ks = newROIKeyState(key)
}
// Length returns the length of the current pointed at key. The length is read
// from the 2 byte big-endian value at the current offset in b the first time
// it is needed and cached in the key state afterwards, so repeated calls for
// the same position do not access b again.
func (ri *readerOffsetsIterator) Length(b *faultBuffer) uint16 {
	// The high bit of ks.length marks the low 16 bits as valid. The mask must
	// be parenthesized or named: & and << share precedence in Go, so
	// `length&1<<31` would test the low bit instead of the high bit.
	const cached = 1 << 31
	if ri.ks.length&cached == 0 {
		ri.ks.length = uint32(binary.BigEndian.Uint16(b.access(ri.Offset(), 2))) | cached
	}
	return uint16(ri.ks.length)
}
// Key returns the current pointed at key. The bytes are fetched from b on the
// first call for a position and served from the cached key state afterwards.
func (ri *readerOffsetsIterator) Key(b *faultBuffer) []byte {
	if cached := ri.ks.key; cached != nil {
		return cached
	}
	ri.ks.key = b.access(ri.KeyOffset(), uint32(ri.Length(b)))
	return ri.ks.key
}
// KeyOffset returns the offset of the current pointed at key: the current
// entry offset plus the 2 bytes that hold the key length.
func (ri *readerOffsetsIterator) KeyOffset() uint32 {
return ri.Offset() + 2
}
// EntryOffset returns the offset of the current pointed at entries (including
// type byte): the current offset plus the 2 byte key length plus the key
// itself. It may read the key length from b if it is not cached yet.
func (ri *readerOffsetsIterator) EntryOffset(b *faultBuffer) uint32 {
return ri.Offset() + 2 + uint32(ri.Length(b))
}
// Prefix returns the current pointed at prefix, i.e. the prefix stored in the
// prefix entry at index pi.
func (ri *readerOffsetsIterator) Prefix() prefix { return ri.r.prefixes[ri.pi].pre }
// Offset returns the current pointed at offset. It indexes offsets with oi
// directly, so it must only be called while the iterator points at a value.
func (ri *readerOffsetsIterator) Offset() uint32 { return ri.r.offsets[ri.oi] }
// Next advances the iterator and returns true if it points at a value. The
// first call on a fresh iterator does not advance; it only reports whether
// the starting position holds a value. If the current entry was flagged with
// Delete it is dropped, otherwise it is kept (and moved down over any
// previously dropped slots).
func (ri *readerOffsetsIterator) Next() bool {
if ri.oi >= len(ri.r.offsets) {
return false
} else if ri.first {
// first call: report the initial position without moving.
ri.first = false
return true
}
if ri.del {
// drop the current entry: count it and leave od where it is so a later
// kept entry overwrites this slot.
ri.psub++
ri.del = false
} else {
// keep the current entry, sliding it down to od if there is a gap.
if ri.oi != ri.od {
ri.r.offsets[ri.od] = ri.r.offsets[ri.oi]
}
ri.od++
}
ri.oi++
// the cached key belonged to the previous position.
ri.ks = rOIKeyState{}
// step past every prefix entry that oi has moved beyond, reducing its
// cumulative total by the number of entries deleted so far.
for ri.pi < len(ri.r.prefixes) && ri.oi >= ri.r.prefixes[ri.pi].total {
ri.r.prefixes[ri.pi].total -= ri.psub
ri.pi++
}
return ri.oi < len(ri.r.offsets)
}
// Done should be called to finalize up any deletes. It applies a still
// pending Delete, subtracts the deletion count from the prefix entries that
// have not been adjusted yet, and shrinks offsets by moving the unvisited
// tail down over the deleted slots.
func (ri *readerOffsetsIterator) Done() {
if ri.del { // if flagged to delete, pretend next was called
ri.psub++
ri.oi++
}
// oi and od only differ when at least one entry was deleted.
if ri.oi != ri.od {
// entries from pi onward have not had the deletions subtracted yet.
for ; ri.pi < len(ri.r.prefixes); ri.pi++ {
ri.r.prefixes[ri.pi].total -= ri.psub
}
// close the gap between the write and read positions and reslice.
copy(ri.r.offsets[ri.od:], ri.r.offsets[ri.oi:])
ri.r.offsets = ri.r.offsets[:len(ri.r.offsets)-(ri.oi-ri.od)]
}
}
// Delete flags the entry to be skipped on the next call to Next.
// Should only be called with the write lock.
// The flag is also honored by Done, so a Delete on the last visited entry
// takes effect even if Next is not called again.
func (ri *readerOffsetsIterator) Delete() { ri.del = true }
// Seek points the iterator at the smallest key greater than or equal to the
// given key, returning true for exact if it was an exact match. It returns
// false for ok if no such key exists, i.e. every key is smaller than the given
// key or there are no keys at all. Do not call Seek if you have ever called
// Delete: it panics when a delete is pending or has already been applied by
// this iterator.
func (ri *readerOffsetsIterator) Seek(key []byte, b *faultBuffer) (exact, ok bool) {
if ri.del || ri.psub > 0 {
panic("does not support Seek when we just deleted a key")
}
// a Seek positions the iterator, so the next call to Next must advance.
ri.first = false
// binary search over offsets; each probe compares prefixes first and only
// reads the key from b when the prefixes are equal.
pre, i, j, pi := keyPrefix(key), 0, len(ri.r.offsets), 0
for i < j {
h := int(uint(i+j) >> 1)
pi = ri.r.searchPrefix(h)
ri.setIndex(h, pi, nil)
switch ri.Compare(key, pre, b) {
case -1:
i = h + 1
case 1:
j = h
default:
return true, true
}
}
ri.setIndex(i, pi, nil)
if ri.oi >= len(ri.r.offsets) {
return false, false
}
// pi was computed for the last probed index, which may be smaller than i;
// walk it forward until it covers i.
for ri.pi < len(ri.r.prefixes) && ri.oi >= ri.r.prefixes[ri.pi].total {
ri.pi++
}
return bytes.Equal(ri.Key(b), key), true
}
// SeekTo sets the iterator to point at the nth element, looking up the
// matching prefix entry and clearing any cached key state. It does not check
// that n is within range.
func (ri *readerOffsetsIterator) SeekTo(n int) { ri.setIndex(n, ri.r.searchPrefix(n), nil) }
// Compare is like bytes.Compare(current key, key), but reduces the amount of
// faults: the current prefix is compared against pre first, and the key bytes
// are only read from b when the two prefixes are equal. pre is expected to be
// the prefix of key.
func (ri *readerOffsetsIterator) Compare(key []byte, pre prefix, b *faultBuffer) int {
	cmp := comparePrefix(ri.Prefix(), pre)
	if cmp == 0 {
		// prefixes tie, so the full keys have to decide.
		cmp = bytes.Compare(ri.Key(b), key)
	}
	return cmp
}

View File

@ -0,0 +1,167 @@
package tsm1
import (
"fmt"
"math/rand"
"testing"
)
// TestReaderOffsets exercises readerOffsets and its iterator: construction,
// in-order iteration, seeking, deleting while iterating, and a randomized
// delete-then-seek loop.
func TestReaderOffsets(t *testing.T) {
const numKeys = 100
// check fails the test when got != exp, reporting what was being compared.
check := func(t *testing.T, what string, got, exp interface{}, extra ...interface{}) {
t.Helper()
if got != exp {
args := []interface{}{"incorrect", what, "got:", got, "exp:", exp}
args = append(args, extra...)
t.Fatal(args...)
}
}
// makeKey produces 9 digit zero padded keys, so keys sort numerically and
// every 10 consecutive keys share the same 8 byte prefix.
makeKey := func(i int) string { return fmt.Sprintf("%09d", i) }
// makeRO builds a readerOffsets with numKeys keys and the buffer backing them.
makeRO := func() (readerOffsets, *faultBuffer) {
var buf []byte
var ro readerOffsets
for i := 0; i < numKeys; i++ {
ro.AddKey(addKey(&buf, makeKey(i)))
}
ro.Done()
return ro, &faultBuffer{b: buf}
}
t.Run("Create_SingleKey", func(t *testing.T) {
var buf []byte
var ro readerOffsets
ro.AddKey(addKey(&buf, makeKey(0)))
ro.Done()
check(t, "offsets", len(ro.offsets), 1)
check(t, "prefixes", len(ro.prefixes), 1)
})
t.Run("Create", func(t *testing.T) {
ro, _ := makeRO()
check(t, "offsets", len(ro.offsets), numKeys)
// one prefix entry per group of 10 keys.
check(t, "prefixes", len(ro.prefixes), numKeys/10)
})
t.Run("Iterate", func(t *testing.T) {
ro, fb := makeRO()
iter := ro.Iterator()
for i := 0; iter.Next(); i++ {
check(t, "key", string(iter.Key(fb)), makeKey(i))
}
})
t.Run("Seek", func(t *testing.T) {
ro, fb := makeRO()
exact, ok := false, false
iter := ro.Iterator()
for i := 0; i < numKeys-1; i++ {
// an existing key is an exact match.
exact, ok = iter.Seek([]byte(makeKey(i)), fb)
check(t, "exact", exact, true)
check(t, "ok", ok, true)
check(t, "key", string(iter.Key(fb)), makeKey(i))
// a key just after it lands on the following key.
exact, ok = iter.Seek([]byte(makeKey(i)+"0"), fb)
check(t, "exact", exact, false)
check(t, "ok", ok, true)
check(t, "key", string(iter.Key(fb)), makeKey(i+1))
}
exact, ok = iter.Seek([]byte(makeKey(numKeys-1)), fb)
check(t, "exact", exact, true)
check(t, "ok", ok, true)
check(t, "key", string(iter.Key(fb)), makeKey(numKeys-1))
// seeking past the last key reports not ok.
exact, ok = iter.Seek([]byte(makeKey(numKeys-1)+"0"), fb)
check(t, "exact", exact, false)
check(t, "ok", ok, false)
exact, ok = iter.Seek([]byte("1"), fb)
check(t, "exact", exact, false)
check(t, "ok", ok, false)
// a nil key sorts before everything and lands on the first key.
exact, ok = iter.Seek(nil, fb)
check(t, "exact", exact, false)
check(t, "ok", ok, true)
check(t, "key", string(iter.Key(fb)), makeKey(0))
})
t.Run("Delete", func(t *testing.T) {
ro, fb := makeRO()
// delete every odd indexed key.
iter := ro.Iterator()
for i := 0; iter.Next(); i++ {
if i%2 == 0 {
continue
}
iter.Delete()
}
iter.Done()
// only the even keys should remain, in order.
iter = ro.Iterator()
for i := 0; iter.Next(); i++ {
check(t, "key", string(iter.Key(fb)), makeKey(2*i))
}
})
t.Run("Fuzz", func(t *testing.T) {
for i := 0; i < 100; i++ {
ro, fb := makeRO()
deleted := make(map[string]struct{})
for i := 0; i < numKeys; i++ {
// delete a random key. if we seek past, delete the first key.
{
iter := ro.Iterator()
_, ok := iter.Seek([]byte(makeKey(rand.Intn(numKeys))), fb)
if !ok {
iter.Seek(nil, fb)
}
key := string(iter.Key(fb))
_, ok = deleted[key]
check(t, "key deleted", ok, false)
deleted[key] = struct{}{}
iter.Delete()
iter.Next()
iter.Done()
}
// seek to every key that isn't deleted.
for i := 0; i < numKeys; i++ {
key := makeKey(i)
if _, ok := deleted[key]; ok {
continue
}
iter := ro.Iterator()
exact, ok := iter.Seek([]byte(key), fb)
check(t, "exact", exact, true)
check(t, "ok", ok, true)
check(t, "key", string(iter.Key(fb)), key)
}
}
// every key has been deleted, so a fresh iterator has nothing to visit.
check(t, "amount deleted", len(deleted), numKeys)
iter := ro.Iterator()
check(t, "next", iter.Next(), false)
}
})
}
// addKey appends a synthetic index entry for key to *buf and returns the
// offset at which the entry starts along with the key bytes. The entry is the
// 2 byte big-endian key length, the key itself, a single zero byte, and
// indexEntrySize zero bytes.
func addKey(buf *[]byte, key string) (uint32, []byte) {
	start := uint32(len(*buf))
	n := len(key)

	out := append(*buf, byte(n>>8), byte(n))
	out = append(out, key...)
	out = append(out, 0)
	out = append(out, make([]byte, indexEntrySize)...)
	*buf = out

	return start, []byte(key)
}

View File

@ -1895,16 +1895,16 @@ func resetFaults(indirect *indirectIndex) {
func getIndex(tb testing.TB) *indirectIndex {
if globalIndex != nil {
globalIndex.offsets = append([]uint32(nil), indexOffsets...)
globalIndex.prefixes = append([]prefixEntry(nil), indexPrefixes...)
globalIndex.ro.offsets = append([]uint32(nil), indexOffsets...)
globalIndex.ro.prefixes = append([]prefixEntry(nil), indexPrefixes...)
globalIndex.tombstones = make(map[uint32][]TimeRange)
resetFaults(globalIndex)
return globalIndex
}
globalIndex, indexBytes = mustMakeIndex(tb, indexKeyCount, indexBlockCount)
indexOffsets = append([]uint32(nil), globalIndex.offsets...)
indexPrefixes = append([]prefixEntry(nil), globalIndex.prefixes...)
indexOffsets = append([]uint32(nil), globalIndex.ro.offsets...)
indexPrefixes = append([]prefixEntry(nil), globalIndex.ro.prefixes...)
for i := 0; i < indexKeyCount; i++ {
indexAllKeys = append(indexAllKeys, []byte(fmt.Sprintf("cpu-%08d", i)))