Merge pull request #8796 from influxdata/jw-tsm-index

High Cardinality Fixes
pull/8836/head
Jason Wilder 2017-09-12 10:48:34 -06:00 committed by GitHub
commit f3f30726d8
23 changed files with 672 additions and 428 deletions

View File

@ -11,14 +11,16 @@ import (
"fmt"
"math"
"github.com/influxdata/influxdb/pkg/pool"
"github.com/spaolacci/murmur3"
)
// Filter represents a bloom filter.
type Filter struct {
k uint64
b []byte
mask uint64
k uint64
b []byte
mask uint64
hashPool *pool.Generic
}
// NewFilter returns a new instance of Filter using m bits and k hash functions.
@ -30,6 +32,9 @@ func NewFilter(m uint64, k uint64) *Filter {
k: k,
b: make([]byte, m/8),
mask: m - 1,
hashPool: pool.NewGeneric(16, func(sz int) interface{} {
return murmur3.New128()
}),
}
}
@ -45,6 +50,9 @@ func NewFilterBuffer(buf []byte, k uint64) (*Filter, error) {
k: k,
b: buf,
mask: m - 1,
hashPool: pool.NewGeneric(16, func(sz int) interface{} {
return murmur3.New128()
}),
}, nil
}
@ -57,9 +65,16 @@ func (f *Filter) K() uint64 { return f.k }
// Bytes returns the underlying backing slice.
func (f *Filter) Bytes() []byte { return f.b }
// Clone returns a copy of f.
func (f *Filter) Clone() *Filter {
other := &Filter{k: f.k, b: make([]byte, len(f.b)), mask: f.mask, hashPool: f.hashPool}
copy(other.b, f.b)
return other
}
// Insert inserts data to the filter.
func (f *Filter) Insert(v []byte) {
h := hash(v)
h := f.hash(v)
for i := uint64(0); i < f.k; i++ {
loc := f.location(h, i)
f.b[loc/8] |= 1 << (loc % 8)
@ -69,7 +84,7 @@ func (f *Filter) Insert(v []byte) {
// Contains returns true if the filter possibly contains v.
// Returns false if the filter definitely does not contain v.
func (f *Filter) Contains(v []byte) bool {
h := hash(v)
h := f.hash(v)
for i := uint64(0); i < f.k; i++ {
loc := f.location(h, i)
if f.b[loc/8]&(1<<(loc%8)) == 0 {
@ -82,6 +97,10 @@ func (f *Filter) Contains(v []byte) bool {
// Merge performs an in-place union of other into f.
// Returns an error if m or k of the filters differs.
func (f *Filter) Merge(other *Filter) error {
if other == nil {
return nil
}
// Ensure m & k fields match.
if len(f.b) != len(other.b) {
return fmt.Errorf("bloom.Filter.Merge(): m mismatch: %d <> %d", len(f.b), len(other.b))
@ -102,6 +121,18 @@ func (f *Filter) location(h [4]uint64, i uint64) uint {
return uint((h[i%2] + i*h[2+(((i+(i%2))%4)/2)]) & f.mask)
}
// hash returns a set of 4 base hashes computed from data.
func (f *Filter) hash(data []byte) [4]uint64 {
h := f.hashPool.Get(0).(murmur3.Hash128)
defer f.hashPool.Put(h)
h.Reset()
h.Write(data)
v1, v2 := h.Sum128()
h.Write([]byte{1})
v3, v4 := h.Sum128()
return [4]uint64{v1, v2, v3, v4}
}
// Estimate returns an estimated bit count and hash count given the element count and false positive rate.
func Estimate(n uint64, p float64) (m uint64, k uint64) {
m = uint64(math.Ceil(-1 * float64(n) * math.Log(p) / math.Pow(math.Log(2), 2)))
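// A worked example of this sizing formula: for n = 1,000,000 series and
// p = 0.02, m = ceil(1e6 * ln(50) / ln(2)^2) = ceil(1e6 * 3.912 / 0.4805),
// about 8.14 million bits. Since location() masks with m-1, m must be a
// power of two, so a caller would round this up with pow2 below to
// 2^23 = 8,388,608 bits, or 1 MiB per filter.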
@ -119,13 +150,3 @@ func pow2(v uint64) uint64 {
}
panic("unreachable")
}
// hash returns a set of 4 based hashes.
func hash(data []byte) [4]uint64 {
h := murmur3.New128()
h.Write(data)
v1, v2 := h.Sum128()
h.Write([]byte{1})
v3, v4 := h.Sum128()
return [4]uint64{v1, v2, v3, v4}
}
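// A minimal usage sketch of the pooled filter (the m and k values here are
// illustrative; m must be a power of two for the location mask to work):
//
//	f := NewFilter(1<<16, 6)
//	f.Insert([]byte("cpu,host=server-1")) // reuses a pooled murmur3 hasher
//	if f.Contains([]byte("cpu,host=server-1")) {
//		// possibly present; Contains never returns a false negative
//	}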

View File

@ -2,6 +2,7 @@ package bytesutil
import (
"bytes"
"fmt"
"sort"
)
@ -18,6 +19,28 @@ func SearchBytes(a [][]byte, x []byte) int {
return sort.Search(len(a), func(i int) bool { return bytes.Compare(a[i], x) >= 0 })
}
// SearchBytesFixed searches a for x using a binary search. The length of a must be a multiple
// of sz or else the function panics. The returned value is the index within a where x should
// exist. The caller should ensure that x does exist at this index.
func SearchBytesFixed(a []byte, sz int, fn func(x []byte) bool) int {
if len(a)%sz != 0 {
panic(fmt.Sprintf("len(a) is not a multiple of sz: %d %d", len(a), sz))
}
i, j := 0, len(a)-sz
for i < j {
h := int(uint(i+j) >> 1)
h -= h % sz
if !fn(a[h : h+sz]) {
i = h + sz
} else {
j = h
}
}
return i
}
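// An illustrative call, mirroring how the TSM index later in this PR searches
// a packed slice of big-endian uint32 offsets, 4 bytes per record (binary is
// encoding/binary):
//
//	offsets := []byte{
//		0x00, 0x00, 0x00, 0x0a, // 10
//		0x00, 0x00, 0x00, 0x14, // 20
//		0x00, 0x00, 0x00, 0x1e, // 30
//	}
//	i := SearchBytesFixed(offsets, 4, func(x []byte) bool {
//		return binary.BigEndian.Uint32(x) >= 20
//	})
//	// i == 4, the byte index of the first record whose value is >= 20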
// Union returns the union of a & b in sorted order.
func Union(a, b [][]byte) [][]byte {
n := len(b)

View File

@ -0,0 +1,35 @@
package bytesutil_test
import (
"bytes"
"encoding/binary"
"testing"
"github.com/influxdata/influxdb/pkg/bytesutil"
)
func TestSearchBytesFixed(t *testing.T) {
n, sz := 5, 8
a := make([]byte, n*sz) // five 8-byte big-endian integers
for i := 0; i < 5; i++ {
binary.BigEndian.PutUint64(a[i*sz:i*sz+sz], uint64(i))
}
var x [8]byte
for i := 0; i < n; i++ {
binary.BigEndian.PutUint64(x[:], uint64(i))
if exp, got := i*sz, bytesutil.SearchBytesFixed(a, len(x), func(v []byte) bool {
return bytes.Compare(v, x[:]) >= 0
}); exp != got {
t.Fatalf("index mismatch: exp %v, got %v", exp, got)
}
}
if exp, got := len(a)-1, bytesutil.SearchBytesFixed(a, 1, func(v []byte) bool {
return bytes.Compare(v, []byte{99}) >= 0
}); exp != got {
t.Fatalf("index mismatch: exp %v, got %v", exp, got)
}
}

View File

@ -169,7 +169,7 @@ const (
// storer is the interface that describes a cache's store.
type storer interface {
entry(key []byte) *entry // Get an entry by its key.
write(key []byte, values Values) error // Write an entry to the store.
write(key []byte, values Values) (bool, error) // Write an entry to the store.
add(key []byte, entry *entry) // Add a new entry to the store.
remove(key []byte) // Remove an entry from the store.
keys(sorted bool) [][]byte // Return an optionally sorted slice of entry keys.
@ -296,11 +296,15 @@ func (c *Cache) Write(key []byte, values []Value) error {
return ErrCacheMemorySizeLimitExceeded(n, limit)
}
if err := c.store.write(key, values); err != nil {
newKey, err := c.store.write(key, values)
if err != nil {
atomic.AddInt64(&c.stats.WriteErr, 1)
return err
}
if newKey {
addedSize += uint64(len(key))
}
// Update the cache size and the memory size stat.
c.increaseSize(addedSize)
c.updateMemSize(int64(addedSize))
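// Note: this newKey accounting is what the +3/+6 adjustments in the cache
// tests below reflect. The cache size now includes the byte length of each
// key the first time it is written, so storing the 3-byte keys "foo" and
// "bar" adds 2*3 = 6 bytes on top of the value sizes.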
@ -337,12 +341,16 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
// We'll optimistically set size here, and then decrement it for write errors.
c.increaseSize(addedSize)
for k, v := range values {
if err := store.write([]byte(k), v); err != nil {
newKey, err := store.write([]byte(k), v)
if err != nil {
// The write failed, hold onto the error and adjust the size delta.
werr = err
addedSize -= uint64(Values(v).Size())
c.decreaseSize(uint64(Values(v).Size()))
}
if newKey {
c.increaseSize(uint64(len(k)))
}
}
// Some points in the batch were dropped. An error is returned so
@ -567,7 +575,7 @@ func (c *Cache) DeleteRange(keys [][]byte, min, max int64) {
origSize := uint64(e.size())
if min == math.MinInt64 && max == math.MaxInt64 {
c.decreaseSize(origSize)
c.decreaseSize(origSize + uint64(len(k)))
c.store.remove(k)
continue
}
@ -575,7 +583,7 @@ func (c *Cache) DeleteRange(keys [][]byte, min, max int64) {
e.filter(min, max)
if e.count() == 0 {
c.store.remove(k)
c.decreaseSize(origSize)
c.decreaseSize(origSize + uint64(len(k)))
continue
}
@ -750,7 +758,7 @@ func (c *Cache) updateSnapshots() {
type emptyStore struct{}
func (e emptyStore) entry(key []byte) *entry { return nil }
func (e emptyStore) write(key []byte, values Values) error { return nil }
func (e emptyStore) write(key []byte, values Values) (bool, error) { return false, nil }
func (e emptyStore) add(key []byte, entry *entry) {}
func (e emptyStore) remove(key []byte) {}
func (e emptyStore) keys(sorted bool) [][]byte { return nil }

View File

@ -50,7 +50,7 @@ func TestCache_CacheWrite(t *testing.T) {
if err := c.Write([]byte("bar"), values); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
if n := c.Size(); n != 2*valuesSize {
if n := c.Size(); n != 2*valuesSize+6 {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n)
}
@ -75,7 +75,7 @@ func TestCache_CacheWrite_TypeConflict(t *testing.T) {
t.Fatalf("expected field type conflict")
}
if exp, got := uint64(v0.Size()), c.Size(); exp != got {
if exp, got := uint64(v0.Size())+3, c.Size(); exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}
}
@ -92,7 +92,7 @@ func TestCache_CacheWriteMulti(t *testing.T) {
if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
if n := c.Size(); n != 2*valuesSize {
if n := c.Size(); n != 2*valuesSize+6 {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n)
}
@ -120,11 +120,11 @@ func TestCache_WriteMulti_Stats(t *testing.T) {
c.init()
c.store = ms
ms.writef = func(key []byte, v Values) error {
ms.writef = func(key []byte, v Values) (bool, error) {
if bytes.Equal(key, []byte("foo")) {
return errors.New("write failed")
return false, errors.New("write failed")
}
return nil
return true, nil
}
values = map[string][]Value{"foo": []Value{v, v}, "bar": []Value{v}}
@ -133,7 +133,7 @@ func TestCache_WriteMulti_Stats(t *testing.T) {
}
// Cache size decreased correctly.
if got, exp := c.Size(), uint64(16); got != exp {
if got, exp := c.Size(), uint64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
@ -158,7 +158,7 @@ func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) {
t.Fatalf(" expected field type conflict")
}
if exp, got := uint64(v0.Size()), c.Size(); exp != got {
if exp, got := uint64(v0.Size())+3, c.Size(); exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}
@ -179,7 +179,7 @@ func TestCache_Cache_DeleteRange(t *testing.T) {
if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
if n := c.Size(); n != 2*valuesSize {
if n := c.Size(); n != 2*valuesSize+6 {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n)
}
@ -193,7 +193,7 @@ func TestCache_Cache_DeleteRange(t *testing.T) {
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
}
if got, exp := c.Size(), valuesSize+uint64(v0.Size()); exp != got {
if got, exp := c.Size(), valuesSize+uint64(v0.Size())+6; exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}
@ -218,7 +218,7 @@ func TestCache_DeleteRange_NoValues(t *testing.T) {
if err := c.WriteMulti(map[string][]Value{"foo": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
if n := c.Size(); n != valuesSize {
if n := c.Size(); n != valuesSize+3 {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n)
}
@ -253,7 +253,7 @@ func TestCache_Cache_Delete(t *testing.T) {
if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
if n := c.Size(); n != 2*valuesSize {
if n := c.Size(); n != 2*valuesSize+6 {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n)
}
@ -267,7 +267,7 @@ func TestCache_Cache_Delete(t *testing.T) {
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
}
if got, exp := c.Size(), valuesSize; exp != got {
if got, exp := c.Size(), valuesSize+3; exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}
@ -450,12 +450,12 @@ func TestCache_Snapshot_Stats(t *testing.T) {
}
// Store size should have been reset.
if got, exp := c.Size(), uint64(16); got != exp {
if got, exp := c.Size(), uint64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
// Cached bytes should have been increased.
if got, exp := c.stats.CachedBytes, int64(16); got != exp {
if got, exp := c.stats.CachedBytes, int64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
}
@ -790,7 +790,7 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) {
// Cache's storer implementation.
type TestStore struct {
entryf func(key []byte) *entry
writef func(key []byte, values Values) error
writef func(key []byte, values Values) (bool, error)
addf func(key []byte, entry *entry)
removef func(key []byte)
keysf func(sorted bool) [][]byte
@ -801,7 +801,7 @@ type TestStore struct {
func NewTestStore() *TestStore { return &TestStore{} }
func (s *TestStore) entry(key []byte) *entry { return s.entryf(key) }
func (s *TestStore) write(key []byte, values Values) error { return s.writef(key, values) }
func (s *TestStore) write(key []byte, values Values) (bool, error) { return s.writef(key, values) }
func (s *TestStore) add(key []byte, entry *entry) { s.addf(key, entry) }
func (s *TestStore) remove(key []byte) { s.removef(key) }
func (s *TestStore) keys(sorted bool) [][]byte { return s.keysf(sorted) }

View File

@ -41,7 +41,6 @@ var (
errMaxFileExceeded = fmt.Errorf("max file exceeded")
errSnapshotsDisabled = fmt.Errorf("snapshots disabled")
errCompactionsDisabled = fmt.Errorf("compactions disabled")
errCompactionAborted = fmt.Errorf("compaction aborted")
)
type errCompactionInProgress struct {
@ -56,6 +55,17 @@ func (e errCompactionInProgress) Error() string {
return "compaction in progress"
}
type errCompactionAborted struct {
err error
}
func (e errCompactionAborted) Error() string {
if e.err != nil {
return fmt.Sprintf("compaction aborted: %s", e.err)
}
return "compaction aborted"
}
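// With a struct type, code in this package can both match the error and
// recover its cause, which the old sentinel value could not carry. A sketch:
//
//	if e, ok := err.(errCompactionAborted); ok {
//		fmt.Println("compaction aborted, cause:", e.err) // e.err may be nil
//	}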
// CompactionGroup represents a list of files eligible to be compacted together.
type CompactionGroup []string
@ -586,6 +596,7 @@ type Compactor struct {
FileStore interface {
NextGeneration() int
TSMReader(path string) *TSMReader
}
mu sync.RWMutex
@ -737,20 +748,17 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
for _, file := range tsmFiles {
select {
case <-intC:
return nil, errCompactionAborted
return nil, errCompactionAborted{}
default:
}
f, err := os.Open(file)
if err != nil {
return nil, err
tr := c.FileStore.TSMReader(file)
if tr == nil {
// This would be a bug if it occurred, since the tsmFiles passed in should only be
// assigned to one compaction at any one time. A nil tr means the file
// doesn't exist.
return nil, errCompactionAborted{fmt.Errorf("bad plan: %s", file)}
}
tr, err := NewTSMReader(f)
if err != nil {
return nil, err
}
defer tr.Close()
trs = append(trs, tr)
}
@ -917,7 +925,7 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
c.mu.RUnlock()
if !enabled {
return errCompactionAborted
return errCompactionAborted{}
}
// Each call to read returns the next sorted key (or the prior one if there are
// more values to write). The size of values will be less than or equal to our
@ -1260,7 +1268,7 @@ func (k *tsmKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
// See if compactions were disabled while we were running.
select {
case <-k.interrupt:
return nil, 0, 0, nil, errCompactionAborted
return nil, 0, 0, nil, errCompactionAborted{}
default:
}
@ -1400,7 +1408,7 @@ func (c *cacheKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
// See if snapshot compactions were disabled while we were running.
select {
case <-c.interrupt:
return nil, 0, 0, nil, errCompactionAborted
return nil, 0, 0, nil, errCompactionAborted{}
default:
}

View File

@ -5,6 +5,7 @@ import (
"math"
"os"
"path/filepath"
"sort"
"testing"
"time"
@ -116,9 +117,11 @@ func TestCompactor_CompactFull(t *testing.T) {
}
f3 := MustWriteTSM(dir, 3, writes)
fs := &fakeFileStore{}
defer fs.Close()
compactor := &tsm1.Compactor{
Dir: dir,
FileStore: &fakeFileStore{},
FileStore: fs,
}
files, err := compactor.CompactFull([]string{f1, f2, f3})
@ -215,9 +218,11 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
}
f3 := MustWriteTSM(dir, 3, writes)
fs := &fakeFileStore{}
defer fs.Close()
compactor := &tsm1.Compactor{
Dir: dir,
FileStore: &fakeFileStore{},
FileStore: fs,
Size: 2,
}
@ -294,9 +299,11 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
}
f3 := MustWriteTSM(dir, 3, writes)
fs := &fakeFileStore{}
defer fs.Close()
compactor := &tsm1.Compactor{
Dir: dir,
FileStore: &fakeFileStore{},
FileStore: fs,
Size: 2,
}
@ -365,9 +372,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
}
f3 := MustWriteTSM(dir, 3, writes)
fs := &fakeFileStore{}
defer fs.Close()
compactor := &tsm1.Compactor{
Dir: dir,
FileStore: &fakeFileStore{},
FileStore: fs,
Size: 2,
}
compactor.Open()
@ -464,9 +473,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
}
f3 := MustWriteTSM(dir, 3, writes)
fs := &fakeFileStore{}
defer fs.Close()
compactor := &tsm1.Compactor{
Dir: dir,
FileStore: &fakeFileStore{},
FileStore: fs,
Size: 2,
}
compactor.Open()
@ -564,9 +575,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
}
f3 := MustWriteTSM(dir, 3, writes)
fs := &fakeFileStore{}
defer fs.Close()
compactor := &tsm1.Compactor{
Dir: dir,
FileStore: &fakeFileStore{},
FileStore: fs,
Size: 2,
}
compactor.Open()
@ -669,9 +682,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
}
f3 := MustWriteTSM(dir, 3, writes)
fs := &fakeFileStore{}
defer fs.Close()
compactor := &tsm1.Compactor{
Dir: dir,
FileStore: &fakeFileStore{},
FileStore: fs,
Size: 2,
}
compactor.Open()
@ -782,9 +797,11 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
}
f2.Close()
fs := &fakeFileStore{}
defer fs.Close()
compactor := &tsm1.Compactor{
Dir: dir,
FileStore: &fakeFileStore{},
FileStore: fs,
}
compactor.Open()
@ -2396,8 +2413,14 @@ func MustTSMWriter(dir string, gen int) (tsm1.TSMWriter, string) {
func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string {
w, name := MustTSMWriter(dir, gen)
for k, v := range values {
if err := w.Write([]byte(k), v); err != nil {
keys := make([]string, 0, len(values))
for k := range values {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), values[k]); err != nil {
panic(fmt.Sprintf("write TSM value: %v", err))
}
}
@ -2434,6 +2457,7 @@ type fakeFileStore struct {
PathsFn func() []tsm1.FileStat
lastModified time.Time
blockCount int
readers []*tsm1.TSMReader
}
func (w *fakeFileStore) Stats() []tsm1.FileStat {
@ -2451,3 +2475,16 @@ func (w *fakeFileStore) LastModified() time.Time {
func (w *fakeFileStore) BlockCount(path string, idx int) int {
return w.blockCount
}
func (w *fakeFileStore) TSMReader(path string) *tsm1.TSMReader {
r := MustOpenTSMReader(path)
w.readers = append(w.readers, r)
return r
}
func (w *fakeFileStore) Close() {
for _, r := range w.readers {
r.Close()
}
w.readers = nil
}

View File

@ -244,7 +244,7 @@ func (e *Engine) enableLevelCompactions(wait bool) {
go func() { defer e.wg.Done(); e.compactTSMFull(quit) }()
go func() { defer e.wg.Done(); e.compactTSMLevel(true, 1, quit) }()
go func() { defer e.wg.Done(); e.compactTSMLevel(true, 2, quit) }()
go func() { defer e.wg.Done(); e.compactTSMLevel(false, 3, quit) }()
go func() { defer e.wg.Done(); e.compactTSMLevel(true, 3, quit) }()
}
// disableLevelCompactions will stop level compactions before returning.

View File

@ -479,6 +479,19 @@ func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
return f.cost(key, min, max)
}
// TSMReader returns the TSMReader for path if one is currently managed by the FileStore.
// Otherwise it returns nil.
func (f *FileStore) TSMReader(path string) *TSMReader {
f.mu.RLock()
defer f.mu.RUnlock()
for _, r := range f.files {
if r.Path() == path {
return r.(*TSMReader)
}
}
return nil
}
// KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
func (f *FileStore) KeyCursor(key []byte, t int64, ascending bool) *KeyCursor {
f.mu.RLock()
@ -498,6 +511,11 @@ func (f *FileStore) Stats() []FileStat {
// The file stats cache is invalid due to changes to files. Need to
// recalculate.
f.mu.Lock()
defer f.mu.Unlock()
if len(f.lastFileStats) > 0 {
return f.lastFileStats
}
// If lastFileStats's capacity is far away from the number of entries
// we need to add, then we'll reallocate.
@ -508,7 +526,6 @@ func (f *FileStore) Stats() []FileStat {
for _, fd := range f.files {
f.lastFileStats = append(f.lastFileStats, fd.Stats())
}
defer f.mu.Unlock()
return f.lastFileStats
}
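// The re-check of lastFileStats above is the double-checked pattern: between
// dropping the read lock on the fast path (above this hunk) and acquiring the
// write lock, another goroutine may already have recomputed the stats. The
// general shape (names illustrative):
//
//	mu.RLock()
//	if cached := load(); cached != nil {
//		mu.RUnlock()
//		return cached
//	}
//	mu.RUnlock()
//
//	mu.Lock()
//	defer mu.Unlock()
//	if cached := load(); cached != nil { // re-check under the write lock
//		return cached
//	}
//	return recompute()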
@ -641,8 +658,8 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
// If times didn't change, or went backwards (which can happen since file mod times
// only have second resolution), then add a ns to the time to ensure that lastModified
// still changes, since the files on disk actually did change
if maxTime.Equal(f.lastModified) {
maxTime = maxTime.UTC().Add(1)
if maxTime.Equal(f.lastModified) || maxTime.Before(f.lastModified) {
maxTime = f.lastModified.UTC().Add(1)
}
f.lastModified = maxTime.UTC()

View File

@ -10,6 +10,11 @@ import (
)
func mmap(f *os.File, offset int64, length int) ([]byte, error) {
// anonymous mapping
if f == nil {
return unix.Mmap(-1, 0, length, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
}
mmap, err := unix.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return nil, err

View File

@ -8,12 +8,12 @@ import (
)
func mmap(f *os.File, offset int64, length int) ([]byte, error) {
mmap, err := syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return nil, err
// anonymous mapping
if f == nil {
return syscall.Mmap(-1, 0, length, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
}
return mmap, nil
return syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
}
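// An illustrative lifecycle for the new nil-file path, as the TSM index uses
// it for its off-heap offsets slice (the size here is arbitrary; binary is
// encoding/binary):
//
//	buf, err := mmap(nil, 0, 1<<20) // anonymous read-write mapping
//	if err != nil {
//		return err
//	}
//	binary.BigEndian.PutUint32(buf[0:4], 42) // behaves like any []byte
//	// ... and when finished:
//	if err := munmap(buf); err != nil { // return the pages to the OS
//		return err
//	}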
func munmap(b []byte) (err error) {

View File

@ -44,6 +44,11 @@ func openSharedFile(f *os.File) (file *os.File, err error) {
}
func mmap(f *os.File, offset int64, length int) (out []byte, err error) {
// TODO: Add support for anonymous mapping on windows
if f == nil {
return make([]byte, length), nil
}
// Open a file mapping handle.
sizelo := uint32(length >> 32)
sizehi := uint32(length) & 0xffffffff

View File

@ -7,7 +7,7 @@ import (
"io"
"math"
"os"
"sort"
"runtime"
"sync"
"sync/atomic"
@ -102,6 +102,9 @@ type TSMIndex interface {
// UnmarshalBinary populates an index from an encoded byte slice
// representation of an index.
UnmarshalBinary(b []byte) error
// Close closes the index and releases any resources.
Close() error
}
// BlockIterator allows iterating over each block in a TSM file in order. It provides
@ -353,7 +356,7 @@ func (t *TSMReader) Close() error {
return err
}
return nil
return t.index.Close()
}
// Ref records a usage of this TSMReader. If there are active references
@ -598,7 +601,7 @@ type indirectIndex struct {
// offsets contains the positions in b for each key. It points to the 2 byte length of
// key.
offsets []int32
offsets []byte
// minKey, maxKey are the minimum and maximum keys (lexicographically sorted) contained in the
// file
@ -631,9 +634,10 @@ func NewIndirectIndex() *indirectIndex {
func (d *indirectIndex) search(key []byte) int {
// We use a binary search across our indirect offsets (pointers to all the keys
// in the index slice).
i := sort.Search(len(d.offsets), func(i int) bool {
i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
// i is the position in offsets we are at so get offset it points to
offset := d.offsets[i]
offset := int32(binary.BigEndian.Uint32(x))
// It's pointing to the start of the key which is a 2 byte length
keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))
@ -644,7 +648,7 @@ func (d *indirectIndex) search(key []byte) int {
// See if we might have found the right index
if i < len(d.offsets) {
ofs := d.offsets[i]
ofs := binary.BigEndian.Uint32(d.offsets[i : i+4])
_, k, err := readKey(d.b[ofs:])
if err != nil {
panic(fmt.Sprintf("error reading key: %v", err))
@ -719,18 +723,19 @@ func (d *indirectIndex) Key(idx int) ([]byte, byte, []IndexEntry) {
d.mu.RLock()
defer d.mu.RUnlock()
if idx < 0 || idx >= len(d.offsets) {
if idx < 0 || idx*4+4 > len(d.offsets) {
return nil, 0, nil
}
n, key, err := readKey(d.b[d.offsets[idx]:])
ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])
n, key, err := readKey(d.b[ofs:])
if err != nil {
return nil, 0, nil
}
typ := d.b[int(d.offsets[idx])+n]
typ := d.b[int(ofs)+n]
var entries indexEntries
if _, err := readEntries(d.b[int(d.offsets[idx])+n:], &entries); err != nil {
if _, err := readEntries(d.b[int(ofs)+n:], &entries); err != nil {
return nil, 0, nil
}
return key, typ, entries.entries
@ -740,12 +745,15 @@ func (d *indirectIndex) Key(idx int) ([]byte, byte, []IndexEntry) {
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
d.mu.RLock()
if idx < 0 || idx >= len(d.offsets) {
if idx < 0 || idx*4+4 > len(d.offsets) {
d.mu.RUnlock()
return nil, 0
}
n, key, _ := readKey(d.b[d.offsets[idx]:])
typ := d.b[d.offsets[idx]+int32(n)]
ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]))
n, key, _ := readKey(d.b[ofs:])
ofs = ofs + int32(n)
typ := d.b[ofs]
d.mu.RUnlock()
return key, typ
}
@ -753,7 +761,7 @@ func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
// KeyCount returns the count of unique keys in the index.
func (d *indirectIndex) KeyCount() int {
d.mu.RLock()
n := len(d.offsets)
n := len(d.offsets) / 4
d.mu.RUnlock()
return n
}
@ -773,8 +781,9 @@ func (d *indirectIndex) Delete(keys [][]byte) {
// Both keys and offsets are sorted. Walk both in order and skip
// any keys that exist in both.
offsets := make([]int32, 0, len(d.offsets))
for _, offset := range d.offsets {
var j int
for i := 0; i+4 <= len(d.offsets); i += 4 {
offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
_, indexKey, _ := readKey(d.b[offset:])
for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 {
@ -786,9 +795,10 @@ func (d *indirectIndex) Delete(keys [][]byte) {
continue
}
offsets = append(offsets, int32(offset))
copy(d.offsets[j:j+4], d.offsets[i:i+4])
j += 4
}
d.offsets = offsets
d.offsets = d.offsets[:j]
}
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
@ -945,9 +955,10 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
// basically skips across the slice keeping track of the counter when we are at a key
// field.
var i int32
var offsets []int32
iMax := int32(len(b))
for i < iMax {
d.offsets = append(d.offsets, i)
offsets = append(offsets, i)
// Skip to the start of the values
// key length value (2) + type (1) + length of key
@ -986,14 +997,14 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
i += indexEntrySize
}
firstOfs := d.offsets[0]
firstOfs := offsets[0]
_, key, err := readKey(b[firstOfs:])
if err != nil {
return err
}
d.minKey = key
lastOfs := d.offsets[len(d.offsets)-1]
lastOfs := offsets[len(offsets)-1]
_, key, err = readKey(b[lastOfs:])
if err != nil {
return err
@ -1003,6 +1014,14 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
d.minTime = minTime
d.maxTime = maxTime
d.offsets, err = mmap(nil, 0, len(offsets)*4)
if err != nil {
return err
}
for i, v := range offsets {
binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v))
}
return nil
}
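// The payoff of the []int32 to packed []byte change: the offsets now live in
// an anonymous mapping outside the Go heap, so the garbage collector never
// scans them. At 4 bytes per key, an index with 10 million series keeps
// roughly 40 MB away from the GC, and entry i is recovered with a fixed-width
// read:
//
//	ofs := binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4])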
@ -1014,6 +1033,14 @@ func (d *indirectIndex) Size() uint32 {
return uint32(len(d.b))
}
func (d *indirectIndex) Close() error {
// Windows doesn't use the anonymous map for the offsets index
if runtime.GOOS == "windows" {
return nil
}
return munmap(d.offsets[:cap(d.offsets)])
}
// mmapAccess is mmap based block accessor. It access blocks through an
// MMAP file interface.
type mmapAccessor struct {

View File

@ -6,6 +6,7 @@ import (
"math"
"os"
"path/filepath"
"sort"
"testing"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
@ -64,29 +65,22 @@ func TestTSMReader_MMAP_ReadAll(t *testing.T) {
t.Fatalf("unexpected error creating writer: %v", err)
}
var data = []struct {
key string
values []tsm1.Value
}{
{"float", []tsm1.Value{
tsm1.NewValue(1, 1.0)},
},
{"int", []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
tsm1.NewValue(1, true)},
},
{"string", []tsm1.Value{
tsm1.NewValue(1, "foo")},
},
var data = map[string][]tsm1.Value{
"float": []tsm1.Value{tsm1.NewValue(1, 1.0)},
"int": []tsm1.Value{tsm1.NewValue(1, int64(1))},
"uint": []tsm1.Value{tsm1.NewValue(1, ^uint64(0))},
"bool": []tsm1.Value{tsm1.NewValue(1, true)},
"string": []tsm1.Value{tsm1.NewValue(1, "foo")},
}
for _, d := range data {
if err := w.Write([]byte(d.key), d.values); err != nil {
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), data[k]); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
}
@ -111,17 +105,17 @@ func TestTSMReader_MMAP_ReadAll(t *testing.T) {
defer r.Close()
var count int
for _, d := range data {
readValues, err := r.ReadAll([]byte(d.key))
for k, vals := range data {
readValues, err := r.ReadAll([]byte(k))
if err != nil {
t.Fatalf("unexpected error readin: %v", err)
}
if exp := len(d.values); exp != len(readValues) {
if exp := len(vals); exp != len(readValues) {
t.Fatalf("read values length mismatch: got %v, exp %v", len(readValues), exp)
}
for i, v := range d.values {
for i, v := range vals {
if v.Value() != readValues[i].Value() {
t.Fatalf("read value mismatch(%d): got %v, exp %d", i, readValues[i].Value(), v.Value())
}
@ -145,28 +139,27 @@ func TestTSMReader_MMAP_Read(t *testing.T) {
t.Fatalf("unexpected error creating writer: %v", err)
}
var data = []struct {
key string
values []tsm1.Value
}{
{"float", []tsm1.Value{
var data = map[string][]tsm1.Value{
"float": []tsm1.Value{
tsm1.NewValue(1, 1.0)},
},
{"int", []tsm1.Value{
"int": []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
"uint": []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
"bool": []tsm1.Value{
tsm1.NewValue(1, true)},
},
{"string", []tsm1.Value{
"string": []tsm1.Value{
tsm1.NewValue(1, "foo")},
},
}
for _, d := range data {
if err := w.Write([]byte(d.key), d.values); err != nil {
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), data[k]); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
}
@ -191,17 +184,17 @@ func TestTSMReader_MMAP_Read(t *testing.T) {
defer r.Close()
var count int
for _, d := range data {
readValues, err := r.Read([]byte(d.key), d.values[0].UnixNano())
for k, vals := range data {
readValues, err := r.Read([]byte(k), vals[0].UnixNano())
if err != nil {
t.Fatalf("unexpected error readin: %v", err)
}
if exp := len(d.values); exp != len(readValues) {
if exp := len(vals); exp != len(readValues) {
t.Fatalf("read values length mismatch: got %v, exp %v", len(readValues), exp)
}
for i, v := range d.values {
for i, v := range vals {
if v.Value() != readValues[i].Value() {
t.Fatalf("read value mismatch(%d): got %v, exp %d", i, readValues[i].Value(), v.Value())
}
@ -225,29 +218,27 @@ func TestTSMReader_MMAP_Keys(t *testing.T) {
t.Fatalf("unexpected error creating writer: %v", err)
}
var data = []struct {
key string
values []tsm1.Value
}{
{"float", []tsm1.Value{
var data = map[string][]tsm1.Value{
"float": []tsm1.Value{
tsm1.NewValue(1, 1.0)},
},
{"int", []tsm1.Value{
"int": []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
"uint": []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
"bool": []tsm1.Value{
tsm1.NewValue(1, true)},
},
{"string", []tsm1.Value{
"string": []tsm1.Value{
tsm1.NewValue(1, "foo")},
},
}
for _, d := range data {
if err := w.Write([]byte(d.key), d.values); err != nil {
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), data[k]); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
}
@ -272,17 +263,17 @@ func TestTSMReader_MMAP_Keys(t *testing.T) {
defer r.Close()
var count int
for _, d := range data {
readValues, err := r.Read([]byte(d.key), d.values[0].UnixNano())
for k, vals := range data {
readValues, err := r.Read([]byte(k), vals[0].UnixNano())
if err != nil {
t.Fatalf("unexpected error readin: %v", err)
}
if exp := len(d.values); exp != len(readValues) {
if exp := len(vals); exp != len(readValues) {
t.Fatalf("read values length mismatch: got %v, exp %v", len(readValues), exp)
}
for i, v := range d.values {
for i, v := range vals {
if v.Value() != readValues[i].Value() {
t.Fatalf("read value mismatch(%d): got %v, exp %d", i, readValues[i].Value(), v.Value())
}
@ -889,6 +880,7 @@ func TestIndirectIndex_Entries(t *testing.T) {
index.Add([]byte("cpu"), tsm1.BlockFloat64, 0, 1, 10, 100)
index.Add([]byte("cpu"), tsm1.BlockFloat64, 2, 3, 20, 200)
index.Add([]byte("mem"), tsm1.BlockFloat64, 0, 1, 10, 100)
exp := index.Entries([]byte("cpu"))
b, err := index.MarshalBinary()
if err != nil {
@ -900,7 +892,6 @@ func TestIndirectIndex_Entries(t *testing.T) {
t.Fatalf("unexpected error unmarshaling index: %v", err)
}
exp := index.Entries([]byte("cpu"))
entries := indirect.Entries([]byte("cpu"))
if got, exp := len(entries), len(exp); got != exp {
@ -991,8 +982,8 @@ func TestIndirectIndex_Type(t *testing.T) {
func TestIndirectIndex_Keys(t *testing.T) {
index := tsm1.NewIndexWriter()
index.Add([]byte("cpu"), tsm1.BlockFloat64, 0, 1, 10, 20)
index.Add([]byte("mem"), tsm1.BlockFloat64, 0, 1, 10, 20)
index.Add([]byte("cpu"), tsm1.BlockFloat64, 1, 2, 20, 30)
index.Add([]byte("mem"), tsm1.BlockFloat64, 0, 1, 10, 20)
keys := index.Keys()
@ -1176,8 +1167,14 @@ func TestBlockIterator_Sorted(t *testing.T) {
"load": []tsm1.Value{tsm1.NewValue(1, "string")},
}
for k, v := range values {
if err := w.Write([]byte(k), v); err != nil {
keys := make([]string, 0, len(values))
for k := range values {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), values[k]); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
@ -1323,29 +1320,27 @@ func TestTSMReader_File_ReadAll(t *testing.T) {
t.Fatalf("unexpected error creating writer: %v", err)
}
var data = []struct {
key string
values []tsm1.Value
}{
{"float", []tsm1.Value{
var data = map[string][]tsm1.Value{
"float": []tsm1.Value{
tsm1.NewValue(1, 1.0)},
},
{"int", []tsm1.Value{
"int": []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
"uint": []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
"bool": []tsm1.Value{
tsm1.NewValue(1, true)},
},
{"string", []tsm1.Value{
"string": []tsm1.Value{
tsm1.NewValue(1, "foo")},
},
}
for _, d := range data {
if err := w.Write([]byte(d.key), d.values); err != nil {
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), data[k]); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
}
@ -1370,17 +1365,17 @@ func TestTSMReader_File_ReadAll(t *testing.T) {
defer r.Close()
var count int
for _, d := range data {
readValues, err := r.ReadAll([]byte(d.key))
for k, vals := range data {
readValues, err := r.ReadAll([]byte(k))
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if exp := len(d.values); exp != len(readValues) {
if exp := len(vals); exp != len(readValues) {
t.Fatalf("read values length mismatch: exp %v, got %v", exp, len(readValues))
}
for i, v := range d.values {
for i, v := range vals {
if exp, got := v.Value(), readValues[i].Value(); exp != got {
t.Fatalf("read value mismatch(%d): exp %v, got %d", i, v.Value(), readValues[i].Value())
}
@ -1473,28 +1468,27 @@ func TestTSMReader_File_Read(t *testing.T) {
t.Fatalf("unexpected error creating writer: %v", err)
}
var data = []struct {
key string
values []tsm1.Value
}{
{"float", []tsm1.Value{
var data = map[string][]tsm1.Value{
"float": []tsm1.Value{
tsm1.NewValue(1, 1.0)},
},
{"int", []tsm1.Value{
"int": []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
"uint": []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
"bool": []tsm1.Value{
tsm1.NewValue(1, true)},
},
{"string", []tsm1.Value{
"string": []tsm1.Value{
tsm1.NewValue(1, "foo")},
},
}
for _, d := range data {
if err := w.Write([]byte(d.key), d.values); err != nil {
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), data[k]); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
}
@ -1519,17 +1513,17 @@ func TestTSMReader_File_Read(t *testing.T) {
defer r.Close()
var count int
for _, d := range data {
readValues, err := r.Read([]byte(d.key), d.values[0].UnixNano())
for k, vals := range data {
readValues, err := r.Read([]byte(k), vals[0].UnixNano())
if err != nil {
t.Fatalf("unexpected error readin: %v", err)
}
if exp, got := len(d.values), len(readValues); exp != got {
if exp, got := len(vals), len(readValues); exp != got {
t.Fatalf("read values length mismatch: exp %v, got %v", exp, len(readValues))
}
for i, v := range d.values {
for i, v := range vals {
if v.Value() != readValues[i].Value() {
t.Fatalf("read value mismatch(%d): exp %v, got %d", i, v.Value(), readValues[i].Value())
}
@ -1553,28 +1547,27 @@ func TestTSMReader_References(t *testing.T) {
t.Fatalf("unexpected error creating writer: %v", err)
}
var data = []struct {
key string
values []tsm1.Value
}{
{"float", []tsm1.Value{
var data = map[string][]tsm1.Value{
"float": []tsm1.Value{
tsm1.NewValue(1, 1.0)},
},
{"int", []tsm1.Value{
"int": []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
"uint": []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
"bool": []tsm1.Value{
tsm1.NewValue(1, true)},
},
{"string", []tsm1.Value{
"string": []tsm1.Value{
tsm1.NewValue(1, "foo")},
},
}
for _, d := range data {
if err := w.Write([]byte(d.key), d.values); err != nil {
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), data[k]); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
}
@ -1609,17 +1602,17 @@ func TestTSMReader_References(t *testing.T) {
}
var count int
for _, d := range data {
readValues, err := r.Read([]byte(d.key), d.values[0].UnixNano())
for k, vals := range data {
readValues, err := r.Read([]byte(k), vals[0].UnixNano())
if err != nil {
t.Fatalf("unexpected error readin: %v", err)
}
if exp, got := len(d.values), len(readValues); exp != got {
if exp, got := len(vals), len(readValues); exp != got {
t.Fatalf("read values length mismatch: exp %v, got %v", exp, len(readValues))
}
for i, v := range d.values {
for i, v := range vals {
if v.Value() != readValues[i].Value() {
t.Fatalf("read value mismatch(%d): exp %v, got %d", i, v.Value(), readValues[i].Value())
}

View File

@ -104,7 +104,7 @@ func (r *ring) entry(key []byte) *entry {
// write writes values to the entry in the ring's partition associated with key.
// If no entry exists for the key then one will be created.
// write is safe for use by multiple goroutines.
func (r *ring) write(key []byte, values Values) error {
func (r *ring) write(key []byte, values Values) (bool, error) {
return r.getPartition(key).write(key, values)
}
@ -222,13 +222,13 @@ func (p *partition) entry(key []byte) *entry {
// write writes the values to the entry in the partition, creating the entry
// if it does not exist.
// write is safe for use by multiple goroutines.
func (p *partition) write(key []byte, values Values) error {
func (p *partition) write(key []byte, values Values) (bool, error) {
p.mu.RLock()
e := p.store[string(key)]
p.mu.RUnlock()
if e != nil {
// Hot path.
return e.add(values)
return false, e.add(values)
}
p.mu.Lock()
@ -236,18 +236,18 @@ func (p *partition) write(key []byte, values Values) error {
// Check again.
if e = p.store[string(key)]; e != nil {
return e.add(values)
return false, e.add(values)
}
// Create a new entry using a preallocated size if we have a hint available.
hint, _ := p.entrySizeHints[xxhash.Sum64(key)]
e, err := newEntryValues(values, hint)
if err != nil {
return err
return false, err
}
p.store[string(key)] = e
return nil
return true, nil
}
// add adds a new entry for key to the partition.

View File

@ -94,7 +94,7 @@ func benchmarkRingWrite(b *testing.B, r *ring, n int) {
go func() {
defer wg.Done()
for j := 0; j < n; j++ {
if err := r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", j)), Values{}); err != nil {
if _, err := r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", j)), Values{}); err != nil {
errC <- err
}
}

View File

@ -71,10 +71,8 @@ import (
"io"
"os"
"sort"
"sync"
"strings"
"time"
"github.com/influxdata/influxdb/pkg/bytesutil"
)
const (
@ -168,6 +166,8 @@ type IndexWriter interface {
// WriteTo writes the index contents to a writer.
WriteTo(w io.Writer) (int64, error)
Close() error
}
// IndexEntry is the index information for a given block in a TSM file.
@ -232,65 +232,123 @@ func (e *IndexEntry) String() string {
// NewIndexWriter returns a new IndexWriter.
func NewIndexWriter() IndexWriter {
return &directIndex{
blocks: map[string]*indexEntries{},
}
return &directIndex{}
}
// NewDiskIndexWriter returns a new IndexWriter that buffers the index to f on disk.
func NewDiskIndexWriter(f *os.File) IndexWriter {
return &directIndex{fd: f, w: bufio.NewWriter(f)}
}
// indexBlock represents the index entries for a single series key within a TSM file.
type indexBlock struct {
key []byte
entries *indexEntries
}
// directIndex is a simple in-memory index implementation for a TSM file. The full index
// must fit in memory.
type directIndex struct {
mu sync.RWMutex
size uint32
blocks map[string]*indexEntries
blocks []indexBlock
fd *os.File
w *bufio.Writer
}
func (d *directIndex) Add(key []byte, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
d.mu.Lock()
defer d.mu.Unlock()
entries := d.blocks[string(key)]
if entries == nil {
entries = &indexEntries{
Type: blockType,
}
d.blocks[string(key)] = entries
// Is this the first block being added?
if len(d.blocks) == 0 {
// size of the key stored in the index
d.size += uint32(2 + len(key))
// size of the count of entries stored in the index
d.size += indexCountSize
}
entries.entries = append(entries.entries, IndexEntry{
MinTime: minTime,
MaxTime: maxTime,
Offset: offset,
Size: size,
})
// size of the encoded index entry
d.size += indexEntrySize
d.blocks = append(d.blocks, indexBlock{
key: key,
entries: &indexEntries{
Type: blockType,
entries: []IndexEntry{IndexEntry{
MinTime: minTime,
MaxTime: maxTime,
Offset: offset,
Size: size,
}}},
})
// size of the encoded index entry
d.size += indexEntrySize
return
}
// Find the last block so we can see if we're still adding to the same series key.
block := d.blocks[len(d.blocks)-1]
cmp := bytes.Compare(block.key, key)
if cmp == 0 {
// The last block is still this key
block.entries.entries = append(block.entries.entries, IndexEntry{
MinTime: minTime,
MaxTime: maxTime,
Offset: offset,
Size: size,
})
// size of the encoded index entry
d.size += indexEntrySize
} else if cmp < 0 {
if d.w != nil {
d.flush(d.w)
}
// We have a new key that is greater than the last one so we need to add
// a new index block section.
// size of the key stored in the index
d.size += uint32(2 + len(key))
// size of the count of entries stored in the index
d.size += indexCountSize
d.blocks = append(d.blocks, indexBlock{
key: key,
entries: &indexEntries{
Type: blockType,
entries: []IndexEntry{IndexEntry{
MinTime: minTime,
MaxTime: maxTime,
Offset: offset,
Size: size,
}}},
})
// size of the encoded index entry
d.size += indexEntrySize
} else {
// Keys can't be added out of order.
panic(fmt.Sprintf("keys must be added in sorted order: %s < %s", string(key), string(d.blocks[len(d.blocks)-1].key)))
}
}
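// Note: because a completed block is flushed to disk as soon as a greater key
// arrives, Add now requires keys in ascending order; adding "cpu" after "mem"
// panics. This is why the tests in this PR sort map keys before writing, and
// why TestTSMWriter_Write_ReverseKeys is deleted below.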
func (d *directIndex) entries(key []byte) []IndexEntry {
entries := d.blocks[string(key)]
if entries == nil {
if len(d.blocks) == 0 {
return nil
}
return entries.entries
if bytes.Equal(d.blocks[len(d.blocks)-1].key, key) {
return d.blocks[len(d.blocks)-1].entries.entries
}
i := sort.Search(len(d.blocks), func(i int) bool { return bytes.Compare(d.blocks[i].key, key) >= 0 })
if i < len(d.blocks) && bytes.Equal(d.blocks[i].key, key) {
return d.blocks[i].entries.entries
}
return nil
}
func (d *directIndex) Entries(key []byte) []IndexEntry {
d.mu.RLock()
defer d.mu.RUnlock()
return d.entries(key)
}
func (d *directIndex) Entry(key []byte, t int64) *IndexEntry {
d.mu.RLock()
defer d.mu.RUnlock()
entries := d.entries(key)
for _, entry := range entries {
if entry.Contains(t) {
@ -301,44 +359,38 @@ func (d *directIndex) Entry(key []byte, t int64) *IndexEntry {
}
func (d *directIndex) Keys() [][]byte {
d.mu.RLock()
defer d.mu.RUnlock()
keys := make([][]byte, 0, len(d.blocks))
for k := range d.blocks {
keys = append(keys, []byte(k))
for _, v := range d.blocks {
keys = append(keys, v.key)
}
bytesutil.Sort(keys)
return keys
}
func (d *directIndex) KeyCount() int {
d.mu.RLock()
n := len(d.blocks)
d.mu.RUnlock()
return n
}
func (d *directIndex) addEntries(key string, entries *indexEntries) {
existing := d.blocks[key]
if existing == nil {
d.blocks[key] = entries
return
}
existing.entries = append(existing.entries, entries.entries...)
return len(d.blocks)
}
func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
d.mu.RLock()
defer d.mu.RUnlock()
// Index blocks are written sorted by key
keys := make([]string, 0, len(d.blocks))
for k := range d.blocks {
keys = append(keys, k)
if d.w == nil {
return d.flush(w)
}
sort.Strings(keys)
if _, err := d.flush(d.w); err != nil {
return 0, err
}
if err := d.w.Flush(); err != nil {
return 0, err
}
if _, err := d.fd.Seek(0, io.SeekStart); err != nil {
return 0, err
}
return io.Copy(w, bufio.NewReader(d.fd))
}
func (d *directIndex) flush(w io.Writer) (int64, error) {
var (
n int
err error
@ -347,13 +399,17 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
)
// For each key, individual entries are sorted by time
for _, key := range keys {
entries := d.blocks[key]
for _, ie := range d.blocks {
key := ie.key
entries := ie.entries
if entries.Len() > maxIndexEntries {
return N, fmt.Errorf("key '%s' exceeds max index entries: %d > %d", key, entries.Len(), maxIndexEntries)
}
sort.Sort(entries)
if !sort.IsSorted(entries) {
sort.Sort(entries)
}
binary.BigEndian.PutUint16(buf[0:2], uint16(len(key)))
buf[2] = entries.Type
@ -365,7 +421,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
}
N += int64(n)
if n, err = io.WriteString(w, key); err != nil {
if n, err = w.Write(key); err != nil {
return int64(n) + N, fmt.Errorf("write: writer key error: %v", err)
}
N += int64(n)
@ -384,7 +440,11 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
N += n64
}
d.blocks = d.blocks[:0]
return N, nil
}
func (d *directIndex) MarshalBinary() ([]byte, error) {
@ -395,36 +455,25 @@ func (d *directIndex) MarshalBinary() ([]byte, error) {
return b.Bytes(), nil
}
func (d *directIndex) UnmarshalBinary(b []byte) error {
d.mu.Lock()
defer d.mu.Unlock()
d.size = uint32(len(b))
var pos int
for pos < len(b) {
n, key, err := readKey(b[pos:])
if err != nil {
return fmt.Errorf("readIndex: read key error: %v", err)
}
pos += n
var entries indexEntries
n, err = readEntries(b[pos:], &entries)
if err != nil {
return fmt.Errorf("readIndex: read entries error: %v", err)
}
pos += n
d.addEntries(string(key), &entries)
}
return nil
}
func (d *directIndex) Size() uint32 {
return d.size
}
func (d *directIndex) Close() error {
if d.w == nil {
return nil
}
// Flush anything remaining in the index
if err := d.w.Flush(); err != nil {
return err
}
if err := d.fd.Close(); err != nil {
return err
}
return os.Remove(d.fd.Name())
}
// tsmWriter writes keys and values in the TSM format
type tsmWriter struct {
wrapped io.Writer
@ -435,8 +484,15 @@ type tsmWriter struct {
// NewTSMWriter returns a new TSMWriter writing to w.
func NewTSMWriter(w io.Writer) (TSMWriter, error) {
index := &directIndex{
blocks: map[string]*indexEntries{},
var index IndexWriter
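// By TSM naming convention, snapshot-written files have sequence 1 and so end
// in 01.tsm.tmp; those keep the in-memory index (presumably because snapshots
// are latency-sensitive), while other file-backed writers spill their index
// to a .idx.tmp side file.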
if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") {
f, err := os.Create(strings.TrimSuffix(fw.Name(), ".tsm.tmp") + ".idx.tmp")
if err != nil {
return nil, err
}
index = NewDiskIndexWriter(f)
} else {
index = NewIndexWriter()
}
return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
@ -597,6 +653,10 @@ func (t *tsmWriter) Close() error {
return err
}
if err := t.index.Close(); err != nil {
return err
}
if c, ok := t.wrapped.(io.Closer); ok {
return c.Close()
}
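// Taken together: for file-backed writers, NewTSMWriter spills index entries
// to a sibling .idx.tmp file as each key completes, WriteTo copies that file
// back into the TSM file's index section, and Close flushes and removes the
// temporary file. Only the current key's entries stay in memory, which bounds
// index memory during full compactions of high-cardinality shards.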

View File

@ -241,74 +241,6 @@ func TestTSMWriter_Write_MultipleKeyValues(t *testing.T) {
}
}
// Tests that writing keys in reverse is able to read them back.
func TestTSMWriter_Write_ReverseKeys(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
var data = []struct {
key string
values []tsm1.Value
}{
{"mem", []tsm1.Value{
tsm1.NewValue(0, 1.5),
tsm1.NewValue(1, 2.5)},
},
{"cpu", []tsm1.Value{
tsm1.NewValue(0, 1.0),
tsm1.NewValue(1, 2.0)},
},
}
for _, d := range data {
if err := w.Write([]byte(d.key), d.values); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
defer r.Close()
for _, d := range data {
readValues, err := r.ReadAll([]byte(d.key))
if err != nil {
t.Fatalf("unexpected error readin: %v", err)
}
if exp := len(d.values); exp != len(readValues) {
t.Fatalf("read values length mismatch: got %v, exp %v", len(readValues), exp)
}
for i, v := range d.values {
if v.Value() != readValues[i].Value() {
t.Fatalf("read value mismatch(%d): got %v, exp %d", i, readValues[i].Value(), v.Value())
}
}
}
}
// Tests that writing the same key multiple times is able to read them back.
func TestTSMWriter_Write_SameKey(t *testing.T) {
dir := MustTempDir()

View File

@ -24,7 +24,11 @@ type FileSet struct {
// NewFileSet returns a new instance of FileSet.
func NewFileSet(levels []CompactionLevel, files []File) (*FileSet, error) {
fs := &FileSet{levels: levels, files: files}
fs := &FileSet{
levels: levels,
files: files,
filters: make([]*bloom.Filter, len(levels)),
}
if err := fs.buildFilters(); err != nil {
return nil, err
}
@ -56,9 +60,14 @@ func (fs *FileSet) Release() {
}
}
// Prepend returns a new file set with f added at the beginning.
func (fs *FileSet) Prepend(f File) (*FileSet, error) {
return NewFileSet(fs.levels, append([]File{f}, fs.files...))
// PrependLogFile returns a new file set with f added at the beginning.
// Filters do not need to be rebuilt because log files have no bloom filter.
func (fs *FileSet) PrependLogFile(f *LogFile) *FileSet {
return &FileSet{
levels: fs.levels,
files: append([]File{f}, fs.files...),
filters: fs.filters,
}
}
// MustReplace swaps a list of files for a single file and returns a new file set.
@ -89,11 +98,26 @@ func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet {
other[i] = newFile
copy(other[i+1:], fs.files[i+len(oldFiles):])
fs, err := NewFileSet(fs.levels, other)
if err != nil {
// Copy existing bloom filters.
filters := make([]*bloom.Filter, len(fs.filters))
copy(filters, fs.filters)
// Clear filters at replaced file levels.
filters[newFile.Level()] = nil
for _, f := range oldFiles {
filters[f.Level()] = nil
}
// Build new fileset and rebuild changed filters.
newFS := &FileSet{
levels: fs.levels,
files: other,
filters: filters,
}
if err := newFS.buildFilters(); err != nil {
panic("cannot build file set: " + err.Error())
}
return fs
return newFS
}
// MaxID returns the highest file identifier.
@ -702,6 +726,7 @@ func (fs *FileSet) FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][
newTagsSlice = append(newTagsSlice, tags)
}
}
return newNames, newTagsSlice
}
@ -913,31 +938,38 @@ func (fs *FileSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *inf
// buildFilters builds a series existence filter for each compaction level.
func (fs *FileSet) buildFilters() error {
if len(fs.levels) == 0 {
fs.filters = nil
return nil
}
// Generate filters for each level.
fs.filters = make([]*bloom.Filter, len(fs.levels))
// Move past log files (level=0).
files := fs.files
for len(files) > 0 && files[0].Level() == 0 {
files = files[1:]
}
// Merge filters at each level.
for _, f := range fs.files {
level := f.Level()
// Skip if file has no bloom filter.
if f.Filter() == nil {
// Build filters for each level where the filter is non-existent.
for level := range fs.levels {
// Clear filter if no files remain or next file is at a higher level.
if len(files) == 0 || files[0].Level() > level {
fs.filters[level] = nil
continue
}
// Initialize a filter if it doesn't exist.
if fs.filters[level] == nil {
lvl := fs.levels[level]
fs.filters[level] = bloom.NewFilter(lvl.M, lvl.K)
// Skip files at this level if filter already exists.
if fs.filters[level] != nil {
for len(files) > 0 && files[0].Level() == level {
files = files[1:]
}
continue
}
// Merge filter.
if err := fs.filters[level].Merge(f.Filter()); err != nil {
return err
// Build new filter from files at this level.
fs.filters[level] = bloom.NewFilter(fs.levels[level].M, fs.levels[level].K)
for len(files) > 0 && files[0].Level() == level {
if err := fs.filters[level].Merge(files[0].Filter()); err != nil {
return err
}
files = files[1:]
}
}
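// For example, with files at levels [0, 0, 1, 1, 3]: the walk skips the two
// log files, builds (or keeps) the level-1 filter by merging the two level-1
// file filters, clears the filter for the empty level 2, and builds level 3
// from its single file.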

View File

@ -342,11 +342,7 @@ func (i *Index) prependActiveLogFile() error {
i.activeLogFile = f
// Prepend and generate new fileset.
fs, err := i.fileSet.Prepend(f)
if err != nil {
return err
}
i.fileSet = fs
i.fileSet = i.fileSet.PrependLogFile(f)
// Write new manifest.
if err := i.writeManifestFile(); err != nil {

View File

@ -189,17 +189,17 @@ func (f *LogFile) Release() { f.wg.Done() }
// Stat returns size and last modification time of the file.
func (f *LogFile) Stat() (int64, time.Time) {
f.mu.Lock()
f.mu.RLock()
size, modTime := f.size, f.modTime
f.mu.Unlock()
f.mu.RUnlock()
return size, modTime
}
// Size returns the size of the file, in bytes.
func (f *LogFile) Size() int64 {
f.mu.Lock()
f.mu.RLock()
v := f.size
f.mu.Unlock()
f.mu.RUnlock()
return v
}
@ -429,16 +429,55 @@ func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
// AddSeriesList adds a list of series to the log file in bulk.
func (f *LogFile) AddSeriesList(names [][]byte, tagsSlice []models.Tags) error {
// Determine total size of names, keys, values.
var n int
for i := range names {
n += len(names[i])
tags := tagsSlice[i]
for j := range tags {
n += len(tags[j].Key) + len(tags[j].Value)
}
}
// Allocate names, keys, & values in one block.
buf := make([]byte, n)
// Clone all entries.
entries := make([]LogEntry, len(names))
for i := range names {
copy(buf, names[i])
clonedName := buf[:len(names[i])]
buf = buf[len(names[i]):]
// Clone tag set.
var clonedTags models.Tags
if len(tagsSlice[i]) > 0 {
clonedTags = make(models.Tags, len(tagsSlice[i]))
for j, tags := range tagsSlice[i] {
copy(buf, tags.Key)
key := buf[:len(tags.Key)]
buf = buf[len(tags.Key):]
copy(buf, tags.Value)
value := buf[:len(tags.Value)]
buf = buf[len(tags.Value):]
clonedTags[j] = models.Tag{Key: key, Value: value}
}
}
entries[i] = LogEntry{Name: clonedName, Tags: clonedTags}
}
f.mu.Lock()
defer f.mu.Unlock()
for i := range names {
// The name and tags are cloned to prevent a memory leak
e := LogEntry{Name: []byte(string(names[i])), Tags: tagsSlice[i].Clone()}
if err := f.appendEntry(&e); err != nil {
for i := range entries {
if err := f.appendEntry(&entries[i]); err != nil {
return err
}
f.execEntry(&e)
f.execEntry(&entries[i])
}
return nil
}
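// The bulk clone above replaces one allocation per name and tag with a single
// backing buffer sized up front. A generic sketch of the technique (the
// helper is illustrative, not part of this commit):
//
//	func cloneAll(in [][]byte) [][]byte {
//		var n int
//		for _, b := range in {
//			n += len(b)
//		}
//		buf := make([]byte, n)
//		out := make([][]byte, len(in))
//		for i, b := range in {
//			copy(buf, b)
//			out[i] = buf[:len(b):len(b)] // cap each subslice to its own bytes
//			buf = buf[len(b):]
//		}
//		return out
//	}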

View File

@ -49,7 +49,7 @@ const (
)
// MaxSeriesBlockHashSize is the maximum number of series in a single hash.
const MaxSeriesBlockHashSize = (65536 * LoadFactor) / 100
const MaxSeriesBlockHashSize = (1048576 * LoadFactor) / 100
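// Assuming the package's LoadFactor of 80 (defined elsewhere in this package),
// this raises the cap from 65536*80/100 = 52,428 to 1048576*80/100 = 838,860
// series per hash partition, a 16x reduction in how many partitions a
// high-cardinality series block fans out to.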
// SeriesBlock represents the section of the index that holds series data.
type SeriesBlock struct {

View File

@ -928,6 +928,12 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
}
s.mu.RUnlock()
// Ensure snapshot compactions are enabled since the shard might have been cold
// and disabled by the monitor.
if sh.IsIdle() {
sh.SetCompactionsEnabled(true)
}
return sh.WritePoints(points)
}