From db204f3eb7b96ac3bb643b0e6decca125437ce84 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Wed, 20 Sep 2017 15:27:34 -0600
Subject: [PATCH] Default concurrent compactions to 50% of available cores

---
 etc/config.sample.toml      |  9 +++--
 pkg/limiter/fixed.go        | 17 ++++++++
 tsdb/config.go              |  2 +-
 tsdb/engine.go              | 12 +++---
 tsdb/engine/tsm1/compact.go |  2 +-
 tsdb/engine/tsm1/engine.go  | 79 +++++++++++++++++++++----------------
 tsdb/store.go               | 26 ++++++++++--
 7 files changed, 100 insertions(+), 47 deletions(-)

diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index f27fb800ab..b613ad3a24 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -88,7 +88,8 @@
   # compact-full-write-cold-duration = "4h"

   # The maximum number of concurrent full and level compactions that can run at one time. A
-  # value of 0 results in runtime.GOMAXPROCS(0) used at runtime. This setting does not apply
+  # value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime. Any number greater
+  # than 0 limits compactions to that value. This setting does not apply
   # to cache snapshotting.
   # max-concurrent-compactions = 0

@@ -358,10 +359,10 @@
   # UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
   # read-buffer = 0

-  # Multi-value plugins can be handled two ways. 
+  # Multi-value plugins can be handled two ways.
   # "split" will parse and store the multi-value plugin data into separate measurements
-  # "join" will parse and store the multi-value plugin as a single multi-value measurement. 
-  # "split" is the default behavior for backward compatability with previous versions of influxdb. 
+  # "join" will parse and store the multi-value plugin as a single multi-value measurement.
+  # "split" is the default behavior for backward compatibility with previous versions of influxdb.
   # parse-multivalue-plugin = "split"
 ###
 ### [opentsdb]
diff --git a/pkg/limiter/fixed.go b/pkg/limiter/fixed.go
index f7e35f9442..85815d81dc 100644
--- a/pkg/limiter/fixed.go
+++ b/pkg/limiter/fixed.go
@@ -10,10 +10,27 @@ func NewFixed(limit int) Fixed {
 	return make(Fixed, limit)
 }

+// Idle returns true if all of the limiter's capacity is available.
+// Take sends a token into the channel, so no capacity is held when the
+// channel is empty.
+func (t Fixed) Idle() bool {
+	return len(t) == 0
+}
+
+// TryTake attempts to take a token and returns true if successful, otherwise it returns false.
+func (t Fixed) TryTake() bool {
+	select {
+	case t <- struct{}{}:
+		return true
+	default:
+		return false
+	}
+}
+
+// Take attempts to take a token and blocks until one is available.
 func (t Fixed) Take() {
 	t <- struct{}{}
 }

+// Release releases a token back to the limiter.
 func (t Fixed) Release() {
 	<-t
 }
diff --git a/tsdb/config.go b/tsdb/config.go
index d1596bb8a3..d92a1e4723 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -49,7 +49,7 @@ const (
 	DefaultMaxValuesPerTag = 100000

 	// DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
-	// that can run at one time. A value of results in runtime.GOMAXPROCS(0) used at runtime.
+	// that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime.
 	DefaultMaxConcurrentCompactions = 0
 )
diff --git a/tsdb/engine.go b/tsdb/engine.go
index 2a8179db28..70b175e029 100644
--- a/tsdb/engine.go
+++ b/tsdb/engine.go
@@ -144,11 +144,13 @@ func NewEngine(id uint64, i Index, database, path string, walPath string, option

 // EngineOptions represents the options used to initialize the engine.
 type EngineOptions struct {
-	EngineVersion     string
-	IndexVersion      string
-	ShardID           uint64
-	InmemIndex        interface{} // shared in-memory index
-	CompactionLimiter limiter.Fixed
+	EngineVersion string
+	IndexVersion  string
+	ShardID       uint64
+	InmemIndex    interface{} // shared in-memory index
+
+	HiPriCompactionLimiter limiter.Fixed
+	LoPriCompactionLimiter limiter.Fixed

 	Config Config
 }
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index c378fe2a8f..70d153b7d0 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -230,7 +230,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
 	// Each compaction group should run against 4 generations. For level 1, since these
 	// can get created much more quickly, bump the grouping to 8 to keep file counts lower.
 	groupSize := 4
-	if level == 1 {
+	if level == 1 || level == 3 {
 		groupSize = 8
 	}
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 375fd9013b..23f9a0ce73 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -137,8 +137,10 @@ type Engine struct {

 	stats *EngineStatistics

-	// The limiter for concurrent compactions
-	compactionLimiter limiter.Fixed
+	// Limiters for concurrent compactions. The low priority limiter is for level 3 and 4
+	// compactions. The high priority limiter is for level 1 and 2 compactions.
+	loPriCompactionLimiter limiter.Fixed
+	hiPriCompactionLimiter limiter.Fixed
 }

 // NewEngine returns a new instance of Engine.
@@ -176,8 +178,9 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
 		CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
 		CacheFlushWriteColdDuration:   time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
 		enableCompactionsOnOpen:       true,
-		stats:             &EngineStatistics{},
-		compactionLimiter: opt.CompactionLimiter,
+		stats:                  &EngineStatistics{},
+		loPriCompactionLimiter: opt.LoPriCompactionLimiter,
+		hiPriCompactionLimiter: opt.HiPriCompactionLimiter,
 	}

 	// Attach fieldset to index.
@@ -1346,46 +1349,33 @@ func (e *Engine) onFileStoreReplace(newFiles []TSMFile) {
 type compactionStrategy struct {
 	compactionGroups []CompactionGroup

-	// concurrency determines how many compaction groups will be started
-	// concurrently. These groups may be limited by the global limiter if
-	// enabled.
-	concurrency int
 	fast        bool
 	description string
+	level       int

 	durationStat *int64
 	activeStat   *int64
 	successStat  *int64
 	errorStat    *int64

-	logger    zap.Logger
-	compactor *Compactor
-	fileStore *FileStore
-	limiter   limiter.Fixed
-	engine    *Engine
+	logger       zap.Logger
+	compactor    *Compactor
+	fileStore    *FileStore
+	loPriLimiter limiter.Fixed
+	hiPriLimiter limiter.Fixed
+
+	engine *Engine
 }

 // Apply concurrently compacts all the groups in a compaction strategy.
 func (s *compactionStrategy) Apply() {
 	start := time.Now()

-	// cap concurrent compaction groups to no more than 4 at a time.
-	concurrency := s.concurrency
-	if concurrency == 0 {
-		concurrency = 4
-	}
-
-	throttle := limiter.NewFixed(concurrency)
 	var wg sync.WaitGroup
 	for i := range s.compactionGroups {
 		wg.Add(1)
 		go func(groupNum int) {
 			defer wg.Done()
-
-			// limit concurrent compaction groups
-			throttle.Take()
-			defer throttle.Release()
-
 			s.compactGroup(groupNum)
 		}(i)
 	}
@@ -1396,10 +1386,31 @@
 // compactGroup executes the compaction strategy against a single CompactionGroup.
 func (s *compactionStrategy) compactGroup(groupNum int) {
-	// Limit concurrent compactions if we have a limiter
-	if cap(s.limiter) > 0 {
-		s.limiter.Take()
-		defer s.limiter.Release()
+	// Level 1 and 2 are high priority and have a larger slice of the pool. If all
+	// the high priority capacity is used up, they can steal from the low priority
+	// pool as well if there is capacity. Otherwise, they wait on the high priority
+	// limiter until a running compaction completes. Level 3 and 4 are low priority
+	// as they are generally larger compactions and more expensive to run. They can
+	// steal a little from the high priority limiter if there is no high priority work.
+	switch s.level {
+	case 1, 2:
+		if s.hiPriLimiter.TryTake() {
+			defer s.hiPriLimiter.Release()
+		} else if s.loPriLimiter.TryTake() {
+			defer s.loPriLimiter.Release()
+		} else {
+			s.hiPriLimiter.Take()
+			defer s.hiPriLimiter.Release()
+		}
+	default:
+		if s.loPriLimiter.TryTake() {
+			defer s.loPriLimiter.Release()
+		} else if s.hiPriLimiter.Idle() && s.hiPriLimiter.TryTake() {
+			defer s.hiPriLimiter.Release()
+		} else {
+			s.loPriLimiter.Take()
+			defer s.loPriLimiter.Release()
+		}
 	}

 	group := s.compactionGroups[groupNum]
@@ -1462,14 +1473,15 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrate
 	}

 	return &compactionStrategy{
-		concurrency:      4,
 		compactionGroups: compactionGroups,
 		logger:           e.logger,
 		fileStore:        e.FileStore,
 		compactor:        e.Compactor,
 		fast:             fast,
-		limiter:          e.compactionLimiter,
+		loPriLimiter:     e.loPriCompactionLimiter,
+		hiPriLimiter:     e.hiPriCompactionLimiter,
 		engine:           e,
+		level:            level,

 		description:  fmt.Sprintf("level %d", level),
 		activeStat:   &e.stats.TSMCompactionsActive[level-1],
@@ -1495,14 +1507,15 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy {
 	}

 	s := &compactionStrategy{
-		concurrency:      1,
 		compactionGroups: compactionGroups,
 		logger:           e.logger,
 		fileStore:        e.FileStore,
 		compactor:        e.Compactor,
 		fast:             optimize,
-		limiter:          e.compactionLimiter,
+		loPriLimiter:     e.loPriCompactionLimiter,
+		hiPriLimiter:     e.hiPriCompactionLimiter,
 		engine:           e,
+		level:            4,
 	}

 	if optimize {
diff --git a/tsdb/store.go b/tsdb/store.go
index 62bee6942e..707281cce6 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -159,15 +159,35 @@ func (s *Store) loadShards() error {
 		err error
 	}

-	t := limiter.NewFixed(runtime.GOMAXPROCS(0))
-
 	// Setup a shared limiter for compactions
 	lim := s.EngineOptions.Config.MaxConcurrentCompactions
 	if lim == 0 {
+		lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
+		if lim < 1 {
+			lim = 1
+		}
+	}
+
+	// Don't allow more compactions to run than cores.
+	if lim > runtime.GOMAXPROCS(0) {
 		lim = runtime.GOMAXPROCS(0)
 	}

-	s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)
+	// If only one compaction can run at a time, use the same limiter for high and low
+	// priority work.
+	if lim == 1 {
+		s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(1)
+		s.EngineOptions.LoPriCompactionLimiter = s.EngineOptions.HiPriCompactionLimiter
+	} else {
+		// Split the available capacity between the high and low priority limiters.
+		// The high priority work can steal from the low priority pool at times, so it
+		// can use the full limit if there is pending work. The low priority pool is
+		// capped at half the limit.
+		s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(lim/2 + lim%2)
+		s.EngineOptions.LoPriCompactionLimiter = limiter.NewFixed(lim / 2)
+	}
+
+	t := limiter.NewFixed(runtime.GOMAXPROCS(0))

 	resC := make(chan *res)
 	var n int
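
A note for reviewers on the limiter semantics: Fixed in pkg/limiter is just a buffered channel used as a counting semaphore, where taking a token sends into the channel and releasing receives from it, so len(t) counts active holders. Below is a minimal self-contained sketch of that pattern with the methods this patch adds (the main function and its worker count are illustrative only, not code from the patch):

```go
package main

import (
	"fmt"
	"sync"
)

// Fixed is a buffered channel used as a counting semaphore, mirroring
// pkg/limiter. A held token is a value sitting in the channel, so
// len(t) is the number of active holders.
type Fixed chan struct{}

// NewFixed returns a limiter that allows up to limit concurrent holders.
func NewFixed(limit int) Fixed { return make(Fixed, limit) }

// Idle reports whether no tokens are held, i.e. all capacity is available.
func (t Fixed) Idle() bool { return len(t) == 0 }

// TryTake takes a token without blocking and reports whether it succeeded.
func (t Fixed) TryTake() bool {
	select {
	case t <- struct{}{}:
		return true
	default:
		return false
	}
}

// Take blocks until a token is available.
func (t Fixed) Take() { t <- struct{}{} }

// Release returns a token to the limiter.
func (t Fixed) Release() { <-t }

func main() {
	lim := NewFixed(2) // at most two concurrent holders
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			lim.Take()
			defer lim.Release()
			fmt.Printf("worker %d holds a token (at most 2 at once)\n", id)
		}(i)
	}
	wg.Wait()
	fmt.Println("idle after all workers:", lim.Idle()) // true
}
```

Because a held token is a value sitting in the channel, cap(t)-len(t) is the remaining capacity, which is why Idle reduces to a plain length check.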
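The switch added to compactGroup is easier to reason about as a small acquisition policy. Here is a hedged sketch of the same rules factored into a helper, assuming the Fixed type from the sketch above (acquire is an illustrative name of mine, not part of the patch):

```go
// acquire takes a token according to the priority rules in compactGroup:
// levels 1-2 prefer the high priority pool and may spill into the low
// priority pool; levels 3-4 prefer the low priority pool and may steal a
// high priority token only when that pool is completely idle. It returns
// the limiter the caller must Release.
func acquire(level int, hi, lo Fixed) Fixed {
	switch level {
	case 1, 2:
		if hi.TryTake() {
			return hi
		}
		if lo.TryTake() {
			return lo
		}
		hi.Take() // block until a high priority token frees up
		return hi
	default:
		if lo.TryTake() {
			return lo
		}
		if hi.Idle() && hi.TryTake() {
			return hi
		}
		lo.Take() // block until a low priority token frees up
		return lo
	}
}

// usage, roughly matching the patch:
//   lim := acquire(s.level, s.hiPriLimiter, s.loPriLimiter)
//   defer lim.Release()
```

Ordering the low priority branch as a non-blocking TryTake, then an Idle-gated steal, then a blocking Take on its own pool means level 3 and 4 work never queues on the high priority limiter, so long full compactions cannot starve the tokens reserved for level 1 and 2.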
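Finally, the sizing arithmetic from tsdb/store.go, pulled out as a pure function so the rounding is easy to verify (compactionLimits is an illustrative name; where the patch shares a single limiter between both priorities at lim == 1, this sketch reports that as hi=1, lo=0):

```go
package main

import (
	"fmt"
	"runtime"
)

// compactionLimits mirrors the loadShards sizing logic: a configured value
// of 0 defaults to 50% of GOMAXPROCS (minimum 1), the result is capped at
// GOMAXPROCS, and any odd leftover token goes to the high priority pool.
func compactionLimits(configured, gomaxprocs int) (hi, lo int) {
	lim := configured
	if lim == 0 {
		lim = gomaxprocs / 2 // default: 50% of cores
		if lim < 1 {
			lim = 1
		}
	}
	if lim > gomaxprocs {
		lim = gomaxprocs
	}
	if lim == 1 {
		return 1, 0 // the patch shares one limiter for both priorities here
	}
	return lim/2 + lim%2, lim / 2
}

func main() {
	for _, c := range []int{0, 1, 5, 64} {
		hi, lo := compactionLimits(c, runtime.GOMAXPROCS(0))
		fmt.Printf("configured=%d -> hi=%d lo=%d\n", c, hi, lo)
	}
}
```

On an 8-core machine the default of 0 therefore yields lim = 4, split as hi = 2 and lo = 2; a configured max-concurrent-compactions of 5 yields hi = 3 and lo = 2, since the odd token goes to the high priority pool via lim/2 + lim%2.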