Stop background compaction goroutines when shard is cold

Each shard runs several background goroutines that compact different levels
of TSM files.  When a shard goes cold and is fully compacted, these
goroutines keep running even though they have no work left to do.

This change stops the background compaction goroutines when the shard goes
cold and starts them back up when new writes arrive.
pull/8348/head
Jason Wilder 2017-05-02 09:20:01 -06:00
parent 3d1c0cd981
commit f87fd7c7ed
8 changed files with 98 additions and 14 deletions
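
The diff below adds SetCompactionsEnabled to the Engine interface and to Shard, but the tsm1 engine-side implementation is not part of this commit. As a rough sketch only, where every name and field is an assumption rather than the actual InfluxDB code, stopping and restarting compaction goroutines typically follows a done-channel plus WaitGroup pattern:

// Illustrative sketch, not the actual tsm1 implementation: an engine that can
// stop and restart its background compaction goroutines on demand.
package sketch

import (
	"sync"
	"time"
)

type engine struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	done    chan struct{} // closed to tell compaction goroutines to exit
	enabled bool
}

// SetCompactionsEnabled starts the compaction goroutines when enabled is true
// and signals them to stop (then waits for them) when it is false.
func (e *engine) SetCompactionsEnabled(enabled bool) {
	e.mu.Lock()
	defer e.mu.Unlock()

	if enabled == e.enabled {
		return // already in the requested state
	}
	e.enabled = enabled

	if enabled {
		e.done = make(chan struct{})
		e.wg.Add(1)
		go e.compactLoop(e.done)
		return
	}

	// Stop: signal and wait. compactLoop never takes e.mu, so waiting here is safe.
	close(e.done)
	e.wg.Wait()
}

func (e *engine) compactLoop(done chan struct{}) {
	defer e.wg.Done()
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		select {
		case <-done:
			return
		case <-t.C:
			// plan and run one round of compactions here
		}
	}
}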

View File

@ -31,6 +31,8 @@ type Engine interface {
Open() error
Close() error
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)
WithLogger(zap.Logger)
LoadMetadataIndex(shardID uint64, index Index) error
@ -72,6 +74,7 @@ type Engine interface {
// Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
IsIdle() bool
io.WriterTo
}

View File

@ -54,6 +54,7 @@ type CompactionPlanner interface {
PlanLevel(level int) []CompactionGroup
PlanOptimize() []CompactionGroup
Release(group []CompactionGroup)
FullyCompacted() bool
}
// DefaultPlanner implements CompactionPlanner using a strategy to roll up
@ -144,6 +145,12 @@ func (t *tsmGeneration) hasTombstones() bool {
return false
}
// FullyCompacted returns true if the shard is fully compacted.
func (c *DefaultPlanner) FullyCompacted() bool {
gens := c.findGenerations()
return len(gens) <= 1 && !gens.hasTombstones()
}
// PlanLevel returns a set of TSM files to rewrite for a specific level.
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
// Determine the generations from all files on disk. We need to treat

View File

@ -528,6 +528,21 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
return nil
}
// IsIdle returns true if the cache is empty, there are no running compactions and the
// shard is fully compacted.
func (e *Engine) IsIdle() bool {
cacheEmpty := e.Cache.Size() == 0
runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive)
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive)
runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive)
return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted()
}
// Backup writes a tar archive of any TSM files modified since the passed
// in time to the passed in writer. The basePath will be prepended to the names
// of the files in the archive. It will force a snapshot of the WAL first
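
IsIdle above treats the engine as busy if any of the per-type active-compaction counters in e.stats is non-zero. Those counters are not touched in this diff; as a small illustrative sketch (the compactFull helper and the trimmed-down types below are assumptions, not the real engine code), they are typically maintained by incrementing on entry and decrementing on exit of each compaction run:

package sketch

import "sync/atomic"

// Trimmed-down stand-ins for the real engine types; illustrative only.
type EngineStatistics struct {
	TSMFullCompactionsActive int64
}

type Engine struct {
	stats EngineStatistics
}

// compactFull is a hypothetical helper: bump the active counter for the duration
// of a full compaction so IsIdle reports the engine as busy while it runs.
func (e *Engine) compactFull() {
	atomic.AddInt64(&e.stats.TSMFullCompactionsActive, 1)
	defer atomic.AddInt64(&e.stats.TSMFullCompactionsActive, -1)

	// ... merge the TSM files selected for this compaction ...
}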

View File

@ -1060,6 +1060,7 @@ func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return
func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
func (m *mockPlanner) FullyCompacted() bool { return false }
// ParseTags returns an instance of Tags for a comma-delimited list of key/values.
func ParseTags(s string) influxql.Tags {

View File

@ -317,13 +317,17 @@ func (f *FileStore) Delete(keys []string) error {
// DeleteRange removes the values for keys between timestamps min and max.
func (f *FileStore) DeleteRange(keys []string, min, max int64) error {
if err := f.walkFiles(func(tsm TSMFile) error {
return tsm.DeleteRange(keys, min, max)
}); err != nil {
return err
}
f.mu.Lock()
f.lastModified = time.Now().UTC()
f.lastFileStats = nil
f.mu.Unlock()
return f.walkFiles(func(tsm TSMFile) error {
return tsm.DeleteRange(keys, min, max)
})
return nil
}
// Open loads all the TSM files in the configured directory.
@ -382,15 +386,6 @@ func (f *FileStore) Open() error {
return fmt.Errorf("error opening file %s: %v", fn, err)
}
// Accumulate file store size stat
fi, err := file.Stat()
if err == nil {
atomic.AddInt64(&f.stats.DiskBytes, fi.Size())
if fi.ModTime().UTC().After(f.lastModified) {
f.lastModified = fi.ModTime().UTC()
}
}
go func(idx int, file *os.File) {
start := time.Now()
df, err := NewTSMReader(file)
@ -404,6 +399,7 @@ func (f *FileStore) Open() error {
}(i, file)
}
var lm int64
for range files {
res := <-readerC
if res.err != nil {
@ -411,7 +407,16 @@ func (f *FileStore) Open() error {
return res.err
}
f.files = append(f.files, res.r)
// Accumulate file store size stats
atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size()))
// Re-initialize the lastModified time for the file store
if res.r.LastModified() > lm {
lm = res.r.LastModified()
}
}
f.lastModified = time.Unix(0, lm)
close(readerC)
sort.Sort(tsmReaders(f.files))

View File

@ -465,6 +465,11 @@ func (t *TSMReader) Size() uint32 {
func (t *TSMReader) LastModified() int64 {
t.mu.RLock()
lm := t.lastModified
for _, ts := range t.tombstoner.TombstoneFiles() {
if ts.LastModified > lm {
lm = ts.LastModified
}
}
t.mu.RUnlock()
return lm
}

View File

@ -110,6 +110,7 @@ type Shard struct {
path string
walPath string
id uint64
wg sync.WaitGroup
database string
retentionPolicy string
@ -288,6 +289,7 @@ func (s *Shard) Open() error {
}
s.engine = e
s.wg.Add(1)
go s.monitor()
return nil
@ -335,6 +337,7 @@ func (s *Shard) close(clean bool) error {
default:
close(s.closing)
}
s.wg.Wait()
if clean {
// Don't leak our shard ID and series keys in the index
@ -380,6 +383,23 @@ func (s *Shard) UnloadIndex() {
s.index.RemoveShard(s.id)
}
// IsIdle returns true if the shard is not receiving writes and is fully compacted.
func (s *Shard) IsIdle() bool {
if err := s.ready(); err != nil {
return true
}
return s.engine.IsIdle()
}
// SetCompactionsEnabled enables or disables shard background compactions.
func (s *Shard) SetCompactionsEnabled(enabled bool) {
if err := s.ready(); err != nil {
return
}
s.engine.SetCompactionsEnabled(enabled)
}
// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
var size int64
@ -965,6 +985,8 @@ func (s *Shard) CreateSnapshot() (string, error) {
}
func (s *Shard) monitor() {
defer s.wg.Done()
t := time.NewTicker(monitorStatInterval)
defer t.Stop()
t2 := time.NewTicker(time.Minute)
@ -976,7 +998,6 @@ func (s *Shard) monitor() {
case <-s.closing:
return
case <-t.C:
// Checking DiskSize can be expensive with a lot of shards and TSM files, only
// check if something has changed.
lm := s.LastModified()
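
The shard changes above add a sync.WaitGroup so that Close cannot return while monitor is still running: wg.Add(1) before the goroutine starts, defer wg.Done() inside it, and close(closing) followed by wg.Wait() during close. Reduced to a standalone sketch (all names here are illustrative, not the shard code itself):

package sketch

import (
	"sync"
	"time"
)

type shard struct {
	wg      sync.WaitGroup
	closing chan struct{}
}

func (s *shard) Open() {
	s.closing = make(chan struct{})
	s.wg.Add(1) // account for the monitor goroutine before it starts
	go s.monitor()
}

func (s *shard) Close() {
	close(s.closing) // signal the monitor to stop
	s.wg.Wait()      // block until it has actually exited
}

func (s *shard) monitor() {
	defer s.wg.Done()
	t := time.NewTicker(time.Minute)
	defer t.Stop()
	for {
		select {
		case <-s.closing:
			return
		case <-t.C:
			// periodic shard checks go here
		}
	}
}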

View File

@ -145,6 +145,8 @@ func (s *Store) Open() error {
}
s.opened = true
s.wg.Add(1)
go s.monitorShards()
return nil
}
@ -265,6 +267,9 @@ func (s *Store) loadShards() error {
// Enable all shards
for _, sh := range s.shards {
sh.SetEnabled(true)
if sh.IsIdle() {
sh.SetCompactionsEnabled(false)
}
}
return nil
@ -1046,6 +1051,28 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
return tagValues, nil
}
func (s *Store) monitorShards() {
defer s.wg.Done()
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <-s.closing:
return
case <-t.C:
s.mu.RLock()
for _, sh := range s.shards {
if sh.IsIdle() {
sh.SetCompactionsEnabled(false)
} else {
sh.SetCompactionsEnabled(true)
}
}
s.mu.RUnlock()
}
}
}
// KeyValue holds a string key and a string value.
type KeyValue struct {
Key, Value string