diff --git a/tsdb/engine.go b/tsdb/engine.go index d885dbc0db..71bff1e603 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -31,6 +31,8 @@ type Engine interface { Open() error Close() error SetEnabled(enabled bool) + SetCompactionsEnabled(enabled bool) + WithLogger(zap.Logger) LoadMetadataIndex(shardID uint64, index Index) error @@ -72,6 +74,7 @@ type Engine interface { // Statistics will return statistics relevant to this engine. Statistics(tags map[string]string) []models.Statistic LastModified() time.Time + IsIdle() bool io.WriterTo } diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 4ca1273633..7aba38a7af 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -54,6 +54,7 @@ type CompactionPlanner interface { PlanLevel(level int) []CompactionGroup PlanOptimize() []CompactionGroup Release(group []CompactionGroup) + FullyCompacted() bool } // DefaultPlanner implements CompactionPlanner using a strategy to roll up @@ -144,6 +145,12 @@ func (t *tsmGeneration) hasTombstones() bool { return false } +// FullyCompacted returns true if the shard is fully compacted. +func (c *DefaultPlanner) FullyCompacted() bool { + gens := c.findGenerations() + return len(gens) <= 1 && !gens.hasTombstones() +} + // PlanLevel returns a set of TSM files to rewrite for a specific level. func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { // Determine the generations from all files on disk. We need to treat diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index c80a69ec02..3a5a98a93e 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -528,6 +528,21 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { return nil } +// IsIdle returns true if the cache is empty, there are no running compactions and the +// shard is fully compacted. +func (e *Engine) IsIdle() bool { + cacheEmpty := e.Cache.Size() == 0 + + runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive) + runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive) + + return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted() +} + // Backup writes a tar archive of any TSM files modified since the passed // in time to the passed in writer. The basePath will be prepended to the names // of the files in the archive. It will force a snapshot of the WAL first diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index b2f941ba30..8254a37ac3 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -1060,6 +1060,7 @@ func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil } func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil } func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {} +func (m *mockPlanner) FullyCompacted() bool { return false } // ParseTags returns an instance of Tags for a comma-delimited list of key/values. func ParseTags(s string) influxql.Tags { diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index decb843083..34a3569d86 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -317,13 +317,17 @@ func (f *FileStore) Delete(keys []string) error { // DeleteRange removes the values for keys between timestamps min and max. func (f *FileStore) DeleteRange(keys []string, min, max int64) error { + if err := f.walkFiles(func(tsm TSMFile) error { + return tsm.DeleteRange(keys, min, max) + }); err != nil { + return err + } + f.mu.Lock() f.lastModified = time.Now().UTC() + f.lastFileStats = nil f.mu.Unlock() - - return f.walkFiles(func(tsm TSMFile) error { - return tsm.DeleteRange(keys, min, max) - }) + return nil } // Open loads all the TSM files in the configured directory. @@ -382,15 +386,6 @@ func (f *FileStore) Open() error { return fmt.Errorf("error opening file %s: %v", fn, err) } - // Accumulate file store size stat - fi, err := file.Stat() - if err == nil { - atomic.AddInt64(&f.stats.DiskBytes, fi.Size()) - if fi.ModTime().UTC().After(f.lastModified) { - f.lastModified = fi.ModTime().UTC() - } - } - go func(idx int, file *os.File) { start := time.Now() df, err := NewTSMReader(file) @@ -404,6 +399,7 @@ func (f *FileStore) Open() error { }(i, file) } + var lm int64 for range files { res := <-readerC if res.err != nil { @@ -411,7 +407,16 @@ func (f *FileStore) Open() error { return res.err } f.files = append(f.files, res.r) + // Accumulate file store size stats + atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size())) + + // Re-initialize the lastModified time for the file store + if res.r.LastModified() > lm { + lm = res.r.LastModified() + } + } + f.lastModified = time.Unix(0, lm) close(readerC) sort.Sort(tsmReaders(f.files)) diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 0c9d277914..7e5abf3b99 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -465,6 +465,11 @@ func (t *TSMReader) Size() uint32 { func (t *TSMReader) LastModified() int64 { t.mu.RLock() lm := t.lastModified + for _, ts := range t.tombstoner.TombstoneFiles() { + if ts.LastModified > lm { + lm = ts.LastModified + } + } t.mu.RUnlock() return lm } diff --git a/tsdb/shard.go b/tsdb/shard.go index 680245f997..c03030093e 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -110,6 +110,7 @@ type Shard struct { path string walPath string id uint64 + wg sync.WaitGroup database string retentionPolicy string @@ -288,6 +289,7 @@ func (s *Shard) Open() error { } s.engine = e + s.wg.Add(1) go s.monitor() return nil @@ -335,6 +337,7 @@ func (s *Shard) close(clean bool) error { default: close(s.closing) } + s.wg.Wait() if clean { // Don't leak our shard ID and series keys in the index @@ -380,6 +383,23 @@ func (s *Shard) UnloadIndex() { s.index.RemoveShard(s.id) } +// IsIdle return true if the shard is not receiving writes and is fully compacted. +func (s *Shard) IsIdle() bool { + if err := s.ready(); err != nil { + return true + } + + return s.engine.IsIdle() +} + +// SetCompactionsEnabled enables or disable shard background compactions. +func (s *Shard) SetCompactionsEnabled(enabled bool) { + if err := s.ready(); err != nil { + return + } + s.engine.SetCompactionsEnabled(enabled) +} + // DiskSize returns the size on disk of this shard func (s *Shard) DiskSize() (int64, error) { var size int64 @@ -965,6 +985,8 @@ func (s *Shard) CreateSnapshot() (string, error) { } func (s *Shard) monitor() { + defer s.wg.Done() + t := time.NewTicker(monitorStatInterval) defer t.Stop() t2 := time.NewTicker(time.Minute) @@ -976,7 +998,6 @@ func (s *Shard) monitor() { case <-s.closing: return case <-t.C: - // Checking DiskSize can be expensive with a lot of shards and TSM files, only // check if something has changed. lm := s.LastModified() diff --git a/tsdb/store.go b/tsdb/store.go index 4a01776694..3abe51f3f9 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -145,6 +145,8 @@ func (s *Store) Open() error { } s.opened = true + s.wg.Add(1) + go s.monitorShards() return nil } @@ -265,6 +267,9 @@ func (s *Store) loadShards() error { // Enable all shards for _, sh := range s.shards { sh.SetEnabled(true) + if sh.IsIdle() { + sh.SetCompactionsEnabled(false) + } } return nil @@ -1046,6 +1051,28 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err return tagValues, nil } +func (s *Store) monitorShards() { + defer s.wg.Done() + t := time.NewTicker(10 * time.Second) + defer t.Stop() + for { + select { + case <-s.closing: + return + case <-t.C: + s.mu.RLock() + for _, sh := range s.shards { + if sh.IsIdle() { + sh.SetCompactionsEnabled(false) + } else { + sh.SetCompactionsEnabled(true) + } + } + s.mu.RUnlock() + } + } +} + // KeyValue holds a string key and a string value. type KeyValue struct { Key, Value string