Add max-concurrent-compactions limit
This limit allows the number of concurrent level and full compactions to be throttled. Snapshot compactions are not affected by this limit as they need to run continuously. This limit can be used to control how much CPU is consumed by compactions. The default is to limit to the number of CPUs available.
parent 80fef4af4a
commit 8fc9853ed8
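For context, the throttle relies on a fixed-capacity limiter (the pkg/limiter Fixed type referenced in the diff below), shared by all shards and acquired around each level and full compaction. The following is a minimal, self-contained sketch of that pattern, not the actual package code; the fixed type, newFixed helper, and the worker count of 8 are illustrative only, while the fallback to runtime.GOMAXPROCS(0) mirrors the default described above.

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// fixed is a buffered-channel semaphore in the spirit of limiter.Fixed
// (an illustrative reimplementation, not the influxdb package itself).
type fixed chan struct{}

func newFixed(limit int) fixed { return make(fixed, limit) }
func (f fixed) Take()          { f <- struct{}{} }
func (f fixed) Release()       { <-f }

func main() {
    // A configured value of 0 falls back to the number of schedulable CPUs,
    // mirroring the default described in the commit message.
    maxConcurrentCompactions := 0
    if maxConcurrentCompactions == 0 {
        maxConcurrentCompactions = runtime.GOMAXPROCS(0)
    }
    lim := newFixed(maxConcurrentCompactions)

    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            lim.Take()          // blocks once the limit is reached
            defer lim.Release() // frees a slot for the next compaction
            fmt.Printf("compaction %d running\n", id)
            time.Sleep(10 * time.Millisecond) // stand-in for compaction work
        }(i)
    }
    wg.Wait()
}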
@@ -89,6 +89,11 @@
 # write or delete
 # compact-full-write-cold-duration = "4h"

+# The maximum number of concurrent full and level compactions that can run at one time. A
+# value of 0 results in runtime.GOMAXPROCS(0) used at runtime. This setting does not apply
+# to cache snapshotting.
+# max-concurrent-compactions = 0
+
 # The maximum series allowed per database before writes are dropped. This limit can prevent
 # high cardinality issues at the database level. This limit can be disabled by setting it to
 # 0.
@@ -47,6 +47,10 @@ const (

     // DefaultMaxValuesPerTag is the maximum number of values a tag can have within a measurement.
     DefaultMaxValuesPerTag = 100000
+
+    // DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
+    // that can run at one time. A value of 0 results in runtime.GOMAXPROCS(0) used at runtime.
+    DefaultMaxConcurrentCompactions = 0
 )

 // Config holds the configuration for the tsbd package.
@@ -84,6 +88,12 @@ type Config struct {
     // A value of 0 disables the limit.
     MaxValuesPerTag int `toml:"max-values-per-tag"`
+
+    // MaxConcurrentCompactions is the maximum number of concurrent level and full compactions
+    // that can be running at one time across all shards. Compactions scheduled to run when the
+    // limit is reached are blocked until a running compaction completes. Snapshot compactions are
+    // not affected by this limit. A value of 0 limits compactions to runtime.GOMAXPROCS(0).
+    MaxConcurrentCompactions int `toml:"max-concurrent-compactions"`

     TraceLoggingEnabled bool `toml:"trace-logging-enabled"`
 }

@@ -100,8 +110,9 @@ func NewConfig() Config {
         CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration),
         CompactFullWriteColdDuration:   toml.Duration(DefaultCompactFullWriteColdDuration),

-        MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
-        MaxValuesPerTag:      DefaultMaxValuesPerTag,
+        MaxSeriesPerDatabase:     DefaultMaxSeriesPerDatabase,
+        MaxValuesPerTag:          DefaultMaxValuesPerTag,
+        MaxConcurrentCompactions: DefaultMaxConcurrentCompactions,

         TraceLoggingEnabled: false,
     }
@@ -115,6 +126,10 @@ func (c *Config) Validate() error {
         return errors.New("Data.WALDir must be specified")
     }

+    if c.MaxConcurrentCompactions < 0 {
+        return errors.New("max-concurrent-compactions must be greater than or equal to 0")
+    }
+
     valid := false
     for _, e := range RegisteredEngines() {
         if e == c.Engine {
@@ -152,5 +167,6 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
         "compact-full-write-cold-duration": c.CompactFullWriteColdDuration,
         "max-series-per-database":          c.MaxSeriesPerDatabase,
         "max-values-per-tag":               c.MaxValuesPerTag,
+        "max-concurrent-compactions":       c.MaxConcurrentCompactions,
     }), nil
 }
@@ -12,6 +12,7 @@ import (
     "github.com/influxdata/influxdb/influxql"
     "github.com/influxdata/influxdb/models"
     "github.com/influxdata/influxdb/pkg/estimator"
+    "github.com/influxdata/influxdb/pkg/limiter"
     "github.com/uber-go/zap"
 )
@@ -136,10 +137,11 @@ func NewEngine(id uint64, i Index, path string, walPath string, options EngineOp

 // EngineOptions represents the options used to initialize the engine.
 type EngineOptions struct {
-    EngineVersion string
-    IndexVersion  string
-    ShardID       uint64
-    InmemIndex    interface{} // shared in-memory index
+    EngineVersion     string
+    IndexVersion      string
+    ShardID           uint64
+    InmemIndex        interface{} // shared in-memory index
+    CompactionLimiter limiter.Fixed

     Config Config
 }
@@ -23,6 +23,7 @@ import (
     "github.com/influxdata/influxdb/models"
     "github.com/influxdata/influxdb/pkg/bytesutil"
     "github.com/influxdata/influxdb/pkg/estimator"
+    "github.com/influxdata/influxdb/pkg/limiter"
     "github.com/influxdata/influxdb/tsdb"
     _ "github.com/influxdata/influxdb/tsdb/index"
     "github.com/uber-go/zap"
@@ -132,6 +133,9 @@ type Engine struct {
     enableCompactionsOnOpen bool

     stats *EngineStatistics
+
+    // The limiter for concurrent compactions
+    compactionLimiter limiter.Fixed
 }

 // NewEngine returns a new instance of Engine.
@@ -171,7 +175,8 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb.
         CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
         CacheFlushWriteColdDuration:   time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
         enableCompactionsOnOpen:       true,
-        stats: &EngineStatistics{},
+        stats:                         &EngineStatistics{},
+        compactionLimiter:             opt.CompactionLimiter,
     }

     // Attach fieldset to index.
@@ -1205,6 +1210,7 @@ type compactionStrategy struct {
     logger    zap.Logger
     compactor *Compactor
     fileStore *FileStore
+    limiter   limiter.Fixed
 }

 // Apply concurrently compacts all the groups in a compaction strategy.
@@ -1226,6 +1232,12 @@ func (s *compactionStrategy) Apply() {

 // compactGroup executes the compaction strategy against a single CompactionGroup.
 func (s *compactionStrategy) compactGroup(groupNum int) {
+    // Limit concurrent compactions if we have a limiter
+    if cap(s.limiter) > 0 {
+        s.limiter.Take()
+        defer s.limiter.Release()
+    }
+
     group := s.compactionGroups[groupNum]
     start := time.Now()
     s.logger.Info(fmt.Sprintf("beginning %s compaction of group %d, %d TSM files", s.description, groupNum, len(group)))
@@ -1290,6 +1302,7 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrate
         fileStore: e.FileStore,
         compactor: e.Compactor,
         fast:      fast,
+        limiter:   e.compactionLimiter,

         description: fmt.Sprintf("level %d", level),
         activeStat:  &e.stats.TSMCompactionsActive[level-1],
@@ -1320,6 +1333,7 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy {
         fileStore: e.FileStore,
         compactor: e.Compactor,
         fast:      optimize,
+        limiter:   e.compactionLimiter,
     }

     if optimize {
@@ -158,6 +158,13 @@ func (s *Store) loadShards() error {

     t := limiter.NewFixed(runtime.GOMAXPROCS(0))

+    // Setup a shared limiter for compactions
+    lim := s.EngineOptions.Config.MaxConcurrentCompactions
+    if lim == 0 {
+        lim = runtime.GOMAXPROCS(0)
+    }
+    s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)
+
     resC := make(chan *res)
     var n int
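The default resolution happens once here in Store.loadShards, so every shard engine shares the same limiter. A minimal sketch of that resolution rule, with resolveCompactionLimit as a hypothetical helper name introduced only for illustration:

package main

import (
    "fmt"
    "runtime"
)

// resolveCompactionLimit mirrors the fallback in Store.loadShards above:
// a configured value of 0 means one compaction slot per schedulable CPU.
func resolveCompactionLimit(maxConcurrentCompactions int) int {
    if maxConcurrentCompactions == 0 {
        return runtime.GOMAXPROCS(0)
    }
    return maxConcurrentCompactions
}

func main() {
    fmt.Println(resolveCompactionLimit(0)) // defaults to the CPU count
    fmt.Println(resolveCompactionLimit(4)) // explicit limit of 4
}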