From f87fd7c7ed397209c855fcd977f407db85694613 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 2 May 2017 09:20:01 -0600 Subject: [PATCH] Stop background compaction goroutines when shard is cold Each shard has a number of goroutines for compacting different levels of TSM files. When a shard goes cold and is fully compacted, these goroutines are still running. This change will stop background shard goroutines when the shard goes cold and start them back up if new writes arrive. --- tsdb/engine.go | 3 +++ tsdb/engine/tsm1/compact.go | 7 +++++++ tsdb/engine/tsm1/engine.go | 15 +++++++++++++++ tsdb/engine/tsm1/engine_test.go | 1 + tsdb/engine/tsm1/file_store.go | 31 ++++++++++++++++++------------- tsdb/engine/tsm1/reader.go | 5 +++++ tsdb/shard.go | 23 ++++++++++++++++++++++- tsdb/store.go | 27 +++++++++++++++++++++++++++ 8 files changed, 98 insertions(+), 14 deletions(-) diff --git a/tsdb/engine.go b/tsdb/engine.go index d885dbc0db..71bff1e603 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -31,6 +31,8 @@ type Engine interface { Open() error Close() error SetEnabled(enabled bool) + SetCompactionsEnabled(enabled bool) + WithLogger(zap.Logger) LoadMetadataIndex(shardID uint64, index Index) error @@ -72,6 +74,7 @@ type Engine interface { // Statistics will return statistics relevant to this engine. Statistics(tags map[string]string) []models.Statistic LastModified() time.Time + IsIdle() bool io.WriterTo } diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 4ca1273633..7aba38a7af 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -54,6 +54,7 @@ type CompactionPlanner interface { PlanLevel(level int) []CompactionGroup PlanOptimize() []CompactionGroup Release(group []CompactionGroup) + FullyCompacted() bool } // DefaultPlanner implements CompactionPlanner using a strategy to roll up @@ -144,6 +145,12 @@ func (t *tsmGeneration) hasTombstones() bool { return false } +// FullyCompacted returns true if the shard is fully compacted. +func (c *DefaultPlanner) FullyCompacted() bool { + gens := c.findGenerations() + return len(gens) <= 1 && !gens.hasTombstones() +} + // PlanLevel returns a set of TSM files to rewrite for a specific level. func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { // Determine the generations from all files on disk. We need to treat diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index c80a69ec02..3a5a98a93e 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -528,6 +528,21 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { return nil } +// IsIdle returns true if the cache is empty, there are no running compactions and the +// shard is fully compacted. +func (e *Engine) IsIdle() bool { + cacheEmpty := e.Cache.Size() == 0 + + runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive) + runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive) + + return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted() +} + // Backup writes a tar archive of any TSM files modified since the passed // in time to the passed in writer. The basePath will be prepended to the names // of the files in the archive. It will force a snapshot of the WAL first diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index b2f941ba30..8254a37ac3 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -1060,6 +1060,7 @@ func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil } func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil } func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {} +func (m *mockPlanner) FullyCompacted() bool { return false } // ParseTags returns an instance of Tags for a comma-delimited list of key/values. func ParseTags(s string) influxql.Tags { diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index decb843083..34a3569d86 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -317,13 +317,17 @@ func (f *FileStore) Delete(keys []string) error { // DeleteRange removes the values for keys between timestamps min and max. func (f *FileStore) DeleteRange(keys []string, min, max int64) error { + if err := f.walkFiles(func(tsm TSMFile) error { + return tsm.DeleteRange(keys, min, max) + }); err != nil { + return err + } + f.mu.Lock() f.lastModified = time.Now().UTC() + f.lastFileStats = nil f.mu.Unlock() - - return f.walkFiles(func(tsm TSMFile) error { - return tsm.DeleteRange(keys, min, max) - }) + return nil } // Open loads all the TSM files in the configured directory. @@ -382,15 +386,6 @@ func (f *FileStore) Open() error { return fmt.Errorf("error opening file %s: %v", fn, err) } - // Accumulate file store size stat - fi, err := file.Stat() - if err == nil { - atomic.AddInt64(&f.stats.DiskBytes, fi.Size()) - if fi.ModTime().UTC().After(f.lastModified) { - f.lastModified = fi.ModTime().UTC() - } - } - go func(idx int, file *os.File) { start := time.Now() df, err := NewTSMReader(file) @@ -404,6 +399,7 @@ func (f *FileStore) Open() error { }(i, file) } + var lm int64 for range files { res := <-readerC if res.err != nil { @@ -411,7 +407,16 @@ func (f *FileStore) Open() error { return res.err } f.files = append(f.files, res.r) + // Accumulate file store size stats + atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size())) + + // Re-initialize the lastModified time for the file store + if res.r.LastModified() > lm { + lm = res.r.LastModified() + } + } + f.lastModified = time.Unix(0, lm) close(readerC) sort.Sort(tsmReaders(f.files)) diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 0c9d277914..7e5abf3b99 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -465,6 +465,11 @@ func (t *TSMReader) Size() uint32 { func (t *TSMReader) LastModified() int64 { t.mu.RLock() lm := t.lastModified + for _, ts := range t.tombstoner.TombstoneFiles() { + if ts.LastModified > lm { + lm = ts.LastModified + } + } t.mu.RUnlock() return lm } diff --git a/tsdb/shard.go b/tsdb/shard.go index 680245f997..c03030093e 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -110,6 +110,7 @@ type Shard struct { path string walPath string id uint64 + wg sync.WaitGroup database string retentionPolicy string @@ -288,6 +289,7 @@ func (s *Shard) Open() error { } s.engine = e + s.wg.Add(1) go s.monitor() return nil @@ -335,6 +337,7 @@ func (s *Shard) close(clean bool) error { default: close(s.closing) } + s.wg.Wait() if clean { // Don't leak our shard ID and series keys in the index @@ -380,6 +383,23 @@ func (s *Shard) UnloadIndex() { s.index.RemoveShard(s.id) } +// IsIdle return true if the shard is not receiving writes and is fully compacted. +func (s *Shard) IsIdle() bool { + if err := s.ready(); err != nil { + return true + } + + return s.engine.IsIdle() +} + +// SetCompactionsEnabled enables or disable shard background compactions. +func (s *Shard) SetCompactionsEnabled(enabled bool) { + if err := s.ready(); err != nil { + return + } + s.engine.SetCompactionsEnabled(enabled) +} + // DiskSize returns the size on disk of this shard func (s *Shard) DiskSize() (int64, error) { var size int64 @@ -965,6 +985,8 @@ func (s *Shard) CreateSnapshot() (string, error) { } func (s *Shard) monitor() { + defer s.wg.Done() + t := time.NewTicker(monitorStatInterval) defer t.Stop() t2 := time.NewTicker(time.Minute) @@ -976,7 +998,6 @@ func (s *Shard) monitor() { case <-s.closing: return case <-t.C: - // Checking DiskSize can be expensive with a lot of shards and TSM files, only // check if something has changed. lm := s.LastModified() diff --git a/tsdb/store.go b/tsdb/store.go index 4a01776694..3abe51f3f9 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -145,6 +145,8 @@ func (s *Store) Open() error { } s.opened = true + s.wg.Add(1) + go s.monitorShards() return nil } @@ -265,6 +267,9 @@ func (s *Store) loadShards() error { // Enable all shards for _, sh := range s.shards { sh.SetEnabled(true) + if sh.IsIdle() { + sh.SetCompactionsEnabled(false) + } } return nil @@ -1046,6 +1051,28 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err return tagValues, nil } +func (s *Store) monitorShards() { + defer s.wg.Done() + t := time.NewTicker(10 * time.Second) + defer t.Stop() + for { + select { + case <-s.closing: + return + case <-t.C: + s.mu.RLock() + for _, sh := range s.shards { + if sh.IsIdle() { + sh.SetCompactionsEnabled(false) + } else { + sh.SetCompactionsEnabled(true) + } + } + s.mu.RUnlock() + } + } +} + // KeyValue holds a string key and a string value. type KeyValue struct { Key, Value string