Merge pull request #7281 from influxdata/mr-compaction-stats

Add stats for active compactions
pull/7485/head
Mark Rushakoff 2016-10-18 14:30:16 -07:00 committed by GitHub
commit 987aaa7267
2 changed files with 225 additions and 194 deletions

View File

@ -29,6 +29,7 @@
- [#6896](https://github.com/influxdata/influxdb/issues/6896): Correctly read in input from a non-interactive stream for the CLI.
- [#7463](https://github.com/influxdata/influxdb/pull/7463): Make input plugin services open/close idempotent.
- [#7473](https://github.com/influxdata/influxdb/pull/7473): Align binary math expression streams by time.
- [#7281](https://github.com/influxdata/influxdb/pull/7281): Add stats for active compactions, compaction errors.
### Bugfixes

View File

@ -42,17 +42,33 @@ const (
// Statistics gathered by the engine.
const (
statCacheCompactions = "cacheCompactions"
statCacheCompactionsActive = "cacheCompactionsActive"
statCacheCompactionError = "cacheCompactionErr"
statCacheCompactionDuration = "cacheCompactionDuration"
statTSMLevel1Compactions = "tsmLevel1Compactions"
statTSMLevel1CompactionsActive = "tsmLevel1CompactionsActive"
statTSMLevel1CompactionError = "tsmLevel1CompactionErr"
statTSMLevel1CompactionDuration = "tsmLevel1CompactionDuration"
statTSMLevel2Compactions = "tsmLevel2Compactions"
statTSMLevel2CompactionsActive = "tsmLevel2CompactionsActive"
statTSMLevel2CompactionError = "tsmLevel2CompactionErr"
statTSMLevel2CompactionDuration = "tsmLevel2CompactionDuration"
statTSMLevel3Compactions = "tsmLevel3Compactions"
statTSMLevel3CompactionsActive = "tsmLevel3CompactionsActive"
statTSMLevel3CompactionError = "tsmLevel3CompactionErr"
statTSMLevel3CompactionDuration = "tsmLevel3CompactionDuration"
statTSMOptimizeCompactions = "tsmOptimizeCompactions"
statTSMOptimizeCompactionsActive = "tsmOptimizeCompactionsActive"
statTSMOptimizeCompactionError = "tsmOptimizeCompactionErr"
statTSMOptimizeCompactionDuration = "tsmOptimizeCompactionDuration"
statTSMFullCompactions = "tsmFullCompactions"
statTSMFullCompactionsActive = "tsmFullCompactionsActive"
statTSMFullCompactionError = "tsmFullCompactionErr"
statTSMFullCompactionDuration = "tsmFullCompactionDuration"
)
@ -296,18 +312,25 @@ func (e *Engine) Format() tsdb.EngineFormat {
// EngineStatistics maintains statistics for the engine.
type EngineStatistics struct {
CacheCompactions int64
CacheCompactionErrors int64
CacheCompactionDuration int64
TSMCompactions [3]int64
TSMCompactionErrors [3]int64
TSMCompactionDuration [3]int64
TSMOptimizeCompactions int64
TSMOptimizeCompactionErrors int64
TSMOptimizeCompactionDuration int64
TSMFullCompactions int64
TSMFullCompactionErrors int64
TSMFullCompactionDuration int64
CacheCompactions int64 // Counter of cache compactions that have ever run.
CacheCompactionsActive int64 // Gauge of cache compactions currently running.
CacheCompactionErrors int64 // Counter of cache compactions that have failed due to error.
CacheCompactionDuration int64 // Counter of number of wall nanoseconds spent in cache compactions.
TSMCompactions [3]int64 // Counter of TSM compactions (by level) that have ever run.
TSMCompactionsActive [3]int64 // Gauge of TSM compactions (by level) currently running.
TSMCompactionErrors [3]int64 // Counter of TSM compcations (by level) that have failed due to error.
TSMCompactionDuration [3]int64 // Counter of number of wall nanoseconds spent in TSM compactions (by level).
TSMOptimizeCompactions int64 // Counter of optimize compactions that have ever run.
TSMOptimizeCompactionsActive int64 // Gauge of optimize compactions currently running.
TSMOptimizeCompactionErrors int64 // Counter of optimize compactions that have failed due to error.
TSMOptimizeCompactionDuration int64 // Counter of number of wall nanoseconds spent in optimize compactions.
TSMFullCompactions int64 // Counter of full compactions that have ever run.
TSMFullCompactionsActive int64 // Gauge of full compactions currently running.
TSMFullCompactionErrors int64 // Counter of full compactions that have failed due to error.
TSMFullCompactionDuration int64 // Counter of number of wall nanoseconds spent in full compactions.
}
// Statistics returns statistics for periodic monitoring.
@ -318,14 +341,33 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
Tags: tags,
Values: map[string]interface{}{
statCacheCompactions: atomic.LoadInt64(&e.stats.CacheCompactions),
statCacheCompactionsActive: atomic.LoadInt64(&e.stats.CacheCompactionsActive),
statCacheCompactionError: atomic.LoadInt64(&e.stats.CacheCompactionErrors),
statCacheCompactionDuration: atomic.LoadInt64(&e.stats.CacheCompactionDuration),
statTSMLevel1Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[0]),
statTSMLevel1CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[0]),
statTSMLevel1CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[0]),
statTSMLevel1CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[0]),
statTSMLevel2Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[1]),
statTSMLevel2CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[1]),
statTSMLevel2CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[1]),
statTSMLevel2CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[1]),
statTSMLevel3Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[2]),
statTSMLevel3CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[2]),
statTSMLevel3CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[2]),
statTSMLevel3CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[2]),
statTSMOptimizeCompactions: atomic.LoadInt64(&e.stats.TSMOptimizeCompactions),
statTSMOptimizeCompactionsActive: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive),
statTSMOptimizeCompactionError: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionErrors),
statTSMOptimizeCompactionDuration: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionDuration),
statTSMFullCompactions: atomic.LoadInt64(&e.stats.TSMFullCompactions),
statTSMFullCompactionsActive: atomic.LoadInt64(&e.stats.TSMFullCompactionsActive),
statTSMFullCompactionError: atomic.LoadInt64(&e.stats.TSMFullCompactionErrors),
statTSMFullCompactionDuration: atomic.LoadInt64(&e.stats.TSMFullCompactionDuration),
},
})
@ -931,83 +973,13 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) {
return
default:
tsmFiles := e.CompactionPlan.PlanLevel(level)
if len(tsmFiles) == 0 {
s := e.levelCompactionStrategy(fast, level)
if s == nil {
time.Sleep(time.Second)
continue
}
// Keep track of the start time for statistics.
start := time.Now()
var wg sync.WaitGroup
for i, group := range tsmFiles {
wg.Add(1)
go func(groupNum int, group CompactionGroup) {
defer wg.Done()
start := time.Now()
e.logger.Printf("beginning level %d compaction of group %d, %d TSM files", level, groupNum, len(group))
for i, f := range group {
e.logger.Printf("compacting level %d group (%d) %s (#%d)", level, groupNum, f, i)
}
var files []string
var err error
if fast {
files, err = e.Compactor.CompactFast(group)
if err == errCompactionsDisabled || err == errCompactionInProgress {
e.logger.Printf("aborted level %d group (%d). %v",
level, groupNum, err)
if err == errCompactionInProgress {
time.Sleep(time.Second)
}
return
} else if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}
} else {
files, err = e.Compactor.CompactFull(group)
if err == errCompactionsDisabled || err == errCompactionInProgress {
e.logger.Printf("aborted level %d compaction group (%d). %v",
level, groupNum, err)
if err == errCompactionInProgress {
time.Sleep(time.Second)
}
return
} else if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}
}
if err := e.FileStore.Replace(group, files); err != nil {
e.logger.Printf("error replacing new TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}
for i, f := range files {
e.logger.Printf("compacted level %d group (%d) into %s (#%d)", level, groupNum, f, i)
}
atomic.AddInt64(&e.stats.TSMCompactions[level-1], 1)
e.logger.Printf("compacted level %d group %d of %d files into %d files in %s",
level, groupNum, len(group), len(files), time.Since(start))
}(i, group)
}
wg.Wait()
// Track the amount of time spent compacting the groups.
atomic.AddInt64(&e.stats.TSMCompactionDuration[level-1], time.Since(start).Nanoseconds())
s.Apply()
}
}
}
@ -1019,106 +991,164 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) {
return
default:
optimize := false
logDesc := "full"
tsmFiles := e.CompactionPlan.Plan(e.WAL.LastWriteTime())
if len(tsmFiles) == 0 {
optimize = true
logDesc = "optimize"
tsmFiles = e.CompactionPlan.PlanOptimize()
}
if len(tsmFiles) == 0 {
s := e.fullCompactionStrategy()
if s == nil {
time.Sleep(time.Second)
continue
}
// Keep track of the start time for statistics.
s.Apply()
}
}
}
// compactionStrategy holds the details of what to do in a compaction.
type compactionStrategy struct {
compactionGroups []CompactionGroup
fast bool
description string
durationStat *int64
activeStat *int64
successStat *int64
errorStat *int64
logger *log.Logger
compactor *Compactor
fileStore *FileStore
}
// Apply concurrently compacts all the groups in a compaction strategy.
func (s *compactionStrategy) Apply() {
start := time.Now()
var wg sync.WaitGroup
for i, group := range tsmFiles {
for i := range s.compactionGroups {
wg.Add(1)
go func(groupNum int, group CompactionGroup) {
go func(groupNum int) {
defer wg.Done()
s.compactGroup(groupNum)
}(i)
}
wg.Wait()
atomic.AddInt64(s.durationStat, time.Since(start).Nanoseconds())
}
// compactGroup executes the compaction strategy against a single CompactionGroup.
func (s *compactionStrategy) compactGroup(groupNum int) {
group := s.compactionGroups[groupNum]
start := time.Now()
e.logger.Printf("beginning %s compaction of group %d, %d TSM files", logDesc, groupNum, len(group))
s.logger.Printf("beginning %s compaction of group %d, %d TSM files", s.description, groupNum, len(group))
for i, f := range group {
e.logger.Printf("compacting %s group (%d) %s (#%d)", logDesc, groupNum, f, i)
s.logger.Printf("compacting %s group (%d) %s (#%d)", s.description, groupNum, f, i)
}
var (
files []string
err error
)
if optimize {
files, err = e.Compactor.CompactFast(group)
if err == errCompactionsDisabled || err == errCompactionInProgress {
e.logger.Printf("aborted %s compaction group (%d). %v",
logDesc, groupNum, err)
files, err := func() ([]string, error) {
// Count the compaction as active only while the compaction is actually running.
atomic.AddInt64(s.activeStat, 1)
defer atomic.AddInt64(s.activeStat, -1)
if err == errCompactionInProgress {
time.Sleep(time.Second)
}
return
} else if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMOptimizeCompactionErrors, 1)
time.Sleep(time.Second)
return
}
if s.fast {
return s.compactor.CompactFast(group)
} else {
files, err = e.Compactor.CompactFull(group)
return s.compactor.CompactFull(group)
}
}()
if err != nil {
if err == errCompactionsDisabled || err == errCompactionInProgress {
e.logger.Printf("aborted %s compaction group (%d). %v",
logDesc, groupNum, err)
s.logger.Printf("aborted %s compaction group (%d). %v", s.description, groupNum, err)
if err == errCompactionInProgress {
time.Sleep(time.Second)
}
return
} else if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)
}
s.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
return
}
}
if err := e.FileStore.Replace(group, files); err != nil {
e.logger.Printf("error replacing new TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)
if err := s.fileStore.Replace(group, files); err != nil {
s.logger.Printf("error replacing new TSM files: %v", err)
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
return
}
for i, f := range files {
e.logger.Printf("compacted %s group (%d) into %s (#%d)", logDesc, groupNum, f, i)
s.logger.Printf("compacted %s group (%d) into %s (#%d)", s.description, groupNum, f, i)
}
s.logger.Printf("compacted %s %d files into %d files in %s", s.description, len(group), len(files), time.Since(start))
atomic.AddInt64(s.successStat, 1)
}
// levelCompactionStrategy returns a compactionStrategy for the given level.
// It returns nil if there are no TSM files to compact.
func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrategy {
compactionGroups := e.CompactionPlan.PlanLevel(level)
if len(compactionGroups) == 0 {
return nil
}
return &compactionStrategy{
compactionGroups: compactionGroups,
logger: e.logger,
fileStore: e.FileStore,
compactor: e.Compactor,
fast: fast,
description: fmt.Sprintf("level %d", level),
activeStat: &e.stats.TSMCompactionsActive[level-1],
successStat: &e.stats.TSMCompactions[level-1],
errorStat: &e.stats.TSMCompactionErrors[level-1],
durationStat: &e.stats.TSMCompactionDuration[level-1],
}
}
// fullCompactionStrategy returns a compactionStrategy for higher level generations of TSM files.
// It returns nil if there are no TSM files to compact.
func (e *Engine) fullCompactionStrategy() *compactionStrategy {
optimize := false
compactionGroups := e.CompactionPlan.Plan(e.WAL.LastWriteTime())
if len(compactionGroups) == 0 {
optimize = true
compactionGroups = e.CompactionPlan.PlanOptimize()
}
if len(compactionGroups) == 0 {
return nil
}
s := &compactionStrategy{
compactionGroups: compactionGroups,
logger: e.logger,
fileStore: e.FileStore,
compactor: e.Compactor,
fast: optimize,
}
if optimize {
atomic.AddInt64(&e.stats.TSMOptimizeCompactions, 1)
s.description = "optimize"
s.activeStat = &e.stats.TSMOptimizeCompactionsActive
s.successStat = &e.stats.TSMOptimizeCompactions
s.errorStat = &e.stats.TSMOptimizeCompactionErrors
s.durationStat = &e.stats.TSMOptimizeCompactionDuration
} else {
atomic.AddInt64(&e.stats.TSMFullCompactions, 1)
}
e.logger.Printf("compacted %s %d files into %d files in %s",
logDesc, len(group), len(files), time.Since(start))
}(i, group)
}
wg.Wait()
// Track the amount of time spent compacting the groups.
if optimize {
atomic.AddInt64(&e.stats.TSMOptimizeCompactionDuration, time.Since(start).Nanoseconds())
} else {
atomic.AddInt64(&e.stats.TSMFullCompactionDuration, time.Since(start).Nanoseconds())
s.description = "full"
s.activeStat = &e.stats.TSMFullCompactionsActive
s.successStat = &e.stats.TSMFullCompactions
s.errorStat = &e.stats.TSMFullCompactionErrors
s.durationStat = &e.stats.TSMFullCompactionDuration
}
}
}
return s
}
// reloadCache reads the WAL segment files and loads them into the cache.