From d3e832b462546eae2517d8e32d4c93c0847d6354 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 28 Aug 2017 16:43:26 -0600
Subject: [PATCH] Use offheap memory for indirect index offsets slice

---
 pkg/bytesutil/bytesutil.go      | 23 +++++++++++
 pkg/bytesutil/bytesutil_test.go | 35 +++++++++++++++++
 tsdb/engine/tsm1/mmap_unix.go   |  8 ++--
 tsdb/engine/tsm1/reader.go      | 69 +++++++++++++++++++++-----------
 4 files changed, 110 insertions(+), 25 deletions(-)
 create mode 100644 pkg/bytesutil/bytesutil_test.go

diff --git a/pkg/bytesutil/bytesutil.go b/pkg/bytesutil/bytesutil.go
index 24d0598f7e..552391d0ce 100644
--- a/pkg/bytesutil/bytesutil.go
+++ b/pkg/bytesutil/bytesutil.go
@@ -2,6 +2,7 @@ package bytesutil
 
 import (
 	"bytes"
+	"fmt"
 	"sort"
 )
 
@@ -18,6 +19,28 @@ func SearchBytes(a [][]byte, x []byte) int {
 	return sort.Search(len(a), func(i int) bool { return bytes.Compare(a[i], x) >= 0 })
 }
 
+// SearchBytesFixed binary searches a, which is treated as a sorted sequence of sz-byte records,
+// and panics if sz is not positive or len(a) is not a multiple of sz. It returns the byte index of
+// the first record for which fn is true, or of the last record if there is none; callers must verify it.
+func SearchBytesFixed(a []byte, sz int, fn func(x []byte) bool) int {
+	if sz <= 0 || len(a)%sz != 0 {
+		panic(fmt.Sprintf("SearchBytesFixed: sz must be positive and divide len(a): %d %d", len(a), sz))
+	}
+
+	i, j := 0, len(a)-sz
+	for i < j {
+		h := int(uint(i+j) >> 1)
+		h -= h % sz
+		if !fn(a[h : h+sz]) {
+			i = h + sz
+		} else {
+			j = h
+		}
+	}
+
+	return i
+}
+
 // Union returns the union of a & b in sorted order.
 func Union(a, b [][]byte) [][]byte {
 	n := len(b)
diff --git a/pkg/bytesutil/bytesutil_test.go b/pkg/bytesutil/bytesutil_test.go
new file mode 100644
index 0000000000..b59e897b6b
--- /dev/null
+++ b/pkg/bytesutil/bytesutil_test.go
@@ -0,0 +1,35 @@
+package bytesutil_test
+
+import (
+	"bytes"
+	"encoding/binary"
+	"testing"
+
+	"github.com/influxdata/influxdb/pkg/bytesutil"
+)
+
+func TestSearchBytesFixed(t *testing.T) {
+	n, sz := 5, 8
+	a := make([]byte, n*sz) // n records of sz bytes, each a big-endian uint64
+
+	for i := 0; i < n; i++ {
+		binary.BigEndian.PutUint64(a[i*sz:i*sz+sz], uint64(i))
+	}
+
+	var x [8]byte
+
+	for i := 0; i < n; i++ {
+		binary.BigEndian.PutUint64(x[:], uint64(i))
+		if exp, got := i*sz, bytesutil.SearchBytesFixed(a, len(x), func(v []byte) bool {
+			return bytes.Compare(v, x[:]) >= 0
+		}); exp != got {
+			t.Fatalf("index mismatch: exp %v, got %v", exp, got)
+		}
+	}
+
+	if exp, got := len(a)-1, bytesutil.SearchBytesFixed(a, 1, func(v []byte) bool {
+		return bytes.Compare(v, []byte{99}) >= 0
+	}); exp != got {
+		t.Fatalf("index mismatch: exp %v, got %v", exp, got)
+	}
+}
diff --git a/tsdb/engine/tsm1/mmap_unix.go b/tsdb/engine/tsm1/mmap_unix.go
index 0a94e7259b..dcb5550d16 100644
--- a/tsdb/engine/tsm1/mmap_unix.go
+++ b/tsdb/engine/tsm1/mmap_unix.go
@@ -8,12 +8,12 @@ import (
 )
 
 func mmap(f *os.File, offset int64, length int) ([]byte, error) {
-	mmap, err := syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
-	if err != nil {
-		return nil, err
+	// A nil file requests an anonymous, private, read-write mapping.
+	if f == nil {
+		return syscall.Mmap(-1, 0, length, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
 	}
 
-	return mmap, nil
+	return syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
 }
 
 func munmap(b []byte) (err error) {
diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go
index a72ab1f127..34da4a1521 100644
--- a/tsdb/engine/tsm1/reader.go
+++ b/tsdb/engine/tsm1/reader.go
@@ -7,7 +7,6 @@ import (
 	"io"
 	"math"
 	"os"
-	"sort"
 	"sync"
 	"sync/atomic"
 
@@ -102,6 +101,9 @@ type TSMIndex interface {
 	// UnmarshalBinary populates an index from an encoded byte slice
 	// representation of an index.
 	UnmarshalBinary(b []byte) error
+
+	// Close closes the index and releases any resources.
+	Close() error
 }
 
 // BlockIterator allows iterating over each block in a TSM file in order. It provides
@@ -353,7 +355,7 @@ func (t *TSMReader) Close() error {
 		return err
 	}
 
-	return nil
+	return t.index.Close()
 }
 
 // Ref records a usage of this TSMReader. If there are active references
@@ -598,7 +600,7 @@ type indirectIndex struct {
 
 	// offsets contains the positions in b for each key. It points to the 2 byte length of
 	// key.
-	offsets []int32
+	offsets []byte
 
 	// minKey, maxKey are the minium and maximum (lexicographically sorted) contained in the
 	// file
@@ -631,9 +633,10 @@ func NewIndirectIndex() *indirectIndex {
 func (d *indirectIndex) search(key []byte) int {
 	// We use a binary search across our indirect offsets (pointers to all the keys
 	// in the index slice).
-	i := sort.Search(len(d.offsets), func(i int) bool {
+	i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
 		// i is the position in offsets we are at so get offset it points to
-		offset := d.offsets[i]
+		// x is the 4 byte big-endian encoding of the offset for the record being probed
+		offset := int32(binary.BigEndian.Uint32(x))
 
 		// It's pointing to the start of the key which is a 2 byte length
 		keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))
@@ -644,7 +647,7 @@ func (d *indirectIndex) search(key []byte) int {
 
 	// See if we might have found the right index
 	if i < len(d.offsets) {
-		ofs := d.offsets[i]
+		ofs := binary.BigEndian.Uint32(d.offsets[i : i+4])
 		_, k, err := readKey(d.b[ofs:])
 		if err != nil {
 			panic(fmt.Sprintf("error reading key: %v", err))
@@ -719,18 +722,19 @@ func (d *indirectIndex) Key(idx int) ([]byte, byte, []IndexEntry) {
 	d.mu.RLock()
 	defer d.mu.RUnlock()
 
-	if idx < 0 || idx >= len(d.offsets) {
+	if idx < 0 || idx*4+4 > len(d.offsets) {
 		return nil, 0, nil
 	}
-	n, key, err := readKey(d.b[d.offsets[idx]:])
+	ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])
+	n, key, err := readKey(d.b[ofs:])
 	if err != nil {
 		return nil, 0, nil
 	}
 
-	typ := d.b[int(d.offsets[idx])+n]
+	typ := d.b[int(ofs)+n]
 
 	var entries indexEntries
-	if _, err := readEntries(d.b[int(d.offsets[idx])+n:], &entries); err != nil {
+	if _, err := readEntries(d.b[int(ofs)+n:], &entries); err != nil {
 		return nil, 0, nil
 	}
 	return key, typ, entries.entries
@@ -740,12 +744,15 @@ func (d *indirectIndex) Key(idx int) ([]byte, byte, []IndexEntry) {
 func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
 	d.mu.RLock()
 
-	if idx < 0 || idx >= len(d.offsets) {
+	if idx < 0 || idx*4+4 > len(d.offsets) {
 		d.mu.RUnlock()
 		return nil, 0
 	}
-	n, key, _ := readKey(d.b[d.offsets[idx]:])
-	typ := d.b[d.offsets[idx]+int32(n)]
+	ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]))
+
+	n, key, _ := readKey(d.b[ofs:])
+	ofs = ofs + int32(n)
+	typ := d.b[ofs]
 	d.mu.RUnlock()
 	return key, typ
 }
@@ -753,7 +760,7 @@ func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
 // KeyCount returns the count of unique keys in the index.
 func (d *indirectIndex) KeyCount() int {
 	d.mu.RLock()
-	n := len(d.offsets)
+	n := len(d.offsets) / 4
 	d.mu.RUnlock()
 	return n
 }
@@ -773,8 +780,9 @@ func (d *indirectIndex) Delete(keys [][]byte) {
 
 	// Both keys and offsets are sorted. Walk both in order and skip
 	// any keys that exist in both.
-	offsets := make([]int32, 0, len(d.offsets))
-	for _, offset := range d.offsets {
+	var j int
+	for i := 0; i+4 <= len(d.offsets); i += 4 {
+		offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
 		_, indexKey, _ := readKey(d.b[offset:])
 
 		for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 {
@@ -786,9 +794,11 @@ func (d *indirectIndex) Delete(keys [][]byte) {
 			continue
 		}
 
-		offsets = append(offsets, int32(offset))
+		// Keep this key: compact its offset toward the front of the mapping in place.
+		copy(d.offsets[j:j+4], d.offsets[i:i+4])
+		j += 4
 	}
-	d.offsets = offsets
+	d.offsets = d.offsets[:j]
 }
 
 // DeleteRange removes the given keys with data between minTime and maxTime from the index.
@@ -945,9 +955,10 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
 	// basically skips across the slice keeping track of the counter when we are at a key
 	// field.
 	var i int32
+	var offsets []int32
 	iMax := int32(len(b))
 	for i < iMax {
-		d.offsets = append(d.offsets, i)
+		offsets = append(offsets, i)
 
 		// Skip to the start of the values
 		// key length value (2) + type (1) + length of key
@@ -986,14 +997,14 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
 		i += indexEntrySize
 	}
 
-	firstOfs := d.offsets[0]
+	firstOfs := offsets[0]
 	_, key, err := readKey(b[firstOfs:])
 	if err != nil {
 		return err
 	}
 	d.minKey = key
 
-	lastOfs := d.offsets[len(d.offsets)-1]
+	lastOfs := offsets[len(offsets)-1]
 	_, key, err = readKey(b[lastOfs:])
 	if err != nil {
 		return err
@@ -1003,6 +1014,14 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
 	d.minTime = minTime
 	d.maxTime = maxTime
 
+	d.offsets, err = mmap(nil, 0, len(offsets)*4)
+	if err != nil {
+		return err
+	}
+	for i, v := range offsets {
+		binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v))
+	}
+
 	return nil
 }
 
@@ -1014,6 +1033,14 @@ func (d *indirectIndex) Size() uint32 {
 	return uint32(len(d.b))
 }
 
+// Close unmaps the full offsets mapping; Delete may have shortened len(d.offsets) but not its cap.
+func (d *indirectIndex) Close() error {
+	if cap(d.offsets) == 0 {
+		return nil
+	}
+	return munmap(d.offsets[:cap(d.offsets)])
+}
+
 // mmapAccess is mmap based block accessor. It access blocks through an
 // MMAP file interface.
 type mmapAccessor struct {