Add compaction backlog stat
This gives an indication as to whether compactions are backed up or not.

parent 16ece490ef
commit 71071ed67a
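The existing compaction stats are counters (totals, errors, durations) plus running gauges; the new `*CompactionQueue` values are gauges of how many planned compaction groups are currently waiting to run. A minimal sketch of the idea, using hypothetical names rather than the engine's actual types:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// engineStats is illustrative only; the field names are not the engine's.
type engineStats struct {
	level1Compactions int64 // counter: total level-1 compactions ever started
	level1Queue       int64 // gauge: planned level-1 groups not yet running
}

func main() {
	var stats engineStats

	// Each planning pass overwrites the gauge with the current backlog
	// instead of incrementing it, so it tracks the queue length directly.
	plannedGroups := 3
	atomic.StoreInt64(&stats.level1Queue, int64(plannedGroups))
	atomic.AddInt64(&stats.level1Compactions, 1) // counters keep incrementing

	// A gauge that stays above zero across samples means compactions are
	// being planned faster than they are being run, i.e. a backlog.
	fmt.Println("level1 queue depth:", atomic.LoadInt64(&stats.level1Queue))
}
```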
@@ -64,26 +64,31 @@ const (
 	statTSMLevel1CompactionsActive = "tsmLevel1CompactionsActive"
 	statTSMLevel1CompactionError = "tsmLevel1CompactionErr"
 	statTSMLevel1CompactionDuration = "tsmLevel1CompactionDuration"
+	statTSMLevel1CompactionQueue = "tsmLevel1CompactionQueue"
 
 	statTSMLevel2Compactions = "tsmLevel2Compactions"
 	statTSMLevel2CompactionsActive = "tsmLevel2CompactionsActive"
 	statTSMLevel2CompactionError = "tsmLevel2CompactionErr"
 	statTSMLevel2CompactionDuration = "tsmLevel2CompactionDuration"
+	statTSMLevel2CompactionQueue = "tsmLevel2CompactionQueue"
 
 	statTSMLevel3Compactions = "tsmLevel3Compactions"
 	statTSMLevel3CompactionsActive = "tsmLevel3CompactionsActive"
 	statTSMLevel3CompactionError = "tsmLevel3CompactionErr"
 	statTSMLevel3CompactionDuration = "tsmLevel3CompactionDuration"
+	statTSMLevel3CompactionQueue = "tsmLevel3CompactionQueue"
 
 	statTSMOptimizeCompactions = "tsmOptimizeCompactions"
 	statTSMOptimizeCompactionsActive = "tsmOptimizeCompactionsActive"
 	statTSMOptimizeCompactionError = "tsmOptimizeCompactionErr"
 	statTSMOptimizeCompactionDuration = "tsmOptimizeCompactionDuration"
+	statTSMOptimizeCompactionQueue = "tsmOptimizeCompactionQueue"
 
 	statTSMFullCompactions = "tsmFullCompactions"
 	statTSMFullCompactionsActive = "tsmFullCompactionsActive"
 	statTSMFullCompactionError = "tsmFullCompactionErr"
 	statTSMFullCompactionDuration = "tsmFullCompactionDuration"
+	statTSMFullCompactionQueue = "tsmFullCompactionQueue"
 )
 
 // Engine represents a storage engine with compressed blocks.
@@ -406,16 +411,19 @@ type EngineStatistics struct {
 	TSMCompactionsActive [3]int64 // Gauge of TSM compactions (by level) currently running.
 	TSMCompactionErrors [3]int64 // Counter of TSM compactions (by level) that have failed due to error.
 	TSMCompactionDuration [3]int64 // Counter of number of wall nanoseconds spent in TSM compactions (by level).
+	TSMCompactionsQueue [3]int64 // Gauge of TSM compactions queues (by level).
 
 	TSMOptimizeCompactions int64 // Counter of optimize compactions that have ever run.
 	TSMOptimizeCompactionsActive int64 // Gauge of optimize compactions currently running.
 	TSMOptimizeCompactionErrors int64 // Counter of optimize compactions that have failed due to error.
 	TSMOptimizeCompactionDuration int64 // Counter of number of wall nanoseconds spent in optimize compactions.
+	TSMOptimizeCompactionsQueue int64 // Gauge of optimize compactions queue.
 
 	TSMFullCompactions int64 // Counter of full compactions that have ever run.
 	TSMFullCompactionsActive int64 // Gauge of full compactions currently running.
 	TSMFullCompactionErrors int64 // Counter of full compactions that have failed due to error.
 	TSMFullCompactionDuration int64 // Counter of number of wall nanoseconds spent in full compactions.
+	TSMFullCompactionsQueue int64 // Gauge of full compactions queue.
 }
 
 // Statistics returns statistics for periodic monitoring.
@@ -434,26 +442,31 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
 			statTSMLevel1CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[0]),
 			statTSMLevel1CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[0]),
 			statTSMLevel1CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[0]),
+			statTSMLevel1CompactionQueue: atomic.LoadInt64(&e.stats.TSMCompactionsQueue[0]),
 
 			statTSMLevel2Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[1]),
 			statTSMLevel2CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[1]),
 			statTSMLevel2CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[1]),
 			statTSMLevel2CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[1]),
+			statTSMLevel2CompactionQueue: atomic.LoadInt64(&e.stats.TSMCompactionsQueue[1]),
 
 			statTSMLevel3Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[2]),
 			statTSMLevel3CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[2]),
 			statTSMLevel3CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[2]),
 			statTSMLevel3CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[2]),
+			statTSMLevel3CompactionQueue: atomic.LoadInt64(&e.stats.TSMCompactionsQueue[2]),
 
 			statTSMOptimizeCompactions: atomic.LoadInt64(&e.stats.TSMOptimizeCompactions),
 			statTSMOptimizeCompactionsActive: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive),
 			statTSMOptimizeCompactionError: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionErrors),
 			statTSMOptimizeCompactionDuration: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionDuration),
+			statTSMOptimizeCompactionQueue: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsQueue),
 
 			statTSMFullCompactions: atomic.LoadInt64(&e.stats.TSMFullCompactions),
 			statTSMFullCompactionsActive: atomic.LoadInt64(&e.stats.TSMFullCompactionsActive),
 			statTSMFullCompactionError: atomic.LoadInt64(&e.stats.TSMFullCompactionErrors),
 			statTSMFullCompactionDuration: atomic.LoadInt64(&e.stats.TSMFullCompactionDuration),
+			statTSMFullCompactionQueue: atomic.LoadInt64(&e.stats.TSMFullCompactionsQueue),
 		},
 	})
 
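These keys land in the same `Values` map the engine already returns from `Statistics()`, so whatever consumes that snapshot (for example InfluxDB's internal monitoring, assuming the usual wiring) picks them up without further changes. A hypothetical helper, not part of this diff, that pulls just the queue gauges out of one snapshot:

```go
package main

import "fmt"

// queueDepths picks the five new queue gauges out of a statistics Values map
// by the key names added above. It is illustrative only.
func queueDepths(values map[string]interface{}) map[string]int64 {
	keys := []string{
		"tsmLevel1CompactionQueue",
		"tsmLevel2CompactionQueue",
		"tsmLevel3CompactionQueue",
		"tsmOptimizeCompactionQueue",
		"tsmFullCompactionQueue",
	}
	out := make(map[string]int64, len(keys))
	for _, k := range keys {
		if v, ok := values[k].(int64); ok {
			out[k] = v
		}
	}
	return out
}

func main() {
	// A fake snapshot standing in for one statistic's Values map.
	snapshot := map[string]interface{}{
		"tsmLevel1CompactionQueue": int64(4),
		"tsmFullCompactionQueue":   int64(0),
	}
	fmt.Println(queueDepths(snapshot))
}
```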
@@ -1258,16 +1271,24 @@ func (e *Engine) compact(quit <-chan struct{}) {
 
 		case <-t.C:
 
-			// level 1 and 2 are higher priority and can take all the available capacity
-			// of the hi and lo limiter.
+			// Find our compaction plans
 			level1Groups := e.CompactionPlan.PlanLevel(1)
 			level2Groups := e.CompactionPlan.PlanLevel(2)
 			level3Groups := e.CompactionPlan.PlanLevel(3)
 			level4Groups := e.CompactionPlan.Plan(e.WAL.LastWriteTime())
+			atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))
 
 			// If no full compactions are needed, see if an optimize is needed
 			if len(level4Groups) == 0 {
 				level4Groups = e.CompactionPlan.PlanOptimize()
+				atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))
 			}
 
+			// Update the level plan queue stats
+			atomic.StoreInt64(&e.stats.TSMCompactionsQueue[0], int64(len(level1Groups)))
+			atomic.StoreInt64(&e.stats.TSMCompactionsQueue[1], int64(len(level2Groups)))
+			atomic.StoreInt64(&e.stats.TSMCompactionsQueue[2], int64(len(level3Groups)))
+
 			run1 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
 			run2 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
 			run3 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
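Note that the queue gauges are refreshed with `atomic.StoreInt64` at the top of every planning tick, so they always reflect the most recent plan instead of accumulating the way the counters do. A stripped-down sketch of that pattern, with a stand-in planner rather than the engine's `CompactionPlan`:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// planLevel stands in for a compaction planner; it would return the
// compaction groups currently eligible at a given level.
func planLevel(level int) [][]string { return make([][]string, level) }

func main() {
	var queue [3]int64 // gauges, one per level

	t := time.NewTicker(10 * time.Millisecond)
	defer t.Stop()

	for i := 0; i < 3; i++ {
		<-t.C
		// Overwrite each gauge with the size of the freshly computed plan.
		for level := 1; level <= 3; level++ {
			groups := planLevel(level)
			atomic.StoreInt64(&queue[level-1], int64(len(groups)))
		}
		fmt.Println("queue depths:",
			atomic.LoadInt64(&queue[0]),
			atomic.LoadInt64(&queue[1]),
			atomic.LoadInt64(&queue[2]))
	}
}
```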
@@ -1280,11 +1301,13 @@ func (e *Engine) compact(quit <-chan struct{}) {
 				run3, len(level3Groups),
 				run4, len(level4Groups)))
 
+			// Set the queue depths on the scheduler
 			e.scheduler.setDepth(1, len(level1Groups))
 			e.scheduler.setDepth(2, len(level2Groups))
 			e.scheduler.setDepth(3, len(level3Groups))
 			e.scheduler.setDepth(4, len(level4Groups))
 
+			// Find the next compaction that can run and try to kick it off
 			for level, runnable := e.scheduler.next(); runnable; level, runnable = e.scheduler.next() {
 				run1 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
 				run2 := atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
@@ -1300,18 +1323,25 @@ func (e *Engine) compact(quit <-chan struct{}) {
 
 				switch level {
 				case 1:
-					level1Groups = e.compactHiPriorityLevel(level1Groups, 1)
-					e.scheduler.setDepth(1, len(level1Groups))
+					if e.compactHiPriorityLevel(level1Groups[0], 1) {
+						level1Groups = level1Groups[1:]
+						e.scheduler.setDepth(1, len(level1Groups))
+					}
 				case 2:
-					level2Groups = e.compactHiPriorityLevel(level2Groups, 2)
-					e.scheduler.setDepth(2, len(level2Groups))
+					if e.compactHiPriorityLevel(level2Groups[0], 2) {
+						level2Groups = level2Groups[1:]
+						e.scheduler.setDepth(2, len(level2Groups))
+					}
 				case 3:
-					level3Groups = e.compactLoPriorityLevel(level3Groups, 3)
-					e.scheduler.setDepth(3, len(level3Groups))
+					if e.compactLoPriorityLevel(level3Groups[0], 3) {
+						level3Groups = level3Groups[1:]
+						e.scheduler.setDepth(3, len(level3Groups))
+					}
 				case 4:
-					level4Groups = e.compactFull(level4Groups)
-					e.scheduler.setDepth(4, len(level4Groups))
 
+					if e.compactFull(level4Groups[0]) {
+						level4Groups = level4Groups[1:]
+						e.scheduler.setDepth(4, len(level4Groups))
+					}
 				}
 			}
 
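The dispatch loop above now hands one compaction group at a time to the level-specific starters and only pops the group (and lowers the scheduler depth) when the starter reports that it actually ran, so groups that could not start stay counted in the backlog. A small sketch of that pop-on-success shape, with a hypothetical `tryStart` standing in for `compactHiPriorityLevel` and friends:

```go
package main

import "fmt"

type group []string

// tryStart reports whether the group was actually picked up, e.g. because a
// worker slot was free. It is a stand-in for the engine's starter functions.
func tryStart(g group, slots *int) bool {
	if *slots == 0 {
		return false
	}
	*slots--
	fmt.Println("started compaction of", g)
	return true
}

func main() {
	queue := []group{{"a.tsm", "b.tsm"}, {"c.tsm"}, {"d.tsm"}}
	slots := 2

	for len(queue) > 0 {
		if !tryStart(queue[0], &slots) {
			// Leave the group at the head of the queue; the depth (and the
			// queue gauge) keeps reflecting the real backlog.
			break
		}
		queue = queue[1:] // pop only after a successful start
	}
	fmt.Println("remaining backlog:", len(queue))
}
```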
@@ -1325,15 +1355,11 @@ func (e *Engine) compact(quit <-chan struct{}) {
 }
 
 // compactHiPriorityLevel kicks off compactions using the high priority policy. It returns
-// the plans that were not able to be started.
-func (e *Engine) compactHiPriorityLevel(groups []CompactionGroup, level int) []CompactionGroup {
-	// Grab the first group
-	grp := groups[:1]
-
+// true if the compaction was started
+func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int) bool {
 	s := e.levelCompactionStrategy(grp, true, level)
 	if s == nil {
-		// break
-		return groups
+		return false
 	}
 
 	// Try hi priority limiter, otherwise steal a little from the low priority if we can.
@@ -1346,26 +1372,21 @@ func (e *Engine) compactHiPriorityLevel(groups []CompactionGroup, level int) []C
 			defer e.compactionLimiter.Release()
 			s.Apply()
 			// Release the files in the compaction plan
-			e.CompactionPlan.Release(s.compactionGroups)
+			e.CompactionPlan.Release([]CompactionGroup{s.group})
 		}()
 		// // Slice off the group we just ran, it will be released when the compaction
 		// goroutine exits.
-		groups = groups[1:]
+		return true
 	}
-
-	// Return the unused plans
-	return groups
+	return false
 }
 
 // compactLoPriorityLevel kicks off compactions using the lo priority policy. It returns
 // the plans that were not able to be started
-func (e *Engine) compactLoPriorityLevel(groups []CompactionGroup, level int) []CompactionGroup {
-	grp := groups[:1]
-
+func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int) bool {
 	s := e.levelCompactionStrategy(grp, true, level)
 	if s == nil {
-		// break
-		return groups
+		return false
 	}
 
 	// Try the lo priority limiter, otherwise steal a little from the high priority if we can.
@@ -1377,22 +1398,19 @@ func (e *Engine) compactLoPriorityLevel(groups []CompactionGroup, level int) []C
 			defer e.compactionLimiter.Release()
 			s.Apply()
 			// Release the files in the compaction plan
-			e.CompactionPlan.Release(s.compactionGroups)
+			e.CompactionPlan.Release([]CompactionGroup{s.group})
 		}()
-		groups = groups[1:]
+		return true
 	}
-	return groups
+	return false
 }
 
 // compactFull kicks off full and optimize compactions using the lo priority policy. It returns
 // the plans that were not able to be started.
-func (e *Engine) compactFull(groups []CompactionGroup) []CompactionGroup {
-	grp := groups[:1]
-
+func (e *Engine) compactFull(grp CompactionGroup) bool {
 	s := e.fullCompactionStrategy(grp, false)
 	if s == nil {
-		//break
-		return groups
+		return false
 	}
 
 	// Try the lo priority limiter, otherwise steal a little from the high priority if we can.
@@ -1404,11 +1422,11 @@ func (e *Engine) compactFull(groups []CompactionGroup) []CompactionGroup {
 			defer e.compactionLimiter.Release()
 			s.Apply()
 			// Release the files in the compaction plan
-			e.CompactionPlan.Release(s.compactionGroups)
+			e.CompactionPlan.Release([]CompactionGroup{s.group})
 		}()
-		groups = groups[1:]
+		return true
 	}
-	return groups
+	return false
 }
 
 // onFileStoreReplace is callback handler invoked when the FileStore
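Each starter above now follows the same shape: build a strategy for the single group, try to take a slot from a limiter, run the strategy in a goroutine that releases the slot (and, in the engine, the planned files) when it finishes, and report whether anything was started. A self-contained sketch of that pattern, assuming a simple channel-based limiter; the engine's real limiter and `CompactionPlan` types are not shown in this diff:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// limiter is a stand-in for a compaction concurrency limiter.
type limiter chan struct{}

func (l limiter) TryTake() bool {
	select {
	case l <- struct{}{}:
		return true
	default:
		return false
	}
}

func (l limiter) Release() { <-l }

// startCompaction mirrors the refactored shape: one group in, bool out.
func startCompaction(group []string, lim limiter, wg *sync.WaitGroup) bool {
	if !lim.TryTake() {
		return false // caller keeps the group queued
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer lim.Release() // slot freed when the compaction finishes
		time.Sleep(10 * time.Millisecond)
		fmt.Println("compacted", group)
	}()
	return true
}

func main() {
	lim := make(limiter, 1)
	var wg sync.WaitGroup
	fmt.Println(startCompaction([]string{"a.tsm"}, lim, &wg)) // true
	fmt.Println(startCompaction([]string{"b.tsm"}, lim, &wg)) // false: limiter full
	wg.Wait()
}
```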
@@ -1468,7 +1486,7 @@ func (e *Engine) onFileStoreReplace(newFiles []TSMFile) {
 
 // compactionStrategy holds the details of what to do in a compaction.
 type compactionStrategy struct {
-	compactionGroups []CompactionGroup
+	group CompactionGroup
 
 	fast bool
 	description string
@@ -1491,25 +1509,23 @@ func (s *compactionStrategy) Apply() {
 	start := time.Now()
 
 	var wg sync.WaitGroup
-	for i := range s.compactionGroups {
-		wg.Add(1)
-		go func(groupNum int) {
-			defer wg.Done()
-			s.compactGroup(groupNum)
-		}(i)
-	}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		s.compactGroup()
+	}()
 	wg.Wait()
 
 	atomic.AddInt64(s.durationStat, time.Since(start).Nanoseconds())
 }
 
 // compactGroup executes the compaction strategy against a single CompactionGroup.
-func (s *compactionStrategy) compactGroup(groupNum int) {
-	group := s.compactionGroups[groupNum]
+func (s *compactionStrategy) compactGroup() {
+	group := s.group
 	start := time.Now()
-	s.logger.Info(fmt.Sprintf("beginning %s compaction of group %d, %d TSM files", s.description, groupNum, len(group)))
+	s.logger.Info(fmt.Sprintf("beginning %s compaction, %d TSM files", s.description, len(group)))
 	for i, f := range group {
-		s.logger.Info(fmt.Sprintf("compacting %s group (%d) %s (#%d)", s.description, groupNum, f, i))
+		s.logger.Info(fmt.Sprintf("compacting %s %s (#%d)", s.description, f, i))
 	}
 
 	var (
@@ -1526,7 +1542,7 @@ func (s *compactionStrategy) compactGroup(groupNum int) {
 	if err != nil {
 		_, inProgress := err.(errCompactionInProgress)
 		if err == errCompactionsDisabled || inProgress {
-			s.logger.Info(fmt.Sprintf("aborted %s compaction group (%d). %v", s.description, groupNum, err))
+			s.logger.Info(fmt.Sprintf("aborted %s compaction. %v", s.description, err))
 
 			if _, ok := err.(errCompactionInProgress); ok {
 				time.Sleep(time.Second)
@@ -1548,7 +1564,7 @@ func (s *compactionStrategy) compactGroup(groupNum int) {
 	}
 
 	for i, f := range files {
-		s.logger.Info(fmt.Sprintf("compacted %s group (%d) into %s (#%d)", s.description, groupNum, f, i))
+		s.logger.Info(fmt.Sprintf("compacted %s into %s (#%d)", s.description, f, i))
 	}
 	s.logger.Info(fmt.Sprintf("compacted %s %d files into %d files in %s", s.description, len(group), len(files), time.Since(start)))
 	atomic.AddInt64(s.successStat, 1)
@@ -1556,19 +1572,15 @@ func (s *compactionStrategy) compactGroup(groupNum int) {
 
 // levelCompactionStrategy returns a compactionStrategy for the given level.
 // It returns nil if there are no TSM files to compact.
-func (e *Engine) levelCompactionStrategy(compactionGroups []CompactionGroup, fast bool, level int) *compactionStrategy {
-	if len(compactionGroups) == 0 {
-		return nil
-	}
-
+func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy {
 	return &compactionStrategy{
-		compactionGroups: compactionGroups,
-		logger: e.logger,
-		fileStore: e.FileStore,
-		compactor: e.Compactor,
-		fast: fast,
-		engine: e,
-		level: level,
+		group: group,
+		logger: e.logger,
+		fileStore: e.FileStore,
+		compactor: e.Compactor,
+		fast: fast,
+		engine: e,
+		level: level,
 
 		description: fmt.Sprintf("level %d", level),
 		activeStat: &e.stats.TSMCompactionsActive[level-1],
@@ -1580,19 +1592,15 @@ func (e *Engine) levelCompactionStrategy(compactionGroups []CompactionGroup, fas
 
 // fullCompactionStrategy returns a compactionStrategy for higher level generations of TSM files.
 // It returns nil if there are no TSM files to compact.
-func (e *Engine) fullCompactionStrategy(compactionGroups []CompactionGroup, optimize bool) *compactionStrategy {
-	if len(compactionGroups) == 0 {
-		return nil
-	}
-
+func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *compactionStrategy {
 	s := &compactionStrategy{
-		compactionGroups: compactionGroups,
-		logger: e.logger,
-		fileStore: e.FileStore,
-		compactor: e.Compactor,
-		fast: optimize,
-		engine: e,
-		level: 4,
+		group: group,
+		logger: e.logger,
+		fileStore: e.FileStore,
+		compactor: e.Compactor,
+		fast: optimize,
+		engine: e,
+		level: 4,
 	}
 
 	if optimize {
@@ -1292,7 +1292,7 @@ func (s *Store) monitorShards() {
 			for _, sh := range s.shards {
 				if sh.IsIdle() {
 					if err := sh.Free(); err != nil {
-						s.Logger.Warn("error free cold shard resources: %v", zap.Error(err))
+						s.Logger.Warn("error free cold shard resources:", zap.Error(err))
 					}
 				} else {
 					sh.SetCompactionsEnabled(true)
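The `Store.monitorShards` change above drops the `%v` verb because zap's structured logger does not do printf-style substitution: `Warn` takes a message plus fields, and the error travels in the `zap.Error` field. A minimal example of the corrected call shape:

```go
package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	logger, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()

	freeErr := errors.New("disk busy")
	// No printf verbs in the message; the error is attached as a structured field.
	logger.Warn("error free cold shard resources", zap.Error(freeErr))
}
```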