Rework compaction scheduling

This changes the compaction scheduling to better utilize the available
cores that are free.  Previously, a level was planned in its own goroutine
and would kick off a number of compactions groups.  The problem with this
model was that if there were 4 groups, and 3 completed quickly, the planning
would be blocked for that level until the last group finished.  If the compactions
at the prior level are running more quickly, a large backlog could accumlate.

This now moves the planning to a single goroutine that plans each level in
succession and starts as many groups as it can.  When one group finishes,
the planning will start the next group for the level.
pull/8886/head
Jason Wilder 2017-09-26 18:15:39 -06:00
parent f668b0cc3f
commit ae821f4e2d
8 changed files with 371 additions and 111 deletions

View File

@ -15,6 +15,16 @@ func (t Fixed) Idle() bool {
return len(t) == cap(t)
}
// Available returns the number of available tokens that may be taken.
func (t Fixed) Available() int {
return cap(t) - len(t)
}
// Capacity returns the number of tokens can be taken.
func (t Fixed) Capacity() int {
return cap(t)
}
// TryTake attempts to take a token and return true if successful, otherwise returns false.
func (t Fixed) TryTake() bool {
select {

26
pkg/limiter/fixed_test.go Normal file
View File

@ -0,0 +1,26 @@
package limiter_test
import (
"testing"
"github.com/influxdata/influxdb/pkg/limiter"
)
func TestFixed_Available(t *testing.T) {
f := limiter.NewFixed(10)
if exp, got := 10, f.Available(); exp != got {
t.Fatalf("available mismatch: exp %v, got %v", exp, got)
}
f.Take()
if exp, got := 9, f.Available(); exp != got {
t.Fatalf("available mismatch: exp %v, got %v", exp, got)
}
f.Release()
if exp, got := 10, f.Available(); exp != got {
t.Fatalf("available mismatch: exp %v, got %v", exp, got)
}
}

View File

@ -149,8 +149,7 @@ type EngineOptions struct {
ShardID uint64
InmemIndex interface{} // shared in-memory index
HiPriCompactionLimiter limiter.Fixed
LoPriCompactionLimiter limiter.Fixed
CompactionLimiter limiter.Fixed
Config Config
}

View File

@ -230,7 +230,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
// Each compaction group should run against 4 generations. For level 1, since these
// can get created much more quickly, bump the grouping to 8 to keep file counts lower.
groupSize := 4
if level == 1 || level == 3 {
if level == 1 {
groupSize = 8
}
@ -711,15 +711,22 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
return nil, errSnapshotsDisabled
}
concurrency := 1
concurrency, maxConcurrency := 1, runtime.GOMAXPROCS(0)/4
if maxConcurrency < 1 {
maxConcurrency = 1
}
if maxConcurrency > 4 {
maxConcurrency = 4
}
card := cache.Count()
if card >= 1024*1024 {
concurrency = card / 1024 * 1024
if concurrency < 1 {
concurrency = 1
}
if concurrency > 4 {
concurrency = 4
if concurrency > maxConcurrency {
concurrency = maxConcurrency
}
}
splits := cache.Split(concurrency)

View File

@ -137,10 +137,10 @@ type Engine struct {
stats *EngineStatistics
// Limiters for concurrent compactions. The low priority limiter is for level 3 and 4
// compactions. The high priority is for level 1 and 2 compactions.
loPriCompactionLimiter limiter.Fixed
hiPriCompactionLimiter limiter.Fixed
// Limiter for concurrent compactions.
compactionLimiter limiter.Fixed
scheduler *scheduler
}
// NewEngine returns a new instance of Engine.
@ -157,6 +157,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
}
logger := zap.New(zap.NullEncoder())
stats := &EngineStatistics{}
e := &Engine{
id: id,
database: database,
@ -178,9 +179,9 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
enableCompactionsOnOpen: true,
stats: &EngineStatistics{},
loPriCompactionLimiter: opt.LoPriCompactionLimiter,
hiPriCompactionLimiter: opt.HiPriCompactionLimiter,
stats: stats,
compactionLimiter: opt.CompactionLimiter,
scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
}
// Attach fieldset to index.
@ -242,13 +243,10 @@ func (e *Engine) enableLevelCompactions(wait bool) {
quit := make(chan struct{})
e.done = quit
e.wg.Add(4)
e.wg.Add(1)
e.mu.Unlock()
go func() { defer e.wg.Done(); e.compactTSMFull(quit) }()
go func() { defer e.wg.Done(); e.compactTSMLevel(true, 1, quit) }()
go func() { defer e.wg.Done(); e.compactTSMLevel(true, 2, quit) }()
go func() { defer e.wg.Done(); e.compactTSMLevel(true, 3, quit) }()
go func() { defer e.wg.Done(); e.compact(quit) }()
}
// disableLevelCompactions will stop level compactions before returning.
@ -1249,7 +1247,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
time.Since(lastWriteTime) > e.CacheFlushWriteColdDuration
}
func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) {
func (e *Engine) compact(quit <-chan struct{}) {
t := time.NewTicker(time.Second)
defer t.Stop()
@ -1259,35 +1257,158 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) {
return
case <-t.C:
s := e.levelCompactionStrategy(fast, level)
if s != nil {
s.Apply()
// Release the files in the compaction plan
e.CompactionPlan.Release(s.compactionGroups)
// level 1 and 2 are higher priority and can take all the available capacity
// of the hi and lo limiter.
level1Groups := e.CompactionPlan.PlanLevel(1)
level2Groups := e.CompactionPlan.PlanLevel(2)
level3Groups := e.CompactionPlan.PlanLevel(3)
level4Groups := e.CompactionPlan.Plan(e.WAL.LastWriteTime())
if len(level4Groups) == 0 {
level4Groups = e.CompactionPlan.PlanOptimize()
}
run1 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
run2 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
run3 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
run4 := atomic.LoadInt64(&e.stats.TSMFullCompactionsActive)
e.traceLogger.Info(fmt.Sprintf("compact id=%d (%d/%d) (%d/%d) (%d/%d) (%d/%d)",
e.id,
run1, len(level1Groups),
run2, len(level2Groups),
run3, len(level3Groups),
run4, len(level4Groups)))
e.scheduler.setDepth(1, len(level1Groups))
e.scheduler.setDepth(2, len(level2Groups))
e.scheduler.setDepth(3, len(level3Groups))
e.scheduler.setDepth(4, len(level4Groups))
for level, runnable := e.scheduler.next(); runnable; level, runnable = e.scheduler.next() {
run1 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
run2 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
run3 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
run4 := atomic.LoadInt64(&e.stats.TSMFullCompactionsActive)
e.traceLogger.Info(fmt.Sprintf("compact run=%d id=%d (%d/%d) (%d/%d) (%d/%d) (%d/%d)",
level, e.id,
run1, len(level1Groups),
run2, len(level2Groups),
run3, len(level3Groups),
run4, len(level4Groups)))
switch level {
case 1:
level1Groups = e.compactHiPriorityLevel(level1Groups, 1)
e.scheduler.setDepth(1, len(level1Groups))
case 2:
level2Groups = e.compactHiPriorityLevel(level2Groups, 2)
e.scheduler.setDepth(2, len(level2Groups))
case 3:
level3Groups = e.compactLoPriorityLevel(level3Groups, 3)
e.scheduler.setDepth(3, len(level3Groups))
case 4:
level4Groups = e.compactFull(level4Groups)
e.scheduler.setDepth(4, len(level4Groups))
}
}
// Release all the plans we didn't start.
e.CompactionPlan.Release(level1Groups)
e.CompactionPlan.Release(level2Groups)
e.CompactionPlan.Release(level3Groups)
e.CompactionPlan.Release(level4Groups)
}
}
}
func (e *Engine) compactTSMFull(quit <-chan struct{}) {
t := time.NewTicker(time.Second)
defer t.Stop()
// compactHiPriorityLevel kicks off compactions using the high priority policy. It returns
// the plans that were not able to be started.
func (e *Engine) compactHiPriorityLevel(groups []CompactionGroup, level int) []CompactionGroup {
// Grab the first group
grp := groups[:1]
for {
select {
case <-quit:
return
case <-t.C:
s := e.fullCompactionStrategy()
if s != nil {
s.Apply()
// Release the files in the compaction plan
e.CompactionPlan.Release(s.compactionGroups)
}
}
s := e.levelCompactionStrategy(grp, true, level)
if s == nil {
// break
return groups
}
// Try hi priority limiter, otherwise steal a little from the low priority if we can.
if e.compactionLimiter.TryTake() {
atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1)
go func() {
defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
defer e.compactionLimiter.Release()
s.Apply()
// Release the files in the compaction plan
e.CompactionPlan.Release(s.compactionGroups)
}()
// // Slice off the group we just ran, it will be released when the compaction
// goroutine exits.
groups = groups[1:]
}
// Return the unused plans
return groups
}
// compactLoPriorityLevel kicks off compactions using the lo priority policy. It returns
// the plans that were not able to be started
func (e *Engine) compactLoPriorityLevel(groups []CompactionGroup, level int) []CompactionGroup {
grp := groups[:1]
s := e.levelCompactionStrategy(grp, true, level)
if s == nil {
// break
return groups
}
// Try the lo priority limiter, otherwise steal a little from the high priority if we can.
if e.compactionLimiter.TryTake() {
atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1)
go func() {
defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
defer e.compactionLimiter.Release()
s.Apply()
// Release the files in the compaction plan
e.CompactionPlan.Release(s.compactionGroups)
}()
groups = groups[1:]
}
return groups
}
// compactFull kicks off full and optimize compactions using the lo priority policy. It returns
// the plans that were not able to be started.
func (e *Engine) compactFull(groups []CompactionGroup) []CompactionGroup {
grp := groups[:1]
s := e.fullCompactionStrategy(grp, false)
if s == nil {
//break
return groups
}
// Try the lo priority limiter, otherwise steal a little from the high priority if we can.
if e.compactionLimiter.TryTake() {
atomic.AddInt64(&e.stats.TSMFullCompactionsActive, 1)
go func() {
defer atomic.AddInt64(&e.stats.TSMFullCompactionsActive, -1)
defer e.compactionLimiter.Release()
s.Apply()
// Release the files in the compaction plan
e.CompactionPlan.Release(s.compactionGroups)
}()
groups = groups[1:]
}
return groups
}
// onFileStoreReplace is callback handler invoked when the FileStore
@ -1358,11 +1479,9 @@ type compactionStrategy struct {
successStat *int64
errorStat *int64
logger zap.Logger
compactor *Compactor
fileStore *FileStore
loPriLimiter limiter.Fixed
hiPriLimiter limiter.Fixed
logger zap.Logger
compactor *Compactor
fileStore *FileStore
engine *Engine
}
@ -1386,33 +1505,6 @@ func (s *compactionStrategy) Apply() {
// compactGroup executes the compaction strategy against a single CompactionGroup.
func (s *compactionStrategy) compactGroup(groupNum int) {
// Level 1 and 2 are high priority and have a larger slice of the pool. If all
// the high priority capacity is used up, they can steal from the low priority
// pool as well if there is capacity. Otherwise, it wait on the high priority
// limiter until an running compaction completes. Level 3 and 4 are low priority
// as they are generally larger compactions and more expensive to run. They can
// steal a little from the high priority limiter if there is no high priority work.
switch s.level {
case 1, 2:
if s.hiPriLimiter.TryTake() {
defer s.hiPriLimiter.Release()
} else if s.loPriLimiter.TryTake() {
defer s.loPriLimiter.Release()
} else {
s.hiPriLimiter.Take()
defer s.hiPriLimiter.Release()
}
default:
if s.loPriLimiter.TryTake() {
defer s.loPriLimiter.Release()
} else if s.hiPriLimiter.Idle() && s.hiPriLimiter.TryTake() {
defer s.hiPriLimiter.Release()
} else {
s.loPriLimiter.Take()
defer s.loPriLimiter.Release()
}
}
group := s.compactionGroups[groupNum]
start := time.Now()
s.logger.Info(fmt.Sprintf("beginning %s compaction of group %d, %d TSM files", s.description, groupNum, len(group)))
@ -1420,17 +1512,16 @@ func (s *compactionStrategy) compactGroup(groupNum int) {
s.logger.Info(fmt.Sprintf("compacting %s group (%d) %s (#%d)", s.description, groupNum, f, i))
}
files, err := func() ([]string, error) {
// Count the compaction as active only while the compaction is actually running.
atomic.AddInt64(s.activeStat, 1)
defer atomic.AddInt64(s.activeStat, -1)
var (
err error
files []string
)
if s.fast {
return s.compactor.CompactFast(group)
} else {
return s.compactor.CompactFull(group)
}
}()
if s.fast {
files, err = s.compactor.CompactFast(group)
} else {
files, err = s.compactor.CompactFull(group)
}
if err != nil {
_, inProgress := err.(errCompactionInProgress)
@ -1465,9 +1556,7 @@ func (s *compactionStrategy) compactGroup(groupNum int) {
// levelCompactionStrategy returns a compactionStrategy for the given level.
// It returns nil if there are no TSM files to compact.
func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrategy {
compactionGroups := e.CompactionPlan.PlanLevel(level)
func (e *Engine) levelCompactionStrategy(compactionGroups []CompactionGroup, fast bool, level int) *compactionStrategy {
if len(compactionGroups) == 0 {
return nil
}
@ -1478,8 +1567,6 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrate
fileStore: e.FileStore,
compactor: e.Compactor,
fast: fast,
loPriLimiter: e.loPriCompactionLimiter,
hiPriLimiter: e.hiPriCompactionLimiter,
engine: e,
level: level,
@ -1493,15 +1580,7 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrate
// fullCompactionStrategy returns a compactionStrategy for higher level generations of TSM files.
// It returns nil if there are no TSM files to compact.
func (e *Engine) fullCompactionStrategy() *compactionStrategy {
optimize := false
compactionGroups := e.CompactionPlan.Plan(e.WAL.LastWriteTime())
if len(compactionGroups) == 0 {
optimize = true
compactionGroups = e.CompactionPlan.PlanOptimize()
}
func (e *Engine) fullCompactionStrategy(compactionGroups []CompactionGroup, optimize bool) *compactionStrategy {
if len(compactionGroups) == 0 {
return nil
}
@ -1512,8 +1591,6 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy {
fileStore: e.FileStore,
compactor: e.Compactor,
fast: optimize,
loPriLimiter: e.loPriCompactionLimiter,
hiPriLimiter: e.hiPriCompactionLimiter,
engine: e,
level: 4,
}

View File

@ -0,0 +1,79 @@
package tsm1
import (
"sync/atomic"
)
var defaultWeights = [4]float64{0.4, 0.3, 0.2, 0.1}
type scheduler struct {
maxConcurrency int
stats *EngineStatistics
// queues is the depth of work pending for each compaction level
queues [4]int
weights [4]float64
}
func newScheduler(stats *EngineStatistics, maxConcurrency int) *scheduler {
return &scheduler{
stats: stats,
maxConcurrency: maxConcurrency,
weights: defaultWeights,
}
}
func (s *scheduler) setDepth(level, depth int) {
level = level - 1
if level < 0 || level > len(s.queues) {
return
}
s.queues[level] = depth
}
func (s *scheduler) next() (int, bool) {
level1Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[0]))
level2Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[1]))
level3Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[2]))
level4Running := int(atomic.LoadInt64(&s.stats.TSMFullCompactionsActive) + atomic.LoadInt64(&s.stats.TSMOptimizeCompactionsActive))
if level1Running+level2Running+level3Running+level4Running >= s.maxConcurrency {
return 0, false
}
var (
level int
runnable bool
)
loLimit, _ := s.limits()
end := len(s.queues)
if level3Running+level4Running >= loLimit {
end = 2
}
var weight float64
for i := 0; i < end; i++ {
if float64(s.queues[i])*s.weights[i] > weight {
level, runnable = i+1, true
weight = float64(s.queues[i]) * s.weights[i]
}
}
return level, runnable
}
func (s *scheduler) limits() (int, int) {
hiLimit := s.maxConcurrency * 4 / 5
loLimit := (s.maxConcurrency / 5) + 1
if hiLimit == 0 {
hiLimit = 1
}
if loLimit == 0 {
loLimit = 1
}
return loLimit, hiLimit
}

View File

@ -0,0 +1,74 @@
package tsm1
import "testing"
func TestScheduler_Runnable_Empty(t *testing.T) {
s := newScheduler(&EngineStatistics{}, 1)
for i := 1; i < 5; i++ {
s.setDepth(i, 1)
level, runnable := s.next()
if exp, got := true, runnable; exp != got {
t.Fatalf("runnable(%d) mismatch: exp %v, got %v ", i, exp, got)
}
if exp, got := i, level; exp != got {
t.Fatalf("runnable(%d) mismatch: exp %v, got %v ", i, exp, got)
}
s.setDepth(i, 0)
}
}
func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
s := newScheduler(&EngineStatistics{}, 1)
// level 1
s.stats = &EngineStatistics{}
s.stats.TSMCompactionsActive[0] = 1
for i := 0; i <= 4; i++ {
_, runnable := s.next()
if exp, got := false, runnable; exp != got {
t.Fatalf("runnable mismatch: exp %v, got %v ", exp, got)
}
}
// level 2
s.stats = &EngineStatistics{}
s.stats.TSMCompactionsActive[1] = 1
for i := 0; i <= 4; i++ {
_, runnable := s.next()
if exp, got := false, runnable; exp != got {
t.Fatalf("runnable mismatch: exp %v, got %v ", exp, got)
}
}
// level 3
s.stats = &EngineStatistics{}
s.stats.TSMCompactionsActive[2] = 1
for i := 0; i <= 4; i++ {
_, runnable := s.next()
if exp, got := false, runnable; exp != got {
t.Fatalf("runnable mismatch: exp %v, got %v ", exp, got)
}
}
// optimize
s.stats = &EngineStatistics{}
s.stats.TSMOptimizeCompactionsActive++
for i := 0; i <= 4; i++ {
_, runnable := s.next()
if exp, got := false, runnable; exp != got {
t.Fatalf("runnable mismatch: exp %v, got %v ", exp, got)
}
}
// full
s.stats = &EngineStatistics{}
s.stats.TSMFullCompactionsActive++
for i := 0; i <= 4; i++ {
_, runnable := s.next()
if exp, got := false, runnable; exp != got {
t.Fatalf("runnable mismatch: exp %v, got %v ", exp, got)
}
}
}

View File

@ -174,19 +174,7 @@ func (s *Store) loadShards() error {
lim = runtime.GOMAXPROCS(0)
}
// If only one compacttion can run at time, use the same limiter for high and low
// priority work.
if lim == 1 {
s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(1)
s.EngineOptions.LoPriCompactionLimiter = s.EngineOptions.HiPriCompactionLimiter
} else {
// Split the available high and low priority limiters between the available cores.
// The high priority work can steal from low priority at times so it can use the
// full limit if there is pending work. The low priority is capped at half the
// limit.
s.EngineOptions.HiPriCompactionLimiter = limiter.NewFixed(lim/2 + lim%2)
s.EngineOptions.LoPriCompactionLimiter = limiter.NewFixed(lim / 2)
}
s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)
t := limiter.NewFixed(runtime.GOMAXPROCS(0))
resC := make(chan *res)