diff --git a/pkg/limiter/fixed.go b/pkg/limiter/fixed.go
index 85815d81dc..19d967d74f 100644
--- a/pkg/limiter/fixed.go
+++ b/pkg/limiter/fixed.go
@@ -15,6 +15,16 @@ func (t Fixed) Idle() bool {
     return len(t) == cap(t)
 }
 
+// Available returns the number of available tokens that may be taken.
+func (t Fixed) Available() int {
+    return cap(t) - len(t)
+}
+
+// Capacity returns the total number of tokens that can be taken.
+func (t Fixed) Capacity() int {
+    return cap(t)
+}
+
 // TryTake attempts to take a token and return true if successful, otherwise returns false.
 func (t Fixed) TryTake() bool {
     select {
diff --git a/pkg/limiter/fixed_test.go b/pkg/limiter/fixed_test.go
new file mode 100644
index 0000000000..b45a7274c2
--- /dev/null
+++ b/pkg/limiter/fixed_test.go
@@ -0,0 +1,26 @@
+package limiter_test
+
+import (
+    "testing"
+
+    "github.com/influxdata/influxdb/pkg/limiter"
+)
+
+func TestFixed_Available(t *testing.T) {
+    f := limiter.NewFixed(10)
+    if exp, got := 10, f.Available(); exp != got {
+        t.Fatalf("available mismatch: exp %v, got %v", exp, got)
+    }
+
+    f.Take()
+
+    if exp, got := 9, f.Available(); exp != got {
+        t.Fatalf("available mismatch: exp %v, got %v", exp, got)
+    }
+
+    f.Release()
+
+    if exp, got := 10, f.Available(); exp != got {
+        t.Fatalf("available mismatch: exp %v, got %v", exp, got)
+    }
+}
diff --git a/tsdb/engine.go b/tsdb/engine.go
index d5e5cb0d5a..2c8a1a25e5 100644
--- a/tsdb/engine.go
+++ b/tsdb/engine.go
@@ -149,8 +149,7 @@ type EngineOptions struct {
     ShardID    uint64
     InmemIndex interface{} // shared in-memory index
 
-    HiPriCompactionLimiter limiter.Fixed
-    LoPriCompactionLimiter limiter.Fixed
+    CompactionLimiter limiter.Fixed
 
     Config Config
 }
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index 58fb4e2a48..d986396c14 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -230,7 +230,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
     // Each compaction group should run against 4 generations.  For level 1, since these
     // can get created much more quickly, bump the grouping to 8 to keep file counts lower.
     groupSize := 4
-    if level == 1 || level == 3 {
+    if level == 1 {
         groupSize = 8
     }
 
@@ -711,15 +711,22 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
         return nil, errSnapshotsDisabled
     }
 
-    concurrency := 1
+    concurrency, maxConcurrency := 1, runtime.GOMAXPROCS(0)/4
+    if maxConcurrency < 1 {
+        maxConcurrency = 1
+    }
+    if maxConcurrency > 4 {
+        maxConcurrency = 4
+    }
+
+    card := cache.Count()
     if card >= 1024*1024 {
         concurrency = card / 1024 * 1024
         if concurrency < 1 {
             concurrency = 1
         }
-        if concurrency > 4 {
-            concurrency = 4
+        if concurrency > maxConcurrency {
+            concurrency = maxConcurrency
         }
     }
     splits := cache.Split(concurrency)
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 332228048f..afe944d101 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -137,10 +137,10 @@ type Engine struct {
 
     stats *EngineStatistics
 
-    // Limiters for concurrent compactions. The low priority limiter is for level 3 and 4
-    // compactions. The high priority is for level 1 and 2 compactions.
-    loPriCompactionLimiter limiter.Fixed
-    hiPriCompactionLimiter limiter.Fixed
+    // Limiter for concurrent compactions.
+    compactionLimiter limiter.Fixed
+
+    scheduler *scheduler
 }
 
 // NewEngine returns a new instance of Engine.
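A minimal standalone sketch of the channel-based token scheme that the new Available and Capacity methods rely on: taking a token sends into a buffered channel, so len() counts tokens in use and cap() is the total. The lowercase names below are illustrative only, not part of pkg/limiter.

```go
package main

import "fmt"

// fixed mirrors pkg/limiter's scheme above: a buffered channel where a send
// takes a token and a receive releases it.
type fixed chan struct{}

// tryTake attempts to take a token without blocking.
func (f fixed) tryTake() bool {
	select {
	case f <- struct{}{}:
		return true
	default:
		return false
	}
}

// release returns a token to the limiter.
func (f fixed) release() { <-f }

// available and capacity match the semantics of the new methods above.
func (f fixed) available() int { return cap(f) - len(f) }
func (f fixed) capacity() int  { return cap(f) }

func main() {
	f := make(fixed, 2)
	fmt.Println(f.available(), f.capacity()) // 2 2
	f.tryTake()
	fmt.Println(f.available(), f.capacity()) // 1 2
	f.release()
	fmt.Println(f.available(), f.capacity()) // 2 2
}
```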
@@ -157,6 +157,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
     }
 
     logger := zap.New(zap.NullEncoder())
+    stats := &EngineStatistics{}
     e := &Engine{
         id:       id,
         database: database,
@@ -178,9 +179,9 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
         CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
         CacheFlushWriteColdDuration:   time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
         enableCompactionsOnOpen:       true,
-        stats: &EngineStatistics{},
-        loPriCompactionLimiter: opt.LoPriCompactionLimiter,
-        hiPriCompactionLimiter: opt.HiPriCompactionLimiter,
+        stats:             stats,
+        compactionLimiter: opt.CompactionLimiter,
+        scheduler:         newScheduler(stats, opt.CompactionLimiter.Capacity()),
     }
 
     // Attach fieldset to index.
@@ -242,13 +243,10 @@ func (e *Engine) enableLevelCompactions(wait bool) {
     quit := make(chan struct{})
     e.done = quit
 
-    e.wg.Add(4)
+    e.wg.Add(1)
     e.mu.Unlock()
 
-    go func() { defer e.wg.Done(); e.compactTSMFull(quit) }()
-    go func() { defer e.wg.Done(); e.compactTSMLevel(true, 1, quit) }()
-    go func() { defer e.wg.Done(); e.compactTSMLevel(true, 2, quit) }()
-    go func() { defer e.wg.Done(); e.compactTSMLevel(true, 3, quit) }()
+    go func() { defer e.wg.Done(); e.compact(quit) }()
 }
 
 // disableLevelCompactions will stop level compactions before returning.
@@ -1249,7 +1247,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
         time.Since(lastWriteTime) > e.CacheFlushWriteColdDuration
 }
 
-func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) {
+func (e *Engine) compact(quit <-chan struct{}) {
     t := time.NewTicker(time.Second)
     defer t.Stop()
 
@@ -1259,35 +1257,158 @@
     for {
         select {
         case <-quit:
             return
 
         case <-t.C:
-            s := e.levelCompactionStrategy(fast, level)
-            if s != nil {
-                s.Apply()
-                // Release the files in the compaction plan
-                e.CompactionPlan.Release(s.compactionGroups)
+
+            // Level 1 and 2 compactions are higher priority and can consume all of
+            // the available capacity of the compaction limiter.
+            level1Groups := e.CompactionPlan.PlanLevel(1)
+            level2Groups := e.CompactionPlan.PlanLevel(2)
+            level3Groups := e.CompactionPlan.PlanLevel(3)
+            level4Groups := e.CompactionPlan.Plan(e.WAL.LastWriteTime())
+            if len(level4Groups) == 0 {
+                level4Groups = e.CompactionPlan.PlanOptimize()
             }
+            run1 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
+            run2 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
+            run3 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
+            run4 := atomic.LoadInt64(&e.stats.TSMFullCompactionsActive)
+
+            e.traceLogger.Info(fmt.Sprintf("compact id=%d (%d/%d) (%d/%d) (%d/%d) (%d/%d)",
+                e.id,
+                run1, len(level1Groups),
+                run2, len(level2Groups),
+                run3, len(level3Groups),
+                run4, len(level4Groups)))
+
+            e.scheduler.setDepth(1, len(level1Groups))
+            e.scheduler.setDepth(2, len(level2Groups))
+            e.scheduler.setDepth(3, len(level3Groups))
+            e.scheduler.setDepth(4, len(level4Groups))
+
+            for level, runnable := e.scheduler.next(); runnable; level, runnable = e.scheduler.next() {
+                run1 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
+                run2 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
+                run3 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
+                run4 := atomic.LoadInt64(&e.stats.TSMFullCompactionsActive)
+
+                e.traceLogger.Info(fmt.Sprintf("compact run=%d id=%d (%d/%d) (%d/%d) (%d/%d) (%d/%d)",
+                    level, e.id,
+                    run1, len(level1Groups),
+                    run2, len(level2Groups),
+                    run3, len(level3Groups),
+                    run4, len(level4Groups)))
+
+                switch level {
+                case 1:
+                    level1Groups = e.compactHiPriorityLevel(level1Groups, 1)
+                    e.scheduler.setDepth(1, len(level1Groups))
+                case 2:
+                    level2Groups = e.compactHiPriorityLevel(level2Groups, 2)
+                    e.scheduler.setDepth(2, len(level2Groups))
+                case 3:
+                    level3Groups = e.compactLoPriorityLevel(level3Groups, 3)
+                    e.scheduler.setDepth(3, len(level3Groups))
+                case 4:
+                    level4Groups = e.compactFull(level4Groups)
+                    e.scheduler.setDepth(4, len(level4Groups))
+                }
+            }
+
+            // Release all the plans we didn't start.
+            e.CompactionPlan.Release(level1Groups)
+            e.CompactionPlan.Release(level2Groups)
+            e.CompactionPlan.Release(level3Groups)
+            e.CompactionPlan.Release(level4Groups)
         }
     }
 }
 
-func (e *Engine) compactTSMFull(quit <-chan struct{}) {
-    t := time.NewTicker(time.Second)
-    defer t.Stop()
+// compactHiPriorityLevel kicks off compactions using the high priority policy. It returns
+// the plans that were not able to be started.
+func (e *Engine) compactHiPriorityLevel(groups []CompactionGroup, level int) []CompactionGroup {
+    // Grab the first group.
+    grp := groups[:1]
 
-    for {
-        select {
-        case <-quit:
-            return
-
-        case <-t.C:
-            s := e.fullCompactionStrategy()
-            if s != nil {
-                s.Apply()
-                // Release the files in the compaction plan
-                e.CompactionPlan.Release(s.compactionGroups)
-            }
-        }
+    s := e.levelCompactionStrategy(grp, true, level)
+    if s == nil {
+        return groups
     }
+
+    // Try to take a token from the limiter; if none is available, return the
+    // plans unstarted.
+    if e.compactionLimiter.TryTake() {
+        atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1)
+
+        go func() {
+            defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
+            defer e.compactionLimiter.Release()
+            s.Apply()
+            // Release the files in the compaction plan
+            e.CompactionPlan.Release(s.compactionGroups)
+        }()
+        // Slice off the group we just ran; it will be released when the compaction
+        // goroutine exits.
+        groups = groups[1:]
+    }
+
+    // Return the unused plans.
+    return groups
+}
+
+// compactLoPriorityLevel kicks off compactions using the lo priority policy. It returns
+// the plans that were not able to be started.
+func (e *Engine) compactLoPriorityLevel(groups []CompactionGroup, level int) []CompactionGroup {
+    grp := groups[:1]
+
+    s := e.levelCompactionStrategy(grp, true, level)
+    if s == nil {
+        return groups
+    }
+
+    // Try to take a token from the limiter; if none is available, return the
+    // plans unstarted.
+    if e.compactionLimiter.TryTake() {
+        atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1)
+
+        go func() {
+            defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
+            defer e.compactionLimiter.Release()
+            s.Apply()
+            // Release the files in the compaction plan
+            e.CompactionPlan.Release(s.compactionGroups)
+        }()
+        groups = groups[1:]
+    }
+    return groups
+}
+
+// compactFull kicks off full and optimize compactions using the lo priority policy. It returns
+// the plans that were not able to be started.
+func (e *Engine) compactFull(groups []CompactionGroup) []CompactionGroup {
+    grp := groups[:1]
+
+    s := e.fullCompactionStrategy(grp, false)
+    if s == nil {
+        return groups
+    }
+
+    // Try to take a token from the limiter; if none is available, return the
+    // plans unstarted.
+    if e.compactionLimiter.TryTake() {
+        atomic.AddInt64(&e.stats.TSMFullCompactionsActive, 1)
+
+        go func() {
+            defer atomic.AddInt64(&e.stats.TSMFullCompactionsActive, -1)
+            defer e.compactionLimiter.Release()
+            s.Apply()
+            // Release the files in the compaction plan
+            e.CompactionPlan.Release(s.compactionGroups)
+        }()
+        groups = groups[1:]
+    }
+    return groups
+}
 
 // onFileStoreReplace is callback handler invoked when the FileStore
@@ -1358,11 +1479,9 @@ type compactionStrategy struct {
     successStat *int64
     errorStat   *int64
 
-    logger       zap.Logger
-    compactor    *Compactor
-    fileStore    *FileStore
-    loPriLimiter limiter.Fixed
-    hiPriLimiter limiter.Fixed
+    logger    zap.Logger
+    compactor *Compactor
+    fileStore *FileStore
 
     engine *Engine
 }
@@ -1386,33 +1505,6 @@ func (s *compactionStrategy) Apply() {
 
 // compactGroup executes the compaction strategy against a single CompactionGroup.
 func (s *compactionStrategy) compactGroup(groupNum int) {
-    // Level 1 and 2 are high priority and have a larger slice of the pool. If all
-    // the high priority capacity is used up, they can steal from the low priority
-    // pool as well if there is capacity. Otherwise, it wait on the high priority
-    // limiter until an running compaction completes. Level 3 and 4 are low priority
-    // as they are generally larger compactions and more expensive to run. They can
-    // steal a little from the high priority limiter if there is no high priority work.
-    switch s.level {
-    case 1, 2:
-        if s.hiPriLimiter.TryTake() {
-            defer s.hiPriLimiter.Release()
-        } else if s.loPriLimiter.TryTake() {
-            defer s.loPriLimiter.Release()
-        } else {
-            s.hiPriLimiter.Take()
-            defer s.hiPriLimiter.Release()
-        }
-    default:
-        if s.loPriLimiter.TryTake() {
-            defer s.loPriLimiter.Release()
-        } else if s.hiPriLimiter.Idle() && s.hiPriLimiter.TryTake() {
-            defer s.hiPriLimiter.Release()
-        } else {
-            s.loPriLimiter.Take()
-            defer s.loPriLimiter.Release()
-        }
-    }
-
     group := s.compactionGroups[groupNum]
     start := time.Now()
     s.logger.Info(fmt.Sprintf("beginning %s compaction of group %d, %d TSM files", s.description, groupNum, len(group)))
@@ -1420,17 +1512,16 @@
         s.logger.Info(fmt.Sprintf("compacting %s group (%d) %s (#%d)", s.description, groupNum, f, i))
     }
 
-    files, err := func() ([]string, error) {
-        // Count the compaction as active only while the compaction is actually running.
-        atomic.AddInt64(s.activeStat, 1)
-        defer atomic.AddInt64(s.activeStat, -1)
+    var (
+        err   error
+        files []string
+    )
 
-        if s.fast {
-            return s.compactor.CompactFast(group)
-        } else {
-            return s.compactor.CompactFull(group)
-        }
-    }()
+    if s.fast {
+        files, err = s.compactor.CompactFast(group)
+    } else {
+        files, err = s.compactor.CompactFull(group)
+    }
 
     if err != nil {
         _, inProgress := err.(errCompactionInProgress)
@@ -1465,9 +1556,7 @@
 
 // levelCompactionStrategy returns a compactionStrategy for the given level.
 // It returns nil if there are no TSM files to compact.
-func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrategy {
-    compactionGroups := e.CompactionPlan.PlanLevel(level)
-
+func (e *Engine) levelCompactionStrategy(compactionGroups []CompactionGroup, fast bool, level int) *compactionStrategy {
     if len(compactionGroups) == 0 {
         return nil
     }
@@ -1478,8 +1567,6 @@ func (e *Engine) levelCompactionStrategy(compactionGroups []CompactionGroup, fast bool, level int) *compactionStrate
         fileStore: e.FileStore,
         compactor: e.Compactor,
         fast:      fast,
-        loPriLimiter: e.loPriCompactionLimiter,
-        hiPriLimiter: e.hiPriCompactionLimiter,
 
         engine: e,
         level:  level,
@@ -1493,15 +1580,7 @@
 
 // fullCompactionStrategy returns a compactionStrategy for higher level generations of TSM files.
 // It returns nil if there are no TSM files to compact.
-func (e *Engine) fullCompactionStrategy() *compactionStrategy {
-    optimize := false
-    compactionGroups := e.CompactionPlan.Plan(e.WAL.LastWriteTime())
-
-    if len(compactionGroups) == 0 {
-        optimize = true
-        compactionGroups = e.CompactionPlan.PlanOptimize()
-    }
-
+func (e *Engine) fullCompactionStrategy(compactionGroups []CompactionGroup, optimize bool) *compactionStrategy {
     if len(compactionGroups) == 0 {
         return nil
     }
@@ -1512,8 +1591,6 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy {
         fileStore: e.FileStore,
         compactor: e.Compactor,
         fast:      optimize,
-        loPriLimiter: e.loPriCompactionLimiter,
-        hiPriLimiter: e.hiPriCompactionLimiter,
         engine: e,
         level:  4,
     }
diff --git a/tsdb/engine/tsm1/scheduler.go b/tsdb/engine/tsm1/scheduler.go
new file mode 100644
index 0000000000..7049aabac0
--- /dev/null
+++ b/tsdb/engine/tsm1/scheduler.go
@@ -0,0 +1,79 @@
+package tsm1
+
+import (
+    "sync/atomic"
+)
+
+var defaultWeights = [4]float64{0.4, 0.3, 0.2, 0.1}
+
+type scheduler struct {
+    maxConcurrency int
+    stats          *EngineStatistics
+
+    // queues is the depth of work pending for each compaction level
+    queues  [4]int
+    weights [4]float64
+}
+
+func newScheduler(stats *EngineStatistics, maxConcurrency int) *scheduler {
+    return &scheduler{
+        stats:          stats,
+        maxConcurrency: maxConcurrency,
+        weights:        defaultWeights,
+    }
+}
+
+func (s *scheduler) setDepth(level, depth int) {
+    level = level - 1
+    if level < 0 || level >= len(s.queues) {
+        return
+    }
+
+    s.queues[level] = depth
+}
+
+func (s *scheduler) next() (int, bool) {
+    level1Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[0]))
+    level2Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[1]))
+    level3Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[2]))
+    level4Running := int(atomic.LoadInt64(&s.stats.TSMFullCompactionsActive) + atomic.LoadInt64(&s.stats.TSMOptimizeCompactionsActive))
+
+    if level1Running+level2Running+level3Running+level4Running >= s.maxConcurrency {
+        return 0, false
+    }
+
+    var (
+        level    int
+        runnable bool
+    )
+
+    loLimit, _ := s.limits()
+
+    end := len(s.queues)
+    if level3Running+level4Running >= loLimit {
+        end = 2
+    }
+
+    var weight float64
+    for i := 0; i < end; i++ {
+        if float64(s.queues[i])*s.weights[i] > weight {
+            level, runnable = i+1, true
+            weight = float64(s.queues[i]) * s.weights[i]
+        }
+    }
+    return level, runnable
+}
+
+func (s *scheduler) limits() (int, int) {
+    hiLimit := s.maxConcurrency * 4 / 5
+    loLimit := (s.maxConcurrency / 5) + 1
+    if hiLimit == 0 {
+        hiLimit = 1
+    }
+
+    if loLimit == 0 {
+        loLimit = 1
+    }
+
+    return loLimit, hiLimit
+}
diff --git a/tsdb/engine/tsm1/scheduler_test.go b/tsdb/engine/tsm1/scheduler_test.go
new file mode 100644
index 0000000000..9ff40b0e5f
--- /dev/null
+++ b/tsdb/engine/tsm1/scheduler_test.go
@@ -0,0 +1,74 @@
+package tsm1
+
+import "testing"
+
+func TestScheduler_Runnable_Empty(t *testing.T) {
+    s := newScheduler(&EngineStatistics{}, 1)
+
+    for i := 1; i < 5; i++ {
+        s.setDepth(i, 1)
+        level, runnable := s.next()
+        if exp, got := true, runnable; exp != got {
+            t.Fatalf("runnable(%d) mismatch: exp %v, got %v", i, exp, got)
+        }
+
+        if exp, got := i, level; exp != got {
+            t.Fatalf("level(%d) mismatch: exp %v, got %v", i, exp, got)
+        }
+        s.setDepth(i, 0)
+    }
+}
+
+func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
+    s := newScheduler(&EngineStatistics{}, 1)
+
+    // level 1
+    s.stats = &EngineStatistics{}
+    s.stats.TSMCompactionsActive[0] = 1
+    for i := 0; i <= 4; i++ {
+        _, runnable := s.next()
+        if exp, got := false, runnable; exp != got {
+            t.Fatalf("runnable mismatch: exp %v, got %v", exp, got)
+        }
+    }
+
+    // level 2
+    s.stats = &EngineStatistics{}
+    s.stats.TSMCompactionsActive[1] = 1
+    for i := 0; i <= 4; i++ {
+        _, runnable := s.next()
+        if exp, got := false, runnable; exp != got {
+            t.Fatalf("runnable mismatch: exp %v, got %v", exp, got)
+        }
+    }
+
+    // level 3
+    s.stats = &EngineStatistics{}
+    s.stats.TSMCompactionsActive[2] = 1
+    for i := 0; i <= 4; i++ {
+        _, runnable := s.next()
+        if exp, got := false, runnable; exp != got {
+            t.Fatalf("runnable mismatch: exp %v, got %v", exp, got)
+        }
+    }
+
+    // optimize
+    s.stats = &EngineStatistics{}
+    s.stats.TSMOptimizeCompactionsActive++
+    for i := 0; i <= 4; i++ {
+        _, runnable := s.next()
+        if exp, got := false, runnable; exp != got {
+            t.Fatalf("runnable mismatch: exp %v, got %v", exp, got)
+        }
+    }
+
+    // full
+    s.stats = &EngineStatistics{}
+    s.stats.TSMFullCompactionsActive++
+    for i := 0; i <= 4; i++ {
+        _, runnable := s.next()
+        if exp, got := false, runnable; exp != got {
+            t.Fatalf("runnable mismatch: exp %v, got %v", exp, got)
+        }
+    }
+}
diff --git a/tsdb/store.go b/tsdb/store.go
index 28f3b78a06..902dcbace7 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -174,19 +174,7 @@ func (s *Store) loadShards() error {
         lim = runtime.GOMAXPROCS(0)
     }
 
-    // If only one compacttion can run at time, use the same limiter for high and low
-    // priority work.
-    if lim == 1 {
-        s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(1)
-        s.EngineOptions.LoPriCompactionLimiter = s.EngineOptions.HiPriCompactionLimiter
-    } else {
-        // Split the available high and low priority limiters between the available cores.
-        // The high priority work can steal from low priority at times so it can use the
-        // full limit if there is pending work. The low priority is capped at half the
-        // limit.
-        s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(lim/2 + lim%2)
-        s.EngineOptions.LoPriCompactionLimiter = limiter.NewFixed(lim / 2)
-    }
+    s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)
 
     t := limiter.NewFixed(runtime.GOMAXPROCS(0))
     resC := make(chan *res)
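A standalone sketch of the selection policy implemented by scheduler.next() above: each level's pending-queue depth is scaled by its weight, the largest product wins, and the low-priority levels 3 and 4 are skipped once the running low-priority compactions reach their share of the concurrency budget (maxConcurrency/5 + 1, per limits()). The pick helper and its inputs below are illustrative, not part of the change.

```go
package main

import "fmt"

// pick mirrors the weighted selection in scheduler.next(): each level's
// pending depth is scaled by its weight and the highest product wins.
// Levels 3 and 4 are excluded once the low-priority share of the
// concurrency budget is already in use.
func pick(queues [4]int, weights [4]float64, loRunning, loLimit int) (level int, ok bool) {
	end := len(queues)
	if loRunning >= loLimit {
		end = 2 // only consider levels 1 and 2
	}

	var best float64
	for i := 0; i < end; i++ {
		if w := float64(queues[i]) * weights[i]; w > best {
			level, ok, best = i+1, true, w
		}
	}
	return level, ok
}

func main() {
	weights := [4]float64{0.4, 0.3, 0.2, 0.1}

	// 2 level-1 groups vs 3 level-4 groups: 2*0.4 > 3*0.1, so level 1 runs first.
	fmt.Println(pick([4]int{2, 0, 0, 3}, weights, 0, 1)) // 1 true

	// With the low-priority budget exhausted, even a deep level-3 backlog is skipped.
	fmt.Println(pick([4]int{0, 0, 9, 0}, weights, 1, 1)) // 0 false
}
```

With the default weights, one queued level 1 group outranks three queued level 4 groups (0.4 > 0.3), which keeps the small, frequent level 1 and 2 compactions from starving behind large full compactions, while the depth factor still lets a long low-priority backlog win once it grows deep enough.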