From 9ee305f6f58255f5b59c3effbd65e480086d436d Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 14 Sep 2017 18:18:05 -0600 Subject: [PATCH] Periodically re-allocate cache store This perioically re-allocates the cache store to avoid memory fragmentation and gradual slow down of the store after repeated deletes and inserts into the map. --- tsdb/engine/tsm1/ring.go | 55 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/tsdb/engine/tsm1/ring.go b/tsdb/engine/tsm1/ring.go index 244dd11ab3..b1afec6355 100644 --- a/tsdb/engine/tsm1/ring.go +++ b/tsdb/engine/tsm1/ring.go @@ -129,6 +129,14 @@ func (r *ring) keys(sorted bool) [][]byte { return keys } +func (r *ring) count() int { + var n int + for _, p := range r.partitions { + n += p.count() + } + return n +} + // apply applies the provided function to every entry in the ring under a read // lock using a separate goroutine for each partition. The provided function // will be called with each key and the corresponding entry. The first error @@ -180,6 +188,9 @@ func (r *ring) applySerial(f func([]byte, *entry) error) error { for _, p := range r.partitions { p.mu.RLock() for k, e := range p.store { + if e.count() == 0 { + continue + } if err := f([]byte(k), e); err != nil { p.mu.RUnlock() return err @@ -190,6 +201,21 @@ func (r *ring) applySerial(f func([]byte, *entry) error) error { return nil } +func (r *ring) split(n int) []storer { + var keys int + storers := make([]storer, n) + for i := 0; i < n; i++ { + storers[i], _ = newring(len(r.partitions)) + } + + for i, p := range r.partitions { + r := storers[i%n].(*ring) + r.partitions[i] = p + keys += len(p.store) + } + return storers +} + // partition provides safe access to a map of series keys to entries. type partition struct { mu sync.RWMutex @@ -254,7 +280,10 @@ func (p *partition) remove(key []byte) { func (p *partition) keys() [][]byte { p.mu.RLock() keys := make([][]byte, 0, len(p.store)) - for k := range p.store { + for k, v := range p.store { + if v.count() == 0 { + continue + } keys = append(keys, []byte(k)) } p.mu.RUnlock() @@ -264,9 +293,25 @@ func (p *partition) keys() [][]byte { // reset resets the partition by reinitialising the store. reset returns hints // about sizes that the entries within the store could be reallocated with. func (p *partition) reset() { + p.mu.RLock() + sz := len(p.store) + p.mu.RUnlock() + + newStore := make(map[string]*entry, sz) p.mu.Lock() - defer p.mu.Unlock() - for k := range p.store { - delete(p.store, k) - } + p.store = newStore + p.mu.Unlock() +} + +func (p *partition) count() int { + var n int + p.mu.RLock() + for _, v := range p.store { + if v.count() > 0 { + n++ + } + } + p.mu.RUnlock() + return n + }