Default concurrent compactions to 50% of available cores

parent deef0c5649
commit db204f3eb7
@@ -88,7 +88,8 @@

 # compact-full-write-cold-duration = "4h"

 # The maximum number of concurrent full and level compactions that can run at one time. A
-# value of 0 results in runtime.GOMAXPROCS(0) used at runtime. This setting does not apply
+# value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime. Any number greater
+# than 0 limits compactions to that value. This setting does not apply
 # to cache snapshotting.
 # max-concurrent-compactions = 0
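For a quick sense of what the new default means in practice, here is a minimal sketch (not code from this commit) of the effective-limit computation that the store.go hunk below implements; `effectiveCompactionLimit` is an illustrative name:

```go
package main

import (
	"fmt"
	"runtime"
)

// effectiveCompactionLimit mirrors the defaulting logic in this commit:
// 0 means half the cores (at least 1); any other value is used as-is,
// capped at the core count.
func effectiveCompactionLimit(configured int) int {
	lim := configured
	if lim == 0 {
		lim = runtime.GOMAXPROCS(0) / 2
		if lim < 1 {
			lim = 1
		}
	}
	if lim > runtime.GOMAXPROCS(0) {
		lim = runtime.GOMAXPROCS(0)
	}
	return lim
}

func main() {
	// On an 8-core machine this prints 4 for the default setting of 0.
	fmt.Println(effectiveCompactionLimit(0))
}
```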
@@ -358,10 +359,10 @@

 # UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
 # read-buffer = 0

-# Multi-value plugins can be handled two ways.
-# "join" will parse and store the multi-value plugin as a single multi-value measurement.
-# "split" is the default behavior for backward compatibility with previous versions of influxdb.
+# Multi-value plugins can be handled two ways.
+# "split" will parse and store the multi-value plugin data into separate measurements
+# "join" will parse and store the multi-value plugin as a single multi-value measurement.
+# "split" is the default behavior for backward compatibility with previous versions of influxdb.
 # parse-multivalue-plugin = "split"

 ###
 ### [opentsdb]
@@ -10,10 +10,27 @@ func NewFixed(limit int) Fixed {
 	return make(Fixed, limit)
 }

+// Idle returns true if the limiter has all its capacity available.
+func (t Fixed) Idle() bool {
+	return len(t) == cap(t)
+}
+
+// TryTake attempts to take a token and returns true if successful, otherwise returns false.
+func (t Fixed) TryTake() bool {
+	select {
+	case t <- struct{}{}:
+		return true
+	default:
+		return false
+	}
+}
+
+// Take attempts to take a token and blocks until one is available.
 func (t Fixed) Take() {
 	t <- struct{}{}
 }

+// Release releases a token back to the limiter.
 func (t Fixed) Release() {
 	<-t
 }
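The Fixed limiter above is a counting semaphore built on a buffered channel: Take sends (blocking when the buffer is full), TryTake is the non-blocking variant, and Release receives to free a slot. A self-contained usage sketch, with the type reproduced from the hunk so it runs standalone (the `main` demo is illustrative):

```go
package main

import "fmt"

// Fixed is the buffered-channel semaphore from pkg/limiter, copied here
// so the sketch compiles on its own.
type Fixed chan struct{}

func NewFixed(limit int) Fixed { return make(Fixed, limit) }

func (t Fixed) TryTake() bool {
	select {
	case t <- struct{}{}: // a send occupies one slot of capacity
		return true
	default: // channel full: no token available right now
		return false
	}
}

func (t Fixed) Take()    { t <- struct{}{} } // blocks until a slot frees up
func (t Fixed) Release() { <-t }             // frees one slot

func main() {
	lim := NewFixed(2)
	fmt.Println(lim.TryTake()) // true
	fmt.Println(lim.TryTake()) // true
	fmt.Println(lim.TryTake()) // false: both tokens in use
	lim.Release()
	fmt.Println(lim.TryTake()) // true: a token was returned
}
```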
@@ -49,7 +49,7 @@ const (
 	DefaultMaxValuesPerTag = 100000

 	// DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
-	// that can run at one time. A value of 0 results in runtime.GOMAXPROCS(0) used at runtime.
+	// that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime.
 	DefaultMaxConcurrentCompactions = 0
 )
@@ -144,11 +144,13 @@ func NewEngine(id uint64, i Index, database, path string, walPath string, option

 // EngineOptions represents the options used to initialize the engine.
 type EngineOptions struct {
-	EngineVersion string
-	IndexVersion  string
-	ShardID       uint64
-	InmemIndex    interface{} // shared in-memory index
-	CompactionLimiter limiter.Fixed
+	EngineVersion string
+	IndexVersion  string
+	ShardID       uint64
+	InmemIndex    interface{} // shared in-memory index
+
+	HiPriCompactionLimiter limiter.Fixed
+	LoPriCompactionLimiter limiter.Fixed

 	Config Config
 }
@@ -230,7 +230,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
 	// Each compaction group should run against 4 generations. For level 1, since these
 	// can get created much more quickly, bump the grouping to 8 to keep file counts lower.
 	groupSize := 4
-	if level == 1 {
+	if level == 1 || level == 3 {
 		groupSize = 8
 	}
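As a worked example of the grouping change (assuming, as the surrounding code suggests, that the planner packs generations into consecutive chunks of groupSize), 20 level-1 or level-3 generations now yield three groups of 8, 8, and 4 instead of five groups of 4:

```go
package main

import "fmt"

// groupSizeFor reflects the new rule: levels 1 and 3 compact in groups
// of 8 generations, other levels in groups of 4.
func groupSizeFor(level int) int {
	if level == 1 || level == 3 {
		return 8
	}
	return 4
}

func main() {
	generations := 20 // pretend these are level-1 generations awaiting compaction
	size := groupSizeFor(1)
	groups := (generations + size - 1) / size // ceiling division into chunks
	fmt.Printf("%d generations -> %d groups of up to %d\n", generations, groups, size)
	// Output: 20 generations -> 3 groups of up to 8
}
```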
@@ -137,8 +137,10 @@ type Engine struct {

 	stats *EngineStatistics

-	// The limiter for concurrent compactions
-	compactionLimiter limiter.Fixed
+	// Limiters for concurrent compactions. The low priority limiter is for level 3 and 4
+	// compactions. The high priority is for level 1 and 2 compactions.
+	loPriCompactionLimiter limiter.Fixed
+	hiPriCompactionLimiter limiter.Fixed
 }

 // NewEngine returns a new instance of Engine.
@@ -176,8 +178,9 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
 		CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
 		CacheFlushWriteColdDuration:   time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
 		enableCompactionsOnOpen:       true,
-		stats:             &EngineStatistics{},
-		compactionLimiter: opt.CompactionLimiter,
+		stats:                  &EngineStatistics{},
+		loPriCompactionLimiter: opt.LoPriCompactionLimiter,
+		hiPriCompactionLimiter: opt.HiPriCompactionLimiter,
 	}

 	// Attach fieldset to index.
@@ -1346,46 +1349,33 @@ func (e *Engine) onFileStoreReplace(newFiles []TSMFile) {
 type compactionStrategy struct {
 	compactionGroups []CompactionGroup

-	// concurrency determines how many compactions groups will be started
-	// concurrently. These groups may be limited by the global limiter if
-	// enabled.
-	concurrency int
 	fast        bool
 	description string
+	level       int

 	durationStat *int64
 	activeStat   *int64
 	successStat  *int64
 	errorStat    *int64

-	logger    zap.Logger
-	compactor *Compactor
-	fileStore *FileStore
-	limiter   limiter.Fixed
-	engine    *Engine
+	logger       zap.Logger
+	compactor    *Compactor
+	fileStore    *FileStore
+	loPriLimiter limiter.Fixed
+	hiPriLimiter limiter.Fixed
+
+	engine *Engine
 }

 // Apply concurrently compacts all the groups in a compaction strategy.
 func (s *compactionStrategy) Apply() {
 	start := time.Now()

-	// cap concurrent compaction groups to no more than 4 at a time.
-	concurrency := s.concurrency
-	if concurrency == 0 {
-		concurrency = 4
-	}
-
-	throttle := limiter.NewFixed(concurrency)
 	var wg sync.WaitGroup
 	for i := range s.compactionGroups {
 		wg.Add(1)
 		go func(groupNum int) {
 			defer wg.Done()
-
-			// limit concurrent compaction groups
-			throttle.Take()
-			defer throttle.Release()
-
 			s.compactGroup(groupNum)
 		}(i)
 	}
@@ -1396,10 +1386,31 @@

 // compactGroup executes the compaction strategy against a single CompactionGroup.
 func (s *compactionStrategy) compactGroup(groupNum int) {
-	// Limit concurrent compactions if we have a limiter
-	if cap(s.limiter) > 0 {
-		s.limiter.Take()
-		defer s.limiter.Release()
+	// Level 1 and 2 are high priority and have a larger slice of the pool. If all
+	// the high priority capacity is used up, they can steal from the low priority
+	// pool as well if there is capacity. Otherwise, they wait on the high priority
+	// limiter until a running compaction completes. Level 3 and 4 are low priority
+	// as they are generally larger compactions and more expensive to run. They can
+	// steal a little from the high priority limiter if there is no high priority work.
+	switch s.level {
+	case 1, 2:
+		if s.hiPriLimiter.TryTake() {
+			defer s.hiPriLimiter.Release()
+		} else if s.loPriLimiter.TryTake() {
+			defer s.loPriLimiter.Release()
+		} else {
+			s.hiPriLimiter.Take()
+			defer s.hiPriLimiter.Release()
+		}
+	default:
+		if s.loPriLimiter.TryTake() {
+			defer s.loPriLimiter.Release()
+		} else if s.hiPriLimiter.Idle() && s.hiPriLimiter.TryTake() {
+			defer s.hiPriLimiter.Release()
+		} else {
+			s.loPriLimiter.Take()
+			defer s.loPriLimiter.Release()
+		}
 	}

 	group := s.compactionGroups[groupNum]
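Condensed into a standalone helper, the acquisition policy above looks roughly like this. `acquire` is an illustrative name, the Fixed type is reproduced from pkg/limiter so the sketch runs on its own, and the hiPriLimiter.Idle() gate on low-priority stealing is simplified to a plain TryTake here:

```go
package main

import "fmt"

type Fixed chan struct{}

func NewFixed(limit int) Fixed { return make(Fixed, limit) }
func (t Fixed) TryTake() bool {
	select {
	case t <- struct{}{}:
		return true
	default:
		return false
	}
}
func (t Fixed) Take()    { t <- struct{}{} }
func (t Fixed) Release() { <-t }

// acquire blocks until a compaction at the given level may run and returns
// the release function for whichever pool granted the token.
func acquire(level int, hi, lo Fixed) func() {
	switch level {
	case 1, 2: // high priority: prefer hi, steal from lo, else wait on hi
		if hi.TryTake() {
			return hi.Release
		}
		if lo.TryTake() {
			return lo.Release
		}
		hi.Take()
		return hi.Release
	default: // low priority: prefer lo, opportunistically steal from hi, else wait on lo
		if lo.TryTake() {
			return lo.Release
		}
		if hi.TryTake() { // the real code additionally requires hi to be fully idle
			return hi.Release
		}
		lo.Take()
		return lo.Release
	}
}

func main() {
	hi, lo := NewFixed(2), NewFixed(2)
	release := acquire(4, hi, lo) // a level-4 compaction takes a low-priority token
	fmt.Println(len(hi), len(lo)) // 0 1
	release()
	fmt.Println(len(hi), len(lo)) // 0 0
}
```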
@@ -1462,14 +1473,15 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrategy {
 	}

 	return &compactionStrategy{
-		concurrency:      4,
 		compactionGroups: compactionGroups,
 		logger:           e.logger,
 		fileStore:        e.FileStore,
 		compactor:        e.Compactor,
 		fast:             fast,
-		limiter:          e.compactionLimiter,
+		loPriLimiter:     e.loPriCompactionLimiter,
+		hiPriLimiter:     e.hiPriCompactionLimiter,
 		engine:           e,
+		level:            level,

 		description: fmt.Sprintf("level %d", level),
 		activeStat:  &e.stats.TSMCompactionsActive[level-1],
@@ -1495,14 +1507,15 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy {
 	}

 	s := &compactionStrategy{
-		concurrency:      1,
 		compactionGroups: compactionGroups,
 		logger:           e.logger,
 		fileStore:        e.FileStore,
 		compactor:        e.Compactor,
 		fast:             optimize,
-		limiter:          e.compactionLimiter,
+		loPriLimiter:     e.loPriCompactionLimiter,
+		hiPriLimiter:     e.hiPriCompactionLimiter,
 		engine:           e,
+		level:            4,
 	}

 	if optimize {
@@ -159,15 +159,35 @@ func (s *Store) loadShards() error {
 		err error
 	}

-	t := limiter.NewFixed(runtime.GOMAXPROCS(0))
-
 	// Setup a shared limiter for compactions
 	lim := s.EngineOptions.Config.MaxConcurrentCompactions
 	if lim == 0 {
-		lim = runtime.GOMAXPROCS(0)
+		lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
+		if lim < 1 {
+			lim = 1
+		}
 	}
-	s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)
+
+	// Don't allow more compactions to run than cores.
+	if lim > runtime.GOMAXPROCS(0) {
+		lim = runtime.GOMAXPROCS(0)
+	}
+
+	// If only one compaction can run at a time, use the same limiter for high and low
+	// priority work.
+	if lim == 1 {
+		s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(1)
+		s.EngineOptions.LoPriCompactionLimiter = s.EngineOptions.HiPriCompactionLimiter
+	} else {
+		// Split the available high and low priority limiters between the available cores.
+		// The high priority work can steal from low priority at times so it can use the
+		// full limit if there is pending work. The low priority is capped at half the
+		// limit.
+		s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(lim/2 + lim%2)
+		s.EngineOptions.LoPriCompactionLimiter = limiter.NewFixed(lim / 2)
+	}
+
+	t := limiter.NewFixed(runtime.GOMAXPROCS(0))
 	resC := make(chan *res)
 	var n int
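The split arithmetic gives the high priority pool the extra token when the limit is odd. A small sketch of the resulting pool sizes for a few limits (`split` is an illustrative helper, not store.go code):

```go
package main

import "fmt"

// split returns the high and low priority pool sizes for a compaction
// limit, per the lim/2 + lim%2 arithmetic above. A limit of 1 shares a
// single limiter between both pools in the real code.
func split(lim int) (hi, lo int) {
	if lim == 1 {
		return 1, 1
	}
	return lim/2 + lim%2, lim / 2
}

func main() {
	for _, lim := range []int{1, 2, 4, 5, 8} {
		hi, lo := split(lim)
		fmt.Printf("lim=%d -> hi=%d lo=%d\n", lim, hi, lo)
	}
	// lim=1 -> hi=1 lo=1 (one shared limiter)
	// lim=2 -> hi=1 lo=1
	// lim=4 -> hi=2 lo=2
	// lim=5 -> hi=3 lo=2
	// lim=8 -> hi=4 lo=4
}
```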