Use offheap memory for indirect index offsets slice
parent
91eb9de341
commit
d3e832b462
|
@ -2,6 +2,7 @@ package bytesutil
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sort"
|
||||
)
|
||||
|
||||
|
@ -18,6 +19,28 @@ func SearchBytes(a [][]byte, x []byte) int {
|
|||
return sort.Search(len(a), func(i int) bool { return bytes.Compare(a[i], x) >= 0 })
|
||||
}
|
||||
|
||||
// SearchBytesFixed performs a binary search over a, which is treated as a
// sequence of fixed-width records of sz bytes each. fn is called with one
// record at a time and must return false for every record before the target
// position and true from the target onwards (the same contract as the
// predicate given to sort.Search).
//
// The returned value is the byte offset within a (always a multiple of sz) of
// the first record for which fn returns true. The search never moves past the
// last record: if fn is false for every record it examines, the offset of the
// last record (len(a)-sz) is returned, so the caller should verify that the
// record at the returned offset is really the one it wants. An empty a
// yields 0.
//
// SearchBytesFixed panics if sz is not positive or if len(a) is not a
// multiple of sz.
func SearchBytesFixed(a []byte, sz int, fn func(x []byte) bool) int {
	if sz <= 0 {
		// Without this guard a zero sz fails with an opaque integer
		// divide-by-zero in the modulo below.
		panic(fmt.Sprintf("record size must be positive: %d", sz))
	}
	if len(a)%sz != 0 {
		panic(fmt.Sprintf("len(a) is not a multiple of sz: %d %d", len(a), sz))
	}

	// i and j are byte offsets of records; j starts at the last record rather
	// than one past the end, which is why the result is clamped to it.
	i, j := 0, len(a)-sz
	for i < j {
		// Midpoint computed via uint to avoid overflow, then rounded down to
		// a record boundary.
		h := int(uint(i+j) >> 1)
		h -= h % sz
		if !fn(a[h : h+sz]) {
			i = h + sz
		} else {
			j = h
		}
	}

	return i
}
|
||||
|
||||
// Union returns the union of a & b in sorted order.
|
||||
func Union(a, b [][]byte) [][]byte {
|
||||
n := len(b)
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package bytesutil_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/pkg/bytesutil"
|
||||
)
|
||||
|
||||
func TestSearchBytesFixed(t *testing.T) {
|
||||
n, sz := 5, 8
|
||||
a := make([]byte, n*sz) // 5 - 8 byte int64s
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
binary.BigEndian.PutUint64(a[i*sz:i*sz+sz], uint64(i))
|
||||
}
|
||||
|
||||
var x [8]byte
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
binary.BigEndian.PutUint64(x[:], uint64(i))
|
||||
if exp, got := i*sz, bytesutil.SearchBytesFixed(a, len(x), func(v []byte) bool {
|
||||
return bytes.Compare(v, x[:]) >= 0
|
||||
}); exp != got {
|
||||
t.Fatalf("index mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
if exp, got := len(a)-1, bytesutil.SearchBytesFixed(a, 1, func(v []byte) bool {
|
||||
return bytes.Compare(v, []byte{99}) >= 0
|
||||
}); exp != got {
|
||||
t.Fatalf("index mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
}
|
|
@ -162,6 +162,7 @@ func TestCompactor_CompactFull(t *testing.T) {
|
|||
t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq)
|
||||
}
|
||||
|
||||
println("Open", files[0])
|
||||
r := MustOpenTSMReader(files[0])
|
||||
|
||||
if got, exp := r.KeyCount(), 3; got != exp {
|
||||
|
|
|
@ -8,12 +8,12 @@ import (
|
|||
)
|
||||
|
||||
func mmap(f *os.File, offset int64, length int) ([]byte, error) {
|
||||
mmap, err := syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// anonymous mapping
|
||||
if f == nil {
|
||||
return syscall.Mmap(-1, 0, length, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
|
||||
}
|
||||
|
||||
return mmap, nil
|
||||
return syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
|
||||
}
|
||||
|
||||
func munmap(b []byte) (err error) {
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
|
@ -102,6 +101,9 @@ type TSMIndex interface {
|
|||
// UnmarshalBinary populates an index from an encoded byte slice
|
||||
// representation of an index.
|
||||
UnmarshalBinary(b []byte) error
|
||||
|
||||
// Close closes the index and releases any resources.
|
||||
Close() error
|
||||
}
|
||||
|
||||
// BlockIterator allows iterating over each block in a TSM file in order. It provides
|
||||
|
@ -353,7 +355,7 @@ func (t *TSMReader) Close() error {
|
|||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return t.index.Close()
|
||||
}
|
||||
|
||||
// Ref records a usage of this TSMReader. If there are active references
|
||||
|
@ -598,7 +600,7 @@ type indirectIndex struct {
|
|||
|
||||
// offsets contains the positions in b for each key. It points to the 2 byte length of
|
||||
// key.
|
||||
offsets []int32
|
||||
offsets []byte
|
||||
|
||||
// minKey, maxKey are the minimum and maximum keys (lexicographically sorted) contained in the
|
||||
// file
|
||||
|
@ -631,9 +633,10 @@ func NewIndirectIndex() *indirectIndex {
|
|||
func (d *indirectIndex) search(key []byte) int {
|
||||
// We use a binary search across our indirect offsets (pointers to all the keys
|
||||
// in the index slice).
|
||||
i := sort.Search(len(d.offsets), func(i int) bool {
|
||||
i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
|
||||
// i is the position in offsets we are at so get offset it points to
|
||||
offset := d.offsets[i]
|
||||
//offset := d.offsets[i]
|
||||
offset := int32(binary.BigEndian.Uint32(x))
|
||||
|
||||
// It's pointing to the start of the key which is a 2 byte length
|
||||
keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))
|
||||
|
@ -644,7 +647,7 @@ func (d *indirectIndex) search(key []byte) int {
|
|||
|
||||
// See if we might have found the right index
|
||||
if i < len(d.offsets) {
|
||||
ofs := d.offsets[i]
|
||||
ofs := binary.BigEndian.Uint32(d.offsets[i : i+4])
|
||||
_, k, err := readKey(d.b[ofs:])
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("error reading key: %v", err))
|
||||
|
@ -719,18 +722,19 @@ func (d *indirectIndex) Key(idx int) ([]byte, byte, []IndexEntry) {
|
|||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
if idx < 0 || idx >= len(d.offsets) {
|
||||
if idx < 0 || idx*4+4 > len(d.offsets) {
|
||||
return nil, 0, nil
|
||||
}
|
||||
n, key, err := readKey(d.b[d.offsets[idx]:])
|
||||
ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])
|
||||
n, key, err := readKey(d.b[ofs:])
|
||||
if err != nil {
|
||||
return nil, 0, nil
|
||||
}
|
||||
|
||||
typ := d.b[int(d.offsets[idx])+n]
|
||||
typ := d.b[int(ofs)+n]
|
||||
|
||||
var entries indexEntries
|
||||
if _, err := readEntries(d.b[int(d.offsets[idx])+n:], &entries); err != nil {
|
||||
if _, err := readEntries(d.b[int(ofs)+n:], &entries); err != nil {
|
||||
return nil, 0, nil
|
||||
}
|
||||
return key, typ, entries.entries
|
||||
|
@ -740,12 +744,15 @@ func (d *indirectIndex) Key(idx int) ([]byte, byte, []IndexEntry) {
|
|||
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
|
||||
d.mu.RLock()
|
||||
|
||||
if idx < 0 || idx >= len(d.offsets) {
|
||||
if idx < 0 || idx*4+4 > len(d.offsets) {
|
||||
d.mu.RUnlock()
|
||||
return nil, 0
|
||||
}
|
||||
n, key, _ := readKey(d.b[d.offsets[idx]:])
|
||||
typ := d.b[d.offsets[idx]+int32(n)]
|
||||
ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]))
|
||||
|
||||
n, key, _ := readKey(d.b[ofs:])
|
||||
ofs = ofs + int32(n)
|
||||
typ := d.b[ofs]
|
||||
d.mu.RUnlock()
|
||||
return key, typ
|
||||
}
|
||||
|
@ -753,7 +760,7 @@ func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
|
|||
// KeyCount returns the count of unique keys in the index.
|
||||
func (d *indirectIndex) KeyCount() int {
|
||||
d.mu.RLock()
|
||||
n := len(d.offsets)
|
||||
n := len(d.offsets) / 4
|
||||
d.mu.RUnlock()
|
||||
return n
|
||||
}
|
||||
|
@ -773,8 +780,9 @@ func (d *indirectIndex) Delete(keys [][]byte) {
|
|||
|
||||
// Both keys and offsets are sorted. Walk both in order and skip
|
||||
// any keys that exist in both.
|
||||
offsets := make([]int32, 0, len(d.offsets))
|
||||
for _, offset := range d.offsets {
|
||||
var j int
|
||||
for i := 0; i+4 <= len(d.offsets); i += 4 {
|
||||
offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
|
||||
_, indexKey, _ := readKey(d.b[offset:])
|
||||
|
||||
for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 {
|
||||
|
@ -786,9 +794,11 @@ func (d *indirectIndex) Delete(keys [][]byte) {
|
|||
continue
|
||||
}
|
||||
|
||||
offsets = append(offsets, int32(offset))
|
||||
copy(d.offsets[j:j+4], d.offsets[i:i+4])
|
||||
j += 4
|
||||
//offsets = append(offsets, int32(offset))
|
||||
}
|
||||
d.offsets = offsets
|
||||
d.offsets = d.offsets[:j]
|
||||
}
|
||||
|
||||
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
|
||||
|
@ -945,9 +955,10 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
|
|||
// basically skips across the slice keeping track of the counter when we are at a key
|
||||
// field.
|
||||
var i int32
|
||||
var offsets []int32
|
||||
iMax := int32(len(b))
|
||||
for i < iMax {
|
||||
d.offsets = append(d.offsets, i)
|
||||
offsets = append(offsets, i)
|
||||
|
||||
// Skip to the start of the values
|
||||
// key length value (2) + type (1) + length of key
|
||||
|
@ -986,14 +997,14 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
|
|||
i += indexEntrySize
|
||||
}
|
||||
|
||||
firstOfs := d.offsets[0]
|
||||
firstOfs := offsets[0]
|
||||
_, key, err := readKey(b[firstOfs:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.minKey = key
|
||||
|
||||
lastOfs := d.offsets[len(d.offsets)-1]
|
||||
lastOfs := offsets[len(offsets)-1]
|
||||
_, key, err = readKey(b[lastOfs:])
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1003,6 +1014,14 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
|
|||
d.minTime = minTime
|
||||
d.maxTime = maxTime
|
||||
|
||||
d.offsets, err = mmap(nil, 0, len(offsets)*4)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i, v := range offsets {
|
||||
binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1014,6 +1033,10 @@ func (d *indirectIndex) Size() uint32 {
|
|||
return uint32(len(d.b))
|
||||
}
|
||||
|
||||
func (d *indirectIndex) Close() error {
|
||||
return munmap(d.offsets)
|
||||
}
|
||||
|
||||
// mmapAccessor is an mmap-based block accessor. It accesses blocks through an
|
||||
// MMAP file interface.
|
||||
type mmapAccessor struct {
|
||||
|
|
Loading…
Reference in New Issue