From 4124a8ed9700950eff10c90d8ec3031160522b6e Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 14 Sep 2017 13:11:40 -0600 Subject: [PATCH 01/14] Simplify cache ring The continuum slice is not needed since the number of partitions doesn't change. This removes the slice to make the mapping simpler. --- tsdb/engine/tsm1/ring.go | 19 ++++++------------- tsdb/engine/tsm1/ring_test.go | 2 +- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/tsdb/engine/tsm1/ring.go b/tsdb/engine/tsm1/ring.go index 1e4bce54df..f962edbe0e 100644 --- a/tsdb/engine/tsm1/ring.go +++ b/tsdb/engine/tsm1/ring.go @@ -36,10 +36,6 @@ type ring struct { // len(partitions) <= len(continuum) partitions []*partition - // A mapping of partition to location on the ring continuum. This is used - // to lookup a partition. - continuum []*partition - // Number of keys within the ring. This is used to provide a hint for // allocating the return values in keys(). It will not be perfectly accurate // since it doesn't consider adding duplicate keys, or trying to remove non- @@ -59,20 +55,17 @@ func newring(n int) (*ring, error) { } r := ring{ - continuum: make([]*partition, partitions), // maximum number of partitions. + partitions: make([]*partition, n), // maximum number of partitions. } // The trick here is to map N partitions to all points on the continuum, // such that the first eight bits of a given hash will map directly to one // of the N partitions. - for i := 0; i < len(r.continuum); i++ { - if (i == 0 || i%(partitions/n) == 0) && len(r.partitions) < n { - r.partitions = append(r.partitions, &partition{ - store: make(map[string]*entry), - entrySizeHints: make(map[uint64]int), - }) + for i := 0; i < len(r.partitions); i++ { + r.partitions[i] = &partition{ + store: make(map[string]*entry), + entrySizeHints: make(map[uint64]int), } - r.continuum[i] = r.partitions[len(r.partitions)-1] } return &r, nil } @@ -92,7 +85,7 @@ func (r *ring) reset() { // getPartition retrieves the hash ring partition associated with the provided // key. func (r *ring) getPartition(key []byte) *partition { - return r.continuum[int(xxhash.Sum64(key)%partitions)] + return r.partitions[int(xxhash.Sum64(key)%partitions)] } // entry returns the entry for the given key. diff --git a/tsdb/engine/tsm1/ring_test.go b/tsdb/engine/tsm1/ring_test.go index c0fc13622d..868f79beeb 100644 --- a/tsdb/engine/tsm1/ring_test.go +++ b/tsdb/engine/tsm1/ring_test.go @@ -31,7 +31,7 @@ func TestRing_newRing(t *testing.T) { // Check partitions distributed correctly partitions := make([]*partition, 0) - for i, partition := range r.continuum { + for i, partition := range r.partitions { if i == 0 || partition != partitions[len(partitions)-1] { partitions = append(partitions, partition) } From 2885b9b310700f069d8b26896af1ad40481b241a Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 14 Sep 2017 14:09:16 -0600 Subject: [PATCH 02/14] Remove entrySizeHints map There is a lot of overhead for calculating the hints for larger cardinalities. This slows down resetting the partitions in the ring. --- tsdb/engine/tsm1/ring.go | 28 ++++------------------------ 1 file changed, 4 insertions(+), 24 deletions(-) diff --git a/tsdb/engine/tsm1/ring.go b/tsdb/engine/tsm1/ring.go index f962edbe0e..244dd11ab3 100644 --- a/tsdb/engine/tsm1/ring.go +++ b/tsdb/engine/tsm1/ring.go @@ -63,8 +63,7 @@ func newring(n int) (*ring, error) { // of the N partitions. 
 	for i := 0; i < len(r.partitions); i++ {
 		r.partitions[i] = &partition{
-			store:          make(map[string]*entry),
-			entrySizeHints: make(map[uint64]int),
+			store: make(map[string]*entry),
 		}
 	}
 	return &r, nil
@@ -195,12 +194,6 @@ func (r *ring) applySerial(f func([]byte, *entry) error) error {
 type partition struct {
 	mu    sync.RWMutex
 	store map[string]*entry
-
-	// entrySizeHints stores hints for appropriate sizes to pre-allocate the
-	// []Values in an entry. entrySizeHints will only contain hints for entries
-	// that were present prior to the most recent snapshot, preventing unbounded
-	// growth over time.
-	entrySizeHints map[uint64]int
 }
 
 // entry returns the partition's entry for the provided key.
@@ -233,8 +226,7 @@ func (p *partition) write(key []byte, values Values) (bool, error) {
 	}
 
 	// Create a new entry using a preallocated size if we have a hint available.
-	hint, _ := p.entrySizeHints[xxhash.Sum64(key)]
-	e, err := newEntryValues(values, hint)
+	e, err := newEntryValues(values, 32)
 	if err != nil {
 		return false, err
 	}
@@ -274,19 +266,7 @@ func (p *partition) keys() [][]byte {
 func (p *partition) reset() {
 	p.mu.Lock()
 	defer p.mu.Unlock()
-
-	// Collect the allocated sizes of values for each entry in the store.
-	p.entrySizeHints = make(map[uint64]int)
-	for k, entry := range p.store {
-		// If the capacity is large then there are many values in the entry.
-		// Store a hint to pre-allocate the next time we see the same entry.
-		entry.mu.RLock()
-		if cap(entry.values) > 128 { // 4 x the default entry capacity size.
-			p.entrySizeHints[xxhash.Sum64String(k)] = cap(entry.values)
-		}
-		entry.mu.RUnlock()
+	for k := range p.store {
+		delete(p.store, k)
 	}
-
-	// Reset the store.
-	p.store = make(map[string]*entry, len(p.store))
 }

From 9ee305f6f58255f5b59c3effbd65e480086d436d Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Thu, 14 Sep 2017 18:18:05 -0600
Subject: [PATCH 03/14] Periodically re-allocate cache store

This periodically re-allocates the cache store to avoid memory
fragmentation and gradual slowdown of the store after repeated
deletes and inserts into the map.
---
 tsdb/engine/tsm1/ring.go | 55 ++++++++++++++++++++++++++++++----
 1 file changed, 50 insertions(+), 5 deletions(-)

diff --git a/tsdb/engine/tsm1/ring.go b/tsdb/engine/tsm1/ring.go
index 244dd11ab3..b1afec6355 100644
--- a/tsdb/engine/tsm1/ring.go
+++ b/tsdb/engine/tsm1/ring.go
@@ -129,6 +129,14 @@ func (r *ring) keys(sorted bool) [][]byte {
 	return keys
 }
 
+func (r *ring) count() int {
+	var n int
+	for _, p := range r.partitions {
+		n += p.count()
+	}
+	return n
+}
+
 // apply applies the provided function to every entry in the ring under a read
 // lock using a separate goroutine for each partition. The provided function
 // will be called with each key and the corresponding entry. The first error
@@ -180,6 +188,9 @@ func (r *ring) applySerial(f func([]byte, *entry) error) error {
 	for _, p := range r.partitions {
 		p.mu.RLock()
 		for k, e := range p.store {
+			if e.count() == 0 {
+				continue
+			}
 			if err := f([]byte(k), e); err != nil {
 				p.mu.RUnlock()
 				return err
@@ -190,6 +201,21 @@ func (r *ring) applySerial(f func([]byte, *entry) error) error {
 	return nil
 }
 
+func (r *ring) split(n int) []storer {
+	var keys int
+	storers := make([]storer, n)
+	for i := 0; i < n; i++ {
+		storers[i], _ = newring(len(r.partitions))
+	}
+
+	for i, p := range r.partitions {
+		r := storers[i%n].(*ring)
+		r.partitions[i] = p
+		keys += len(p.store)
+	}
+	return storers
+}
+
 // partition provides safe access to a map of series keys to entries.
type partition struct { mu sync.RWMutex @@ -254,7 +280,10 @@ func (p *partition) remove(key []byte) { func (p *partition) keys() [][]byte { p.mu.RLock() keys := make([][]byte, 0, len(p.store)) - for k := range p.store { + for k, v := range p.store { + if v.count() == 0 { + continue + } keys = append(keys, []byte(k)) } p.mu.RUnlock() @@ -264,9 +293,25 @@ func (p *partition) keys() [][]byte { // reset resets the partition by reinitialising the store. reset returns hints // about sizes that the entries within the store could be reallocated with. func (p *partition) reset() { + p.mu.RLock() + sz := len(p.store) + p.mu.RUnlock() + + newStore := make(map[string]*entry, sz) p.mu.Lock() - defer p.mu.Unlock() - for k := range p.store { - delete(p.store, k) - } + p.store = newStore + p.mu.Unlock() +} + +func (p *partition) count() int { + var n int + p.mu.RLock() + for _, v := range p.store { + if v.count() > 0 { + n++ + } + } + p.mu.RUnlock() + return n + } From ddeba2c86b502adb0c40330b307629aa8fa5bab8 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 18 Sep 2017 11:02:27 -0600 Subject: [PATCH 04/14] Split large snapshots and write concurrently --- tsdb/engine/tsm1/cache.go | 26 +++++++++++++++++++ tsdb/engine/tsm1/cache_test.go | 43 +++++++++++++++++++++++++++++++ tsdb/engine/tsm1/compact.go | 44 +++++++++++++++++++++++++++++--- tsdb/engine/tsm1/compact_test.go | 20 +++++++++++++-- tsdb/engine/tsm1/ring.go | 13 ++++++++++ 5 files changed, 141 insertions(+), 5 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 4e25a77f87..55e5e218a2 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -176,6 +176,8 @@ type storer interface { apply(f func([]byte, *entry) error) error // Apply f to all entries in the store in parallel. applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial. reset() // Reset the store to an initial unused state. + split(n int) []storer // Split splits the store into n stores + count() int // Count returns the number of keys in the store } // Cache maintains an in-memory store of Values for a set of keys. @@ -477,6 +479,13 @@ func (c *Cache) MaxSize() uint64 { return c.maxSize } +func (c *Cache) Count() int { + c.mu.RLock() + n := c.store.count() + c.mu.RUnlock() + return n +} + // Keys returns a sorted slice of all keys under management by the cache. func (c *Cache) Keys() [][]byte { c.mu.RLock() @@ -485,6 +494,21 @@ func (c *Cache) Keys() [][]byte { return store.keys(true) } +func (c *Cache) Split(n int) []*Cache { + if n == 1 { + return []*Cache{c} + } + + caches := make([]*Cache, n) + storers := c.store.split(n) + for i := 0; i < n; i++ { + caches[i] = &Cache{ + store: storers[i], + } + } + return caches +} + // unsortedKeys returns a slice of all keys under management by the cache. The // keys are not sorted. 
 func (c *Cache) unsortedKeys() [][]byte {
@@ -765,3 +789,5 @@ func (e emptyStore) keys(sorted bool) [][]byte { return nil
 func (e emptyStore) apply(f func([]byte, *entry) error) error       { return nil }
 func (e emptyStore) applySerial(f func([]byte, *entry) error) error { return nil }
 func (e emptyStore) reset()                                         {}
+func (e emptyStore) split(n int) []storer                           { return nil }
+func (e emptyStore) count() int                                     { return 0 }
diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go
index b890bd0db1..b47e2920ad 100644
--- a/tsdb/engine/tsm1/cache_test.go
+++ b/tsdb/engine/tsm1/cache_test.go
@@ -759,6 +759,45 @@ func TestCacheLoader_LoadDeleted(t *testing.T) {
 	}
 }
 
+func TestCache_Split(t *testing.T) {
+	v0 := NewValue(1, 1.0)
+	v1 := NewValue(2, 2.0)
+	v2 := NewValue(3, 3.0)
+	values := Values{v0, v1, v2}
+	valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
+
+	c := NewCache(0, "")
+
+	if err := c.Write([]byte("foo"), values); err != nil {
+		t.Fatalf("failed to write key foo to cache: %s", err.Error())
+	}
+	if err := c.Write([]byte("bar"), values); err != nil {
+		t.Fatalf("failed to write key bar to cache: %s", err.Error())
+	}
+
+	if err := c.Write([]byte("baz"), values); err != nil {
+		t.Fatalf("failed to write key baz to cache: %s", err.Error())
+	}
+
+	if n := c.Size(); n != 3*valuesSize+9 {
+		t.Fatalf("cache size incorrect after 3 writes, exp %d, got %d", 3*valuesSize+9, n)
+	}
+
+	splits := c.Split(3)
+	keys := make(map[string]int)
+	for _, s := range splits {
+		for _, k := range s.Keys() {
+			keys[string(k)] = s.Values(k).Size()
+		}
+	}
+
+	for _, key := range []string{"foo", "bar", "baz"} {
+		if _, ok := keys[key]; !ok {
+			t.Fatalf("missing key, exp %s, got %v", key, nil)
+		}
+	}
+}
+
 func mustTempDir() string {
 	dir, err := ioutil.TempDir("", "tsm1-test")
 	if err != nil {
@@ -797,6 +836,8 @@ type TestStore struct {
 	applyf       func(f func([]byte, *entry) error) error
 	applySerialf func(f func([]byte, *entry) error) error
 	resetf       func()
+	splitf       func(n int) []storer
+	countf       func() int
 }
 
 func NewTestStore() *TestStore { return &TestStore{} }
@@ -808,6 +849,8 @@ func (s *TestStore) keys(sorted bool) [][]byte { return s.k
 func (s *TestStore) apply(f func([]byte, *entry) error) error       { return s.applyf(f) }
 func (s *TestStore) applySerial(f func([]byte, *entry) error) error { return s.applySerialf(f) }
 func (s *TestStore) reset()                                         { s.resetf() }
+func (s *TestStore) split(n int) []storer                           { return s.splitf(n) }
+func (s *TestStore) count() int                                     { return s.countf() }
 
 var fvSize = uint64(NewValue(1, float64(1)).Size())
 
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index 82e5f14b68..b477066661 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -227,9 +227,16 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
 		minGenerations = level + 1
 	}
 
+	// Each compaction group should run against 4 generations. For level 1, since these
+	// can get created much more quickly, bump the grouping to 8 to keep file counts lower.
+ groupSize := 4 + if level == 1 { + groupSize = 8 + } + var cGroups []CompactionGroup for _, group := range levelGroups { - for _, chunk := range group.chunk(4) { + for _, chunk := range group.chunk(groupSize) { var cGroup CompactionGroup var hasTombstones bool for _, gen := range chunk { @@ -697,8 +704,39 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { return nil, errSnapshotsDisabled } - iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock, intC) - files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter) + concurrency := cache.Count() / 256000 + if concurrency < 1 { + concurrency = 1 + } + if concurrency > 4 { + concurrency = 4 + } + splits := cache.Split(concurrency) + + type res struct { + files []string + err error + } + + resC := make(chan res, concurrency) + for i := 0; i < concurrency; i++ { + go func(sp *Cache) { + iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC) + files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter) + resC <- res{files: files, err: err} + + }(splits[i]) + } + + var err error + files := make([]string, 0, concurrency) + for i := 0; i < concurrency; i++ { + result := <-resC + if result.err != nil { + err = result.err + } + files = append(files, result.files...) + } // See if we were disabled while writing a snapshot c.mu.RLock() diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index cf45c39c87..5f0e8f210e 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -1689,6 +1689,22 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { Path: "06-01.tsm1", Size: 1 * 1024 * 1024, }, + tsm1.FileStat{ + Path: "07-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "08-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "09-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "10-01.tsm1", + Size: 1 * 1024 * 1024, + }, } cp := tsm1.NewDefaultPlanner( @@ -1699,8 +1715,8 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} - expFiles2 := []tsm1.FileStat{data[4], data[5]} + expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]} + expFiles2 := []tsm1.FileStat{data[8], data[9]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { diff --git a/tsdb/engine/tsm1/ring.go b/tsdb/engine/tsm1/ring.go index b1afec6355..af82a18c1f 100644 --- a/tsdb/engine/tsm1/ring.go +++ b/tsdb/engine/tsm1/ring.go @@ -315,3 +315,16 @@ func (p *partition) count() int { return n } + +func (p *partition) count() int { + var n int + p.mu.RLock() + for _, v := range p.store { + if v.count() > 0 { + n++ + } + } + p.mu.RUnlock() + return n + +} From 2ca9ccee1f07d5ce33d85027be50ac6391aee897 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 18 Sep 2017 11:03:31 -0600 Subject: [PATCH 05/14] Reset snapshot cache outside of write lock --- tsdb/engine/tsm1/cache.go | 12 ++++++++++-- tsdb/engine/tsm1/ring.go | 13 ------------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 55e5e218a2..39b1784508 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -438,6 +438,15 @@ func (c *Cache) Deduplicate() { func (c *Cache) ClearSnapshot(success bool) { c.init() + c.mu.RLock() + snapStore := c.snapshot.store + c.mu.RUnlock() + + // reset the 
snapshot store outside of the write lock + if success { + snapStore.reset() + } + c.mu.Lock() defer c.mu.Unlock() @@ -447,8 +456,7 @@ func (c *Cache) ClearSnapshot(success bool) { c.snapshotAttempts = 0 c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache - // Reset the snapshot's store, and reset the snapshot to a fresh Cache. - c.snapshot.store.reset() + // Reset the snapshot to a fresh Cache. c.snapshot = &Cache{ store: c.snapshot.store, } diff --git a/tsdb/engine/tsm1/ring.go b/tsdb/engine/tsm1/ring.go index af82a18c1f..b1afec6355 100644 --- a/tsdb/engine/tsm1/ring.go +++ b/tsdb/engine/tsm1/ring.go @@ -315,16 +315,3 @@ func (p *partition) count() int { return n } - -func (p *partition) count() int { - var n int - p.mu.RLock() - for _, v := range p.store { - if v.count() > 0 { - n++ - } - } - p.mu.RUnlock() - return n - -} From 31e785d676a62f97523da4dd901eb8b1957a877e Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 19 Sep 2017 12:03:35 -0600 Subject: [PATCH 06/14] Don't deduplicate a single value --- tsdb/engine/tsm1/cache.go | 2 +- tsdb/engine/tsm1/encoding.gen.go | 12 ++++++------ tsdb/engine/tsm1/encoding.gen.go.tmpl | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 39b1784508..7f3b155c87 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -118,7 +118,7 @@ func (e *entry) deduplicate() { e.mu.Lock() defer e.mu.Unlock() - if len(e.values) == 0 { + if len(e.values) <= 1 { return } e.values = e.values.Deduplicate() diff --git a/tsdb/engine/tsm1/encoding.gen.go b/tsdb/engine/tsm1/encoding.gen.go index 62716d697f..3b7e187807 100644 --- a/tsdb/engine/tsm1/encoding.gen.go +++ b/tsdb/engine/tsm1/encoding.gen.go @@ -56,7 +56,7 @@ func (a Values) assertOrdered() { // Deduplicate returns a new slice with any values that have the same timestamp removed. // The Value that appears last in the slice is the one that is kept. func (a Values) Deduplicate() Values { - if len(a) == 0 { + if len(a) <= 1 { return a } @@ -268,7 +268,7 @@ func (a FloatValues) assertOrdered() { // Deduplicate returns a new slice with any values that have the same timestamp removed. // The Value that appears last in the slice is the one that is kept. func (a FloatValues) Deduplicate() FloatValues { - if len(a) == 0 { + if len(a) <= 1 { return a } @@ -524,7 +524,7 @@ func (a IntegerValues) assertOrdered() { // Deduplicate returns a new slice with any values that have the same timestamp removed. // The Value that appears last in the slice is the one that is kept. func (a IntegerValues) Deduplicate() IntegerValues { - if len(a) == 0 { + if len(a) <= 1 { return a } @@ -780,7 +780,7 @@ func (a UnsignedValues) assertOrdered() { // Deduplicate returns a new slice with any values that have the same timestamp removed. // The Value that appears last in the slice is the one that is kept. func (a UnsignedValues) Deduplicate() UnsignedValues { - if len(a) == 0 { + if len(a) <= 1 { return a } @@ -1036,7 +1036,7 @@ func (a StringValues) assertOrdered() { // Deduplicate returns a new slice with any values that have the same timestamp removed. // The Value that appears last in the slice is the one that is kept. func (a StringValues) Deduplicate() StringValues { - if len(a) == 0 { + if len(a) <= 1 { return a } @@ -1292,7 +1292,7 @@ func (a BooleanValues) assertOrdered() { // Deduplicate returns a new slice with any values that have the same timestamp removed. 
// The Value that appears last in the slice is the one that is kept. func (a BooleanValues) Deduplicate() BooleanValues { - if len(a) == 0 { + if len(a) <= 1 { return a } diff --git a/tsdb/engine/tsm1/encoding.gen.go.tmpl b/tsdb/engine/tsm1/encoding.gen.go.tmpl index 36954d1db3..89db2800b8 100644 --- a/tsdb/engine/tsm1/encoding.gen.go.tmpl +++ b/tsdb/engine/tsm1/encoding.gen.go.tmpl @@ -53,7 +53,7 @@ func (a {{.Name}}Values) assertOrdered() { // Deduplicate returns a new slice with any values that have the same timestamp removed. // The Value that appears last in the slice is the one that is kept. func (a {{.Name}}Values) Deduplicate() {{.Name}}Values { - if len(a) == 0 { + if len(a) <= 1 { return a } From 4fe81aeee6797a7c336c0aa812bde904a68ae297 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 19 Sep 2017 12:34:48 -0600 Subject: [PATCH 07/14] Remove manual Gosched from compactions At higher cardinalities, this dramatically slows down compaction throughput. --- tsdb/engine/tsm1/compact.gen.go | 54 ---------------------------- tsdb/engine/tsm1/compact.gen.go.tmpl | 14 -------- 2 files changed, 68 deletions(-) diff --git a/tsdb/engine/tsm1/compact.gen.go b/tsdb/engine/tsm1/compact.gen.go index 748a694ffc..24d4b8df0d 100644 --- a/tsdb/engine/tsm1/compact.gen.go +++ b/tsdb/engine/tsm1/compact.gen.go @@ -6,10 +6,6 @@ package tsm1 -import ( - "runtime" -) - // merge combines the next set of blocks into merged blocks. func (k *tsmKeyIterator) mergeFloat() { // No blocks left, or pending merged values, we're done @@ -92,10 +88,6 @@ func (k *tsmKeyIterator) combineFloat(dedup bool) blocks { } k.mergedFloatValues = k.mergedFloatValues.Merge(v) - - // Allow other goroutines to run - runtime.Gosched() - } } @@ -120,8 +112,6 @@ func (k *tsmKeyIterator) combineFloat(dedup bool) blocks { break } i++ - // Allow other goroutines to run - runtime.Gosched() } if k.fast { @@ -134,8 +124,6 @@ func (k *tsmKeyIterator) combineFloat(dedup bool) blocks { chunked = append(chunked, k.blocks[i]) i++ - // Allow other goroutines to run - runtime.Gosched() } } @@ -170,8 +158,6 @@ func (k *tsmKeyIterator) combineFloat(dedup bool) blocks { k.mergedFloatValues = k.mergedFloatValues.Merge(v) i++ - // Allow other goroutines to run - runtime.Gosched() } k.blocks = k.blocks[i:] @@ -300,10 +286,6 @@ func (k *tsmKeyIterator) combineInteger(dedup bool) blocks { } k.mergedIntegerValues = k.mergedIntegerValues.Merge(v) - - // Allow other goroutines to run - runtime.Gosched() - } } @@ -328,8 +310,6 @@ func (k *tsmKeyIterator) combineInteger(dedup bool) blocks { break } i++ - // Allow other goroutines to run - runtime.Gosched() } if k.fast { @@ -342,8 +322,6 @@ func (k *tsmKeyIterator) combineInteger(dedup bool) blocks { chunked = append(chunked, k.blocks[i]) i++ - // Allow other goroutines to run - runtime.Gosched() } } @@ -378,8 +356,6 @@ func (k *tsmKeyIterator) combineInteger(dedup bool) blocks { k.mergedIntegerValues = k.mergedIntegerValues.Merge(v) i++ - // Allow other goroutines to run - runtime.Gosched() } k.blocks = k.blocks[i:] @@ -508,10 +484,6 @@ func (k *tsmKeyIterator) combineUnsigned(dedup bool) blocks { } k.mergedUnsignedValues = k.mergedUnsignedValues.Merge(v) - - // Allow other goroutines to run - runtime.Gosched() - } } @@ -536,8 +508,6 @@ func (k *tsmKeyIterator) combineUnsigned(dedup bool) blocks { break } i++ - // Allow other goroutines to run - runtime.Gosched() } if k.fast { @@ -550,8 +520,6 @@ func (k *tsmKeyIterator) combineUnsigned(dedup bool) blocks { chunked = append(chunked, k.blocks[i]) i++ - // 
Allow other goroutines to run - runtime.Gosched() } } @@ -586,8 +554,6 @@ func (k *tsmKeyIterator) combineUnsigned(dedup bool) blocks { k.mergedUnsignedValues = k.mergedUnsignedValues.Merge(v) i++ - // Allow other goroutines to run - runtime.Gosched() } k.blocks = k.blocks[i:] @@ -716,10 +682,6 @@ func (k *tsmKeyIterator) combineString(dedup bool) blocks { } k.mergedStringValues = k.mergedStringValues.Merge(v) - - // Allow other goroutines to run - runtime.Gosched() - } } @@ -744,8 +706,6 @@ func (k *tsmKeyIterator) combineString(dedup bool) blocks { break } i++ - // Allow other goroutines to run - runtime.Gosched() } if k.fast { @@ -758,8 +718,6 @@ func (k *tsmKeyIterator) combineString(dedup bool) blocks { chunked = append(chunked, k.blocks[i]) i++ - // Allow other goroutines to run - runtime.Gosched() } } @@ -794,8 +752,6 @@ func (k *tsmKeyIterator) combineString(dedup bool) blocks { k.mergedStringValues = k.mergedStringValues.Merge(v) i++ - // Allow other goroutines to run - runtime.Gosched() } k.blocks = k.blocks[i:] @@ -924,10 +880,6 @@ func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks { } k.mergedBooleanValues = k.mergedBooleanValues.Merge(v) - - // Allow other goroutines to run - runtime.Gosched() - } } @@ -952,8 +904,6 @@ func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks { break } i++ - // Allow other goroutines to run - runtime.Gosched() } if k.fast { @@ -966,8 +916,6 @@ func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks { chunked = append(chunked, k.blocks[i]) i++ - // Allow other goroutines to run - runtime.Gosched() } } @@ -1002,8 +950,6 @@ func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks { k.mergedBooleanValues = k.mergedBooleanValues.Merge(v) i++ - // Allow other goroutines to run - runtime.Gosched() } k.blocks = k.blocks[i:] diff --git a/tsdb/engine/tsm1/compact.gen.go.tmpl b/tsdb/engine/tsm1/compact.gen.go.tmpl index 9555d03c0d..42f3ab4f51 100644 --- a/tsdb/engine/tsm1/compact.gen.go.tmpl +++ b/tsdb/engine/tsm1/compact.gen.go.tmpl @@ -1,9 +1,5 @@ package tsm1 -import ( - "runtime" -) - {{range .}} // merge combines the next set of blocks into merged blocks. 
@@ -88,10 +84,6 @@ func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {
 			}
 
 			k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v)
-
-			// Allow other goroutines to run
-			runtime.Gosched()
-
 		}
 	}
 
@@ -116,8 +108,6 @@ func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {
 				break
 			}
 			i++
-			// Allow other goroutines to run
-			runtime.Gosched()
 		}
 
 		if k.fast {
@@ -130,8 +120,6 @@ func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {
 
 				chunked = append(chunked, k.blocks[i])
 				i++
-				// Allow other goroutines to run
-				runtime.Gosched()
 			}
 		}
 
@@ -166,8 +154,6 @@ func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks {
 
 			k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v)
 			i++
-			// Allow other goroutines to run
-			runtime.Gosched()
 		}
 
 		k.blocks = k.blocks[i:]

From 0d52b060df923f13582ba93feccfef0bd00af549 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Tue, 19 Sep 2017 13:02:52 -0600
Subject: [PATCH 08/14] Skip onFileStoreReplace with tsi

---
 tsdb/engine/tsm1/engine.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index ee92aa5dc1..375fd9013b 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -26,6 +26,7 @@ import (
 	"github.com/influxdata/influxdb/tsdb"
 	_ "github.com/influxdata/influxdb/tsdb/index"
 	"github.com/influxdata/influxdb/tsdb/index/inmem"
+	"github.com/influxdata/influxdb/tsdb/index/tsi1"
 	"github.com/uber-go/zap"
 )
 
@@ -1289,6 +1290,10 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) {
 // onFileStoreReplace is callback handler invoked when the FileStore
 // has replaced one set of TSM files with a new set.
 func (e *Engine) onFileStoreReplace(newFiles []TSMFile) {
+	if e.index.Type() == tsi1.IndexName {
+		return
+	}
+
 	// Load any new series keys to the index
 	readers := make([]chan seriesKey, 0, len(newFiles))
 	for _, r := range newFiles {

From 391a6288c671f407cab99b19b2d7a06bd1293815 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Tue, 19 Sep 2017 13:03:49 -0600
Subject: [PATCH 09/14] Write parallel snapshot for higher cardinalities

---
 tsdb/engine/tsm1/compact.go | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index b477066661..b89c8cb937 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -704,12 +704,16 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 		return nil, errSnapshotsDisabled
 	}
 
-	concurrency := cache.Count() / 256000
-	if concurrency < 1 {
-		concurrency = 1
-	}
-	if concurrency > 4 {
-		concurrency = 4
+	concurrency := 1
+	card := cache.Count()
+	if card >= 1024*1024 {
+		concurrency = card / (1024 * 1024)
+		if concurrency < 1 {
+			concurrency = 1
+		}
+		if concurrency > 4 {
+			concurrency = 4
+		}
 	}
 	splits := cache.Split(concurrency)
 

From 796de3dceaeb957f97e22c188831270884958ee2 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Tue, 19 Sep 2017 14:26:13 -0600
Subject: [PATCH 10/14] Reduce encoder pool checkout contention

With higher cardinalities, the encoder pools were becoming a bottleneck.
This changes the snapshot compactions to check out one encoder of each
type and re-use it while writing the snapshots, as opposed to repeatedly
checking it out and in.
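
A minimal sketch of the reuse pattern, with illustrative stand-in types
rather than the real tsm1 encoders or pool helpers: each worker checks
an encoder out once, reuses it for every block it encodes, and returns
it only when the worker exits.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // encoder is a stand-in for a tsm1 block encoder.
    type encoder struct{ buf []byte }

    func (e *encoder) Reset()       { e.buf = e.buf[:0] }
    func (e *encoder) Write(b byte) { e.buf = append(e.buf, b) }

    var pool = sync.Pool{New: func() interface{} { return &encoder{} }}

    // encodeAll performs one pool checkout per worker instead of one per
    // block, which is what removes the checkout contention.
    func encodeAll(id int, blocks [][]byte) {
    	enc := pool.Get().(*encoder) // single checkout for this worker
    	defer pool.Put(enc)          // single checkin when the worker is done

    	for _, blk := range blocks {
    		enc.Reset() // reuse the same encoder for each block
    		for _, b := range blk {
    			enc.Write(b)
    		}
    		fmt.Printf("worker %d encoded a %d-byte block\n", id, len(enc.buf))
    	}
    }

    func main() {
    	var wg sync.WaitGroup
    	for i, blocks := range [][][]byte{{{1, 2}, {3}}, {{4, 5, 6}}} {
    		wg.Add(1)
    		go func(id int, bs [][]byte) {
    			defer wg.Done()
    			encodeAll(id, bs)
    		}(i, blocks)
    	}
    	wg.Wait()
    }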
--- tsdb/engine/tsm1/compact.go | 105 ++++++++----- tsdb/engine/tsm1/encoding.go | 277 ++++++++++++++++++----------------- 2 files changed, 213 insertions(+), 169 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index b89c8cb937..c378fe2a8f 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -1379,56 +1379,87 @@ func (c *cacheKeyIterator) encode() { n := len(c.ready) // Divide the keyset across each CPU - chunkSize := 128 + chunkSize := 1 idx := uint64(0) + for i := 0; i < concurrency; i++ { // Run one goroutine per CPU and encode a section of the key space concurrently go func() { + tenc := getTimeEncoder(tsdb.DefaultMaxPointsPerBlock) + fenc := getFloatEncoder(tsdb.DefaultMaxPointsPerBlock) + benc := getBooleanEncoder(tsdb.DefaultMaxPointsPerBlock) + uenc := getUnsignedEncoder(tsdb.DefaultMaxPointsPerBlock) + senc := getStringEncoder(tsdb.DefaultMaxPointsPerBlock) + ienc := getIntegerEncoder(tsdb.DefaultMaxPointsPerBlock) + + defer putTimeEncoder(tenc) + defer putFloatEncoder(fenc) + defer putBooleanEncoder(benc) + defer putUnsignedEncoder(uenc) + defer putStringEncoder(senc) + defer putIntegerEncoder(ienc) + for { - start := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize - if start >= n { + i := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize + + if i >= n { break } - end := start + chunkSize - if end > n { - end = n + + key := c.order[i] + values := c.cache.values(key) + + for len(values) > 0 { + + end := len(values) + if end > c.size { + end = c.size + } + + minTime, maxTime := values[0].UnixNano(), values[end-1].UnixNano() + var b []byte + var err error + tenc.Reset() + + maxTime = values[end-1].UnixNano() + + switch values[0].(type) { + case FloatValue: + fenc.Reset() + b, err = encodeFloatBlockUsing(nil, values[:end], tenc, fenc) + case IntegerValue: + ienc.Reset() + b, err = encodeIntegerBlockUsing(nil, values[:end], tenc, ienc) + case UnsignedValue: + uenc.Reset() + b, err = encodeUnsignedBlockUsing(nil, values[:end], tenc, uenc) + case BooleanValue: + benc.Reset() + b, err = encodeBooleanBlockUsing(nil, values[:end], tenc, benc) + case StringValue: + senc.Reset() + b, err = encodeStringBlockUsing(nil, values[:end], tenc, senc) + default: + b, err = Values(values[:end]).Encode(nil) + } + + values = values[end:] + + c.blocks[i] = append(c.blocks[i], cacheBlock{ + k: key, + minTime: minTime, + maxTime: maxTime, + b: b, + err: err, + }) } - c.encodeRange(start, end) + // Notify this key is fully encoded + c.ready[i] <- struct{}{} } }() } } -func (c *cacheKeyIterator) encodeRange(start, stop int) { - for i := start; i < stop; i++ { - key := c.order[i] - values := c.cache.values(key) - - for len(values) > 0 { - minTime, maxTime := values[0].UnixNano(), values[len(values)-1].UnixNano() - var b []byte - var err error - if len(values) > c.size { - maxTime = values[c.size-1].UnixNano() - b, err = Values(values[:c.size]).Encode(nil) - values = values[c.size:] - } else { - b, err = Values(values).Encode(nil) - values = values[:0] - } - c.blocks[i] = append(c.blocks[i], cacheBlock{ - k: key, - minTime: minTime, - maxTime: maxTime, - b: b, - err: err, - }) - } - // Notify this key is fully encoded - c.ready[i] <- struct{}{} - } -} - func (c *cacheKeyIterator) Next() bool { if c.i >= 0 && c.i < len(c.ready) && len(c.blocks[c.i]) > 0 { c.blocks[c.i] = c.blocks[c.i][1:] diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index d6a751c14e..4b2953be40 100644 --- a/tsdb/engine/tsm1/encoding.go 
+++ b/tsdb/engine/tsm1/encoding.go @@ -359,32 +359,7 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { // frame-or-reference and run length encoding. tsenc := getTimeEncoder(len(values)) - var b []byte - err := func() error { - for _, v := range values { - vv := v.(FloatValue) - tsenc.Write(vv.unixnano) - venc.Write(vv.value) - } - venc.Flush() - - // Encoded timestamp values - tb, err := tsenc.Bytes() - if err != nil { - return err - } - // Encoded float values - vb, err := venc.Bytes() - if err != nil { - return err - } - - // Prepend the first timestamp of the block in the first 8 bytes and the block - // in the next byte, followed by the block - b = packBlock(buf, BlockFloat64, tb, vb) - - return nil - }() + b, err := encodeFloatBlockUsing(buf, values, tsenc, venc) putTimeEncoder(tsenc) putFloatEncoder(venc) @@ -392,6 +367,33 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { return b, err } +func encodeFloatBlockUsing(buf []byte, values []Value, tsenc TimeEncoder, venc *FloatEncoder) ([]byte, error) { + tsenc.Reset() + venc.Reset() + + for _, v := range values { + vv := v.(FloatValue) + tsenc.Write(vv.unixnano) + venc.Write(vv.value) + } + venc.Flush() + + // Encoded timestamp values + tb, err := tsenc.Bytes() + if err != nil { + return nil, err + } + // Encoded float values + vb, err := venc.Bytes() + if err != nil { + return nil, err + } + + // Prepend the first timestamp of the block in the first 8 bytes and the block + // in the next byte, followed by the block + return packBlock(buf, BlockFloat64, tb, vb), nil +} + // DecodeFloatBlock decodes the float block from the byte slice // and appends the float values to a. func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) { @@ -499,30 +501,7 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) { // Encode timestamps using an adaptive encoder tsenc := getTimeEncoder(len(values)) - var b []byte - err := func() error { - for _, v := range values { - vv := v.(BooleanValue) - tsenc.Write(vv.unixnano) - venc.Write(vv.value) - } - - // Encoded timestamp values - tb, err := tsenc.Bytes() - if err != nil { - return err - } - // Encoded float values - vb, err := venc.Bytes() - if err != nil { - return err - } - - // Prepend the first timestamp of the block in the first 8 bytes and the block - // in the next byte, followed by the block - b = packBlock(buf, BlockBoolean, tb, vb) - return nil - }() + b, err := encodeBooleanBlockUsing(buf, values, tsenc, venc) putTimeEncoder(tsenc) putBooleanEncoder(venc) @@ -530,6 +509,32 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) { return b, err } +func encodeBooleanBlockUsing(buf []byte, values []Value, tenc TimeEncoder, venc BooleanEncoder) ([]byte, error) { + tenc.Reset() + venc.Reset() + + for _, v := range values { + vv := v.(BooleanValue) + tenc.Write(vv.unixnano) + venc.Write(vv.value) + } + + // Encoded timestamp values + tb, err := tenc.Bytes() + if err != nil { + return nil, err + } + // Encoded float values + vb, err := venc.Bytes() + if err != nil { + return nil, err + } + + // Prepend the first timestamp of the block in the first 8 bytes and the block + // in the next byte, followed by the block + return packBlock(buf, BlockBoolean, tb, vb), nil +} + // DecodeBooleanBlock decodes the boolean block from the byte slice // and appends the boolean values to a. 
func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error) { @@ -622,39 +627,42 @@ func (v IntegerValue) String() string { } func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) { - tsEnc := getTimeEncoder(len(values)) - vEnc := getIntegerEncoder(len(values)) + tenc := getTimeEncoder(len(values)) + venc := getIntegerEncoder(len(values)) - var b []byte - err := func() error { - for _, v := range values { - vv := v.(IntegerValue) - tsEnc.Write(vv.unixnano) - vEnc.Write(vv.value) - } + b, err := encodeIntegerBlockUsing(buf, values, tenc, venc) - // Encoded timestamp values - tb, err := tsEnc.Bytes() - if err != nil { - return err - } - // Encoded int64 values - vb, err := vEnc.Bytes() - if err != nil { - return err - } - - // Prepend the first timestamp of the block in the first 8 bytes - b = packBlock(buf, BlockInteger, tb, vb) - return nil - }() - - putTimeEncoder(tsEnc) - putIntegerEncoder(vEnc) + putTimeEncoder(tenc) + putIntegerEncoder(venc) return b, err } +func encodeIntegerBlockUsing(buf []byte, values []Value, tenc TimeEncoder, venc IntegerEncoder) ([]byte, error) { + tenc.Reset() + venc.Reset() + + for _, v := range values { + vv := v.(IntegerValue) + tenc.Write(vv.unixnano) + venc.Write(vv.value) + } + + // Encoded timestamp values + tb, err := tenc.Bytes() + if err != nil { + return nil, err + } + // Encoded int64 values + vb, err := venc.Bytes() + if err != nil { + return nil, err + } + + // Prepend the first timestamp of the block in the first 8 bytes + return packBlock(buf, BlockInteger, tb, vb), nil +} + // DecodeIntegerBlock decodes the integer block from the byte slice // and appends the integer values to a. func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error) { @@ -748,39 +756,42 @@ func (v UnsignedValue) String() string { } func encodeUnsignedBlock(buf []byte, values []Value) ([]byte, error) { - tsEnc := getTimeEncoder(len(values)) - vEnc := getUnsignedEncoder(len(values)) + tenc := getTimeEncoder(len(values)) + venc := getUnsignedEncoder(len(values)) - var b []byte - err := func() error { - for _, v := range values { - vv := v.(UnsignedValue) - tsEnc.Write(vv.unixnano) - vEnc.Write(int64(vv.value)) - } + b, err := encodeUnsignedBlockUsing(buf, values, tenc, venc) - // Encoded timestamp values - tb, err := tsEnc.Bytes() - if err != nil { - return err - } - // Encoded int64 values - vb, err := vEnc.Bytes() - if err != nil { - return err - } - - // Prepend the first timestamp of the block in the first 8 bytes - b = packBlock(buf, BlockUnsigned, tb, vb) - return nil - }() - - putTimeEncoder(tsEnc) - putUnsignedEncoder(vEnc) + putTimeEncoder(tenc) + putUnsignedEncoder(venc) return b, err } +func encodeUnsignedBlockUsing(buf []byte, values []Value, tenc TimeEncoder, venc IntegerEncoder) ([]byte, error) { + tenc.Reset() + venc.Reset() + + for _, v := range values { + vv := v.(UnsignedValue) + tenc.Write(vv.unixnano) + venc.Write(int64(vv.value)) + } + + // Encoded timestamp values + tb, err := tenc.Bytes() + if err != nil { + return nil, err + } + // Encoded int64 values + vb, err := venc.Bytes() + if err != nil { + return nil, err + } + + // Prepend the first timestamp of the block in the first 8 bytes + return packBlock(buf, BlockUnsigned, tb, vb), nil +} + // DecodeUnsignedBlock decodes the unsigned integer block from the byte slice // and appends the unsigned integer values to a. 
func DecodeUnsignedBlock(block []byte, a *[]UnsignedValue) ([]UnsignedValue, error) { @@ -874,40 +885,42 @@ func (v StringValue) String() string { } func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { - tsEnc := getTimeEncoder(len(values)) - vEnc := getStringEncoder(len(values) * len(values[0].(StringValue).value)) + tenc := getTimeEncoder(len(values)) + venc := getStringEncoder(len(values) * len(values[0].(StringValue).value)) - var b []byte - err := func() error { - for _, v := range values { - vv := v.(StringValue) - tsEnc.Write(vv.unixnano) - vEnc.Write(vv.value) - } + b, err := encodeStringBlockUsing(buf, values, tenc, venc) - // Encoded timestamp values - tb, err := tsEnc.Bytes() - if err != nil { - return err - } - // Encoded string values - vb, err := vEnc.Bytes() - if err != nil { - return err - } - - // Prepend the first timestamp of the block in the first 8 bytes - b = packBlock(buf, BlockString, tb, vb) - - return nil - }() - - putTimeEncoder(tsEnc) - putStringEncoder(vEnc) + putTimeEncoder(tenc) + putStringEncoder(venc) return b, err } +func encodeStringBlockUsing(buf []byte, values []Value, tenc TimeEncoder, venc StringEncoder) ([]byte, error) { + tenc.Reset() + venc.Reset() + + for _, v := range values { + vv := v.(StringValue) + tenc.Write(vv.unixnano) + venc.Write(vv.value) + } + + // Encoded timestamp values + tb, err := tenc.Bytes() + if err != nil { + return nil, err + } + // Encoded string values + vb, err := venc.Bytes() + if err != nil { + return nil, err + } + + // Prepend the first timestamp of the block in the first 8 bytes + return packBlock(buf, BlockString, tb, vb), nil +} + // DecodeStringBlock decodes the string block from the byte slice // and appends the string values to a. func DecodeStringBlock(block []byte, a *[]StringValue) ([]StringValue, error) { From 61ca1243c7ba61c9a475de7245973e4e6e10b57c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 20 Sep 2017 09:05:30 -0600 Subject: [PATCH 11/14] Increase index disk writer buffer --- tsdb/engine/tsm1/writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index e3a847c124..5c456333da 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -237,7 +237,7 @@ func NewIndexWriter() IndexWriter { // NewIndexWriter returns a new IndexWriter. func NewDiskIndexWriter(f *os.File) IndexWriter { - return &directIndex{fd: f, w: bufio.NewWriter(f)} + return &directIndex{fd: f, w: bufio.NewWriterSize(f, 1024*1024)} } // indexBlock represent an index information for a series within a TSM file. From deef0c56496ba76e0e829c811dfd9100ba173633 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 20 Sep 2017 10:00:20 -0600 Subject: [PATCH 12/14] Fix 32bit alignment --- tsdb/engine/tsm1/ring.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tsdb/engine/tsm1/ring.go b/tsdb/engine/tsm1/ring.go index b1afec6355..1eeeaf4ffe 100644 --- a/tsdb/engine/tsm1/ring.go +++ b/tsdb/engine/tsm1/ring.go @@ -32,15 +32,15 @@ const partitions = 4096 // key is hashed and the first 8 bits are used as an index to the ring. // type ring struct { - // The unique set of partitions in the ring. - // len(partitions) <= len(continuum) - partitions []*partition - // Number of keys within the ring. This is used to provide a hint for // allocating the return values in keys(). It will not be perfectly accurate // since it doesn't consider adding duplicate keys, or trying to remove non- // existent keys. 
 	keysHint int64
+
+	// The unique set of partitions in the ring.
+	// len(partitions) <= len(continuum)
+	partitions []*partition
 }
 
 // newring returns a new ring initialised with n partitions. n must always be a

From db204f3eb7b96ac3bb643b0e6decca125437ce84 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Wed, 20 Sep 2017 15:27:34 -0600
Subject: [PATCH 13/14] Default concurrent compactions to 50% of available cores

---
 etc/config.sample.toml      |  9 +++--
 pkg/limiter/fixed.go        | 17 ++++++++
 tsdb/config.go              |  2 +-
 tsdb/engine.go              | 12 +++---
 tsdb/engine/tsm1/compact.go |  2 +-
 tsdb/engine/tsm1/engine.go  | 79 +++++++++++++++++++++----------------
 tsdb/store.go               | 26 ++++++++++--
 7 files changed, 100 insertions(+), 47 deletions(-)

diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index f27fb800ab..b613ad3a24 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -88,7 +88,8 @@
 #  compact-full-write-cold-duration = "4h"
 
   # The maximum number of concurrent full and level compactions that can run at one time. A
-  # value of 0 results in runtime.GOMAXPROCS(0) used at runtime. This setting does not apply
+  # value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime. Any number greater
+  # than 0 limits compactions to that value. This setting does not apply
   # to cache snapshotting.
 #  max-concurrent-compactions = 0
 
@@ -358,10 +359,10 @@
   # UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
   # read-buffer = 0
 
-  # Multi-value plugins can be handled two ways. 
+  # Multi-value plugins can be handled two ways.
   #   "split" will parse and store the multi-value plugin data into separate measurements
-  #   "join" will parse and store the multi-value plugin as a single multi-value measurement. 
-  #   "split" is the default behavior for backward compatability with previous versions of influxdb. 
+  #   "join" will parse and store the multi-value plugin as a single multi-value measurement.
+  #   "split" is the default behavior for backward compatibility with previous versions of influxdb.
   # parse-multivalue-plugin = "split"
 ###
 ### [opentsdb]

diff --git a/pkg/limiter/fixed.go b/pkg/limiter/fixed.go
index f7e35f9442..85815d81dc 100644
--- a/pkg/limiter/fixed.go
+++ b/pkg/limiter/fixed.go
@@ -10,10 +10,27 @@ func NewFixed(limit int) Fixed {
 	return make(Fixed, limit)
 }
 
+// Idle returns true if all of the limiter's capacity is available.
+func (t Fixed) Idle() bool {
+	return len(t) == 0
+}
+
+// TryTake attempts to take a token and returns true if successful, otherwise returns false.
+func (t Fixed) TryTake() bool {
+	select {
+	case t <- struct{}{}:
+		return true
+	default:
+		return false
+	}
+}
+
+// Take attempts to take a token and blocks until one is available.
 func (t Fixed) Take() {
 	t <- struct{}{}
 }
 
+// Release releases a token back to the limiter.
 func (t Fixed) Release() {
 	<-t
 }

diff --git a/tsdb/config.go b/tsdb/config.go
index d1596bb8a3..d92a1e4723 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -49,7 +49,7 @@ const (
 	DefaultMaxValuesPerTag = 100000
 
 	// DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
-	// that can run at one time. A value of results in runtime.GOMAXPROCS(0) used at runtime.
+	// that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime.
DefaultMaxConcurrentCompactions = 0 ) diff --git a/tsdb/engine.go b/tsdb/engine.go index 2a8179db28..70b175e029 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -144,11 +144,13 @@ func NewEngine(id uint64, i Index, database, path string, walPath string, option // EngineOptions represents the options used to initialize the engine. type EngineOptions struct { - EngineVersion string - IndexVersion string - ShardID uint64 - InmemIndex interface{} // shared in-memory index - CompactionLimiter limiter.Fixed + EngineVersion string + IndexVersion string + ShardID uint64 + InmemIndex interface{} // shared in-memory index + + HiPriCompactionLimiter limiter.Fixed + LoPriCompactionLimiter limiter.Fixed Config Config } diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index c378fe2a8f..70d153b7d0 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -230,7 +230,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { // Each compaction group should run against 4 generations. For level 1, since these // can get created much more quickly, bump the grouping to 8 to keep file counts lower. groupSize := 4 - if level == 1 { + if level == 1 || level == 3 { groupSize = 8 } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 375fd9013b..23f9a0ce73 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -137,8 +137,10 @@ type Engine struct { stats *EngineStatistics - // The limiter for concurrent compactions - compactionLimiter limiter.Fixed + // Limiters for concurrent compactions. The low priority limiter is for level 3 and 4 + // compactions. The high priority is for level 1 and 2 compactions. + loPriCompactionLimiter limiter.Fixed + hiPriCompactionLimiter limiter.Fixed } // NewEngine returns a new instance of Engine. @@ -176,8 +178,9 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string, CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize, CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration), enableCompactionsOnOpen: true, - stats: &EngineStatistics{}, - compactionLimiter: opt.CompactionLimiter, + stats: &EngineStatistics{}, + loPriCompactionLimiter: opt.LoPriCompactionLimiter, + hiPriCompactionLimiter: opt.HiPriCompactionLimiter, } // Attach fieldset to index. @@ -1346,46 +1349,33 @@ func (e *Engine) onFileStoreReplace(newFiles []TSMFile) { type compactionStrategy struct { compactionGroups []CompactionGroup - // concurrency determines how many compactions groups will be started - // concurrently. These groups may be limited by the global limiter if - // enabled. - concurrency int fast bool description string + level int durationStat *int64 activeStat *int64 successStat *int64 errorStat *int64 - logger zap.Logger - compactor *Compactor - fileStore *FileStore - limiter limiter.Fixed - engine *Engine + logger zap.Logger + compactor *Compactor + fileStore *FileStore + loPriLimiter limiter.Fixed + hiPriLimiter limiter.Fixed + + engine *Engine } // Apply concurrently compacts all the groups in a compaction strategy. func (s *compactionStrategy) Apply() { start := time.Now() - // cap concurrent compaction groups to no more than 4 at a time. 
-	concurrency := s.concurrency
-	if concurrency == 0 {
-		concurrency = 4
-	}
-
-	throttle := limiter.NewFixed(concurrency)
 
 	var wg sync.WaitGroup
 	for i := range s.compactionGroups {
 		wg.Add(1)
 		go func(groupNum int) {
 			defer wg.Done()
-
-			// limit concurrent compaction groups
-			throttle.Take()
-			defer throttle.Release()
-
 			s.compactGroup(groupNum)
 		}(i)
 	}
 
@@ -1396,10 +1386,31 @@ func (s *compactionStrategy) Apply() {
 
 // compactGroup executes the compaction strategy against a single CompactionGroup.
 func (s *compactionStrategy) compactGroup(groupNum int) {
-	// Limit concurrent compactions if we have a limiter
-	if cap(s.limiter) > 0 {
-		s.limiter.Take()
-		defer s.limiter.Release()
+	// Level 1 and 2 are high priority and have a larger slice of the pool. If all
+	// the high priority capacity is used up, they can steal from the low priority
+	// pool as well if there is capacity. Otherwise, they wait on the high priority
+	// limiter until a running compaction completes. Level 3 and 4 are low priority
+	// as they are generally larger compactions and more expensive to run. They can
+	// steal a little from the high priority limiter if there is no high priority work.
+	switch s.level {
+	case 1, 2:
+		if s.hiPriLimiter.TryTake() {
+			defer s.hiPriLimiter.Release()
+		} else if s.loPriLimiter.TryTake() {
+			defer s.loPriLimiter.Release()
+		} else {
+			s.hiPriLimiter.Take()
+			defer s.hiPriLimiter.Release()
+		}
+	default:
+		if s.loPriLimiter.TryTake() {
+			defer s.loPriLimiter.Release()
+		} else if s.hiPriLimiter.Idle() && s.hiPriLimiter.TryTake() {
+			defer s.hiPriLimiter.Release()
+		} else {
+			s.loPriLimiter.Take()
+			defer s.loPriLimiter.Release()
+		}
 	}
 
 	group := s.compactionGroups[groupNum]
@@ -1462,14 +1473,15 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrate
 	}
 
 	return &compactionStrategy{
-		concurrency:      4,
 		compactionGroups: compactionGroups,
 		logger:           e.logger,
 		fileStore:        e.FileStore,
 		compactor:        e.Compactor,
 		fast:             fast,
-		limiter:          e.compactionLimiter,
+		loPriLimiter:     e.loPriCompactionLimiter,
+		hiPriLimiter:     e.hiPriCompactionLimiter,
 		engine:           e,
+		level:            level,
 
 		description: fmt.Sprintf("level %d", level),
 		activeStat:  &e.stats.TSMCompactionsActive[level-1],
@@ -1495,14 +1507,15 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy {
 	}
 
 	s := &compactionStrategy{
-		concurrency:      1,
 		compactionGroups: compactionGroups,
 		logger:           e.logger,
 		fileStore:        e.FileStore,
 		compactor:        e.Compactor,
 		fast:             optimize,
-		limiter:          e.compactionLimiter,
+		loPriLimiter:     e.loPriCompactionLimiter,
+		hiPriLimiter:     e.hiPriCompactionLimiter,
 		engine:           e,
+		level:            4,
 	}
 
 	if optimize {
diff --git a/tsdb/store.go b/tsdb/store.go
index 62bee6942e..707281cce6 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -159,15 +159,35 @@ func (s *Store) loadShards() error {
 		err error
 	}
 
-	t := limiter.NewFixed(runtime.GOMAXPROCS(0))
-
 	// Setup a shared limiter for compactions
 	lim := s.EngineOptions.Config.MaxConcurrentCompactions
 	if lim == 0 {
+		lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
+		if lim < 1 {
+			lim = 1
+		}
+	}
+
+	// Don't allow more compactions to run than cores.
+	if lim > runtime.GOMAXPROCS(0) {
 		lim = runtime.GOMAXPROCS(0)
 	}
-	s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)
+
+	// If only one compaction can run at a time, use the same limiter for high and low
+	// priority work.
+ if lim == 1 { + s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(1) + s.EngineOptions.LoPriCompactionLimiter = s.EngineOptions.HiPriCompactionLimiter + } else { + // Split the available high and low priority limiters between the available cores. + // The high priority work can steal from low priority at times so it can use the + // full limit if there is pending work. The low priority is capped at half the + // limit. + s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(lim/2 + lim%2) + s.EngineOptions.LoPriCompactionLimiter = limiter.NewFixed(lim / 2) + } + + t := limiter.NewFixed(runtime.GOMAXPROCS(0)) resC := make(chan *res) var n int From 94aba64b8813be37b3c56cc35b9e4294b829a9de Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 20 Sep 2017 17:27:58 -0600 Subject: [PATCH 14/14] Re-use index entries slice when writing TSM index --- tsdb/engine/tsm1/reader_test.go | 18 +-- tsdb/engine/tsm1/writer.go | 193 +++++++++++++++----------------- 2 files changed, 95 insertions(+), 116 deletions(-) diff --git a/tsdb/engine/tsm1/reader_test.go b/tsdb/engine/tsm1/reader_test.go index b3336fd03f..de08c80bf3 100644 --- a/tsdb/engine/tsm1/reader_test.go +++ b/tsdb/engine/tsm1/reader_test.go @@ -881,9 +881,10 @@ func TestIndirectIndex_Entries(t *testing.T) { index := NewIndexWriter() index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 100) index.Add([]byte("cpu"), BlockFloat64, 2, 3, 20, 200) - index.Add([]byte("mem"), BlockFloat64, 0, 1, 10, 100) exp := index.Entries([]byte("cpu")) + index.Add([]byte("mem"), BlockFloat64, 0, 1, 10, 100) + b, err := index.MarshalBinary() if err != nil { t.Fatalf("unexpected error marshaling index: %v", err) @@ -981,27 +982,16 @@ func TestIndirectIndex_Type(t *testing.T) { } } -func TestIndirectIndex_Keys(t *testing.T) { +func TestDirectIndex_KeyCount(t *testing.T) { index := NewIndexWriter() index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 20) index.Add([]byte("cpu"), BlockFloat64, 1, 2, 20, 30) index.Add([]byte("mem"), BlockFloat64, 0, 1, 10, 20) - keys := index.Keys() - // 2 distinct keys - if got, exp := len(keys), 2; got != exp { + if got, exp := index.KeyCount(), 2; got != exp { t.Fatalf("length mismatch: got %v, exp %v", got, exp) } - - // Keys should be sorted - if got, exp := string(keys[0]), "cpu"; got != exp { - t.Fatalf("key mismatch: got %v, exp %v", got, exp) - } - - if got, exp := string(keys[1]), "mem"; got != exp { - t.Fatalf("key mismatch: got %v, exp %v", got, exp) - } } func TestBlockIterator_Single(t *testing.T) { diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index 5c456333da..bc83605429 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -152,9 +152,6 @@ type IndexWriter interface { // Entries returns all index entries for a key. Entries(key []byte) []IndexEntry - // Keys returns the unique set of keys in the index. - Keys() [][]byte - // KeyCount returns the count of unique keys in the index. KeyCount() int @@ -232,7 +229,8 @@ func (e *IndexEntry) String() string { // NewIndexWriter returns a new IndexWriter. func NewIndexWriter() IndexWriter { - return &directIndex{} + buf := bytes.NewBuffer(make([]byte, 0, 4096)) + return &directIndex{buf: buf, w: bufio.NewWriter(buf)} } // NewIndexWriter returns a new IndexWriter. @@ -249,43 +247,48 @@ type indexBlock struct { // directIndex is a simple in-memory index implementation for a TSM file. The full index // must fit in memory. 
 type directIndex struct {
-	size   uint32
-	blocks []indexBlock
-	fd     *os.File
-	w      *bufio.Writer
+	keyCount int
+	size     uint32
+	fd       *os.File
+	buf      *bytes.Buffer
+
+	w *bufio.Writer
+
+	key          []byte
+	indexEntries *indexEntries
 }
 
 func (d *directIndex) Add(key []byte, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
 	// Is this the first block being added?
-	if len(d.blocks) == 0 {
+	if len(d.key) == 0 {
 		// size of the key stored in the index
 		d.size += uint32(2 + len(key))
 		// size of the count of entries stored in the index
 		d.size += indexCountSize
 
-		d.blocks = append(d.blocks, indexBlock{
-			key: key,
-			entries: &indexEntries{
-				Type: blockType,
-				entries: []IndexEntry{IndexEntry{
-					MinTime: minTime,
-					MaxTime: maxTime,
-					Offset:  offset,
-					Size:    size,
-				}}},
+		d.key = key
+		if d.indexEntries == nil {
+			d.indexEntries = &indexEntries{}
+		}
+		d.indexEntries.Type = blockType
+		d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
+			MinTime: minTime,
+			MaxTime: maxTime,
+			Offset:  offset,
+			Size:    size,
 		})
 
 		// size of the encoded index entry
 		d.size += indexEntrySize
+		d.keyCount++
 		return
 	}
 
-	// Find the last block so we can see if were still adding to the same series key.
-	block := d.blocks[len(d.blocks)-1]
-	cmp := bytes.Compare(block.key, key)
+	// See if we're still adding to the same series key.
+	cmp := bytes.Compare(d.key, key)
 	if cmp == 0 {
 		// The last block is still this key
-		block.entries.entries = append(block.entries.entries, IndexEntry{
+		d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
 			MinTime: minTime,
 			MaxTime: maxTime,
 			Offset:  offset,
 			Size:    size,
 		})
 
 		// size of the encoded index entry
 		d.size += indexEntrySize
 
 	} else if cmp < 0 {
-		if d.w != nil {
-			d.flush(d.w)
-		}
+		d.flush(d.w)
 
 		// We have a new key that is greater than the last one so we need to add
 		// a new index block section.
 
 		// size of the key stored in the index
 		d.size += uint32(2 + len(key))
 		// size of the count of entries stored in the index
 		d.size += indexCountSize
 
-		d.blocks = append(d.blocks, indexBlock{
-			key: key,
-			entries: &indexEntries{
-				Type: blockType,
-				entries: []IndexEntry{IndexEntry{
-					MinTime: minTime,
-					MaxTime: maxTime,
-					Offset:  offset,
-					Size:    size,
-				}}},
+		d.key = key
+		d.indexEntries.Type = blockType
+		d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
+			MinTime: minTime,
+			MaxTime: maxTime,
+			Offset:  offset,
+			Size:    size,
 		})
 
 		// size of the encoded index entry
 		d.size += indexEntrySize
+		d.keyCount++
 	} else {
 		// Keys can't be added out of order.
- panic(fmt.Sprintf("keys must be added in sorted order: %s < %s", string(key), string(d.blocks[len(d.blocks)-1].key))) + panic(fmt.Sprintf("keys must be added in sorted order: %s < %s", string(key), string(d.key))) } } func (d *directIndex) entries(key []byte) []IndexEntry { - if len(d.blocks) == 0 { + if len(d.key) == 0 { return nil } - if bytes.Equal(d.blocks[len(d.blocks)-1].key, key) { - return d.blocks[len(d.blocks)-1].entries.entries - } - - i := sort.Search(len(d.blocks), func(i int) bool { return bytes.Compare(d.blocks[i].key, key) >= 0 }) - if i < len(d.blocks) && bytes.Equal(d.blocks[i].key, key) { - return d.blocks[i].entries.entries + if bytes.Equal(d.key, key) { + return d.indexEntries.entries } return nil @@ -358,23 +352,11 @@ func (d *directIndex) Entry(key []byte, t int64) *IndexEntry { return nil } -func (d *directIndex) Keys() [][]byte { - keys := make([][]byte, 0, len(d.blocks)) - for _, v := range d.blocks { - keys = append(keys, v.key) - } - return keys -} - func (d *directIndex) KeyCount() int { - return len(d.blocks) + return d.keyCount } func (d *directIndex) WriteTo(w io.Writer) (int64, error) { - if d.w == nil { - return d.flush(w) - } - if _, err := d.flush(d.w); err != nil { return 0, err } @@ -383,6 +365,10 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) { return 0, err } + if d.fd == nil { + return io.Copy(w, d.buf) + } + if _, err := d.fd.Seek(0, io.SeekStart); err != nil { return 0, err } @@ -398,50 +384,52 @@ func (d *directIndex) flush(w io.Writer) (int64, error) { N int64 ) + if len(d.key) == 0 { + return 0, nil + } // For each key, individual entries are sorted by time - for _, ie := range d.blocks { - key := ie.key - entries := ie.entries - - if entries.Len() > maxIndexEntries { - return N, fmt.Errorf("key '%s' exceeds max index entries: %d > %d", key, entries.Len(), maxIndexEntries) - } - - if !sort.IsSorted(entries) { - sort.Sort(entries) - } - - binary.BigEndian.PutUint16(buf[0:2], uint16(len(key))) - buf[2] = entries.Type - binary.BigEndian.PutUint16(buf[3:5], uint16(entries.Len())) - - // Append the key length and key - if n, err = w.Write(buf[0:2]); err != nil { - return int64(n) + N, fmt.Errorf("write: writer key length error: %v", err) - } - N += int64(n) - - if n, err = w.Write(key); err != nil { - return int64(n) + N, fmt.Errorf("write: writer key error: %v", err) - } - N += int64(n) - - // Append the block type and count - if n, err = w.Write(buf[2:5]); err != nil { - return int64(n) + N, fmt.Errorf("write: writer block type and count error: %v", err) - } - N += int64(n) - - // Append each index entry for all blocks for this key - var n64 int64 - if n64, err = entries.WriteTo(w); err != nil { - return n64 + N, fmt.Errorf("write: writer entries error: %v", err) - } - N += n64 + key := d.key + entries := d.indexEntries + if entries.Len() > maxIndexEntries { + return N, fmt.Errorf("key '%s' exceeds max index entries: %d > %d", key, entries.Len(), maxIndexEntries) } - d.blocks = d.blocks[:0] + if !sort.IsSorted(entries) { + sort.Sort(entries) + } + + binary.BigEndian.PutUint16(buf[0:2], uint16(len(key))) + buf[2] = entries.Type + binary.BigEndian.PutUint16(buf[3:5], uint16(entries.Len())) + + // Append the key length and key + if n, err = w.Write(buf[0:2]); err != nil { + return int64(n) + N, fmt.Errorf("write: writer key length error: %v", err) + } + N += int64(n) + + if n, err = w.Write(key); err != nil { + return int64(n) + N, fmt.Errorf("write: writer key error: %v", err) + } + N += int64(n) + + // Append the block type and 
count + if n, err = w.Write(buf[2:5]); err != nil { + return int64(n) + N, fmt.Errorf("write: writer block type and count error: %v", err) + } + N += int64(n) + + // Append each index entry for all blocks for this key + var n64 int64 + if n64, err = entries.WriteTo(w); err != nil { + return n64 + N, fmt.Errorf("write: writer entries error: %v", err) + } + N += n64 + + d.key = nil + d.indexEntries.Type = 0 + d.indexEntries.entries = d.indexEntries.entries[:0] return N, nil @@ -460,14 +448,15 @@ func (d *directIndex) Size() uint32 { } func (d *directIndex) Close() error { - if d.w == nil { - return nil - } - // Flush anything remaining in the index if err := d.w.Flush(); err != nil { return err } + + if d.fd == nil { + return nil + } + if err := d.fd.Close(); err != nil { return nil }
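
The two-tier limiter introduced in PATCH 13 can be illustrated with a
standalone sketch (hypothetical lowercase names, simplified from
pkg/limiter.Fixed and compactionStrategy.compactGroup): level 1 and 2
compactions prefer the high-priority limiter and may spill into the
low-priority one, while level 3 and 4 do the reverse and steal
high-priority capacity only when it is idle.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // fixed is a counting semaphore backed by a buffered channel, in the
    // style of pkg/limiter.Fixed: a send takes a slot, a receive frees one.
    type fixed chan struct{}

    func newFixed(limit int) fixed { return make(fixed, limit) }

    func (f fixed) take()    { f <- struct{}{} }
    func (f fixed) release() { <-f }

    func (f fixed) tryTake() bool {
    	select {
    	case f <- struct{}{}:
    		return true
    	default:
    		return false
    	}
    }

    // idle reports whether no slots are held (take() sends, so empty == idle).
    func (f fixed) idle() bool { return len(f) == 0 }

    // run acquires a slot using the same priority rules as compactGroup.
    func run(level int, hi, lo fixed, work func()) {
    	switch level {
    	case 1, 2:
    		if hi.tryTake() {
    			defer hi.release()
    		} else if lo.tryTake() {
    			defer lo.release()
    		} else {
    			hi.take()
    			defer hi.release()
    		}
    	default:
    		if lo.tryTake() {
    			defer lo.release()
    		} else if hi.idle() && hi.tryTake() {
    			defer hi.release()
    		} else {
    			lo.take()
    			defer lo.release()
    		}
    	}
    	work()
    }

    func main() {
    	hi, lo := newFixed(2), newFixed(2) // e.g. 4 cores split as in store.go
    	var wg sync.WaitGroup
    	for _, level := range []int{1, 2, 3, 4} {
    		wg.Add(1)
    		go func(l int) {
    			defer wg.Done()
    			run(l, hi, lo, func() { fmt.Printf("level %d compaction ran\n", l) })
    		}(level)
    	}
    	wg.Wait()
    }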