Merge pull request #6977 from influxdata/js-5499-stats-and-diagnostics-to-tsm-engine

Add stats and diagnostics to the TSM engine
pull/6978/head
Jonathan A. Sternberg 2016-07-07 20:29:24 -05:00 committed by GitHub
commit 8610099ed7
3 changed files with 82 additions and 1 deletion
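The shape of the change: the engine keeps an EngineStatistics struct of int64 counters, bumps them with sync/atomic from its compaction paths, and exposes them through Statistics(tags) as a snapshot under the measurement name "tsm1_engine" for periodic monitoring. The sketch below shows only that shape; the Statistic type is a simplified stand-in for models.Statistic, the Engine here is not the real tsm1.Engine, and only two of the counters are included.

```go
// Minimal sketch of the statistics pattern this PR adds (simplified types).
package main

import (
	"fmt"
	"sync/atomic"
)

// Statistic is a simplified stand-in for models.Statistic.
type Statistic struct {
	Name   string
	Tags   map[string]string
	Values map[string]interface{}
}

// EngineStatistics holds int64 counters that hot paths update with
// sync/atomic, so recording a stat never takes the engine mutex.
type EngineStatistics struct {
	CacheCompactions        int64
	CacheCompactionDuration int64 // nanoseconds
}

// Engine is a stripped-down stand-in for tsm1.Engine.
type Engine struct {
	stats *EngineStatistics
}

// Statistics snapshots the counters for periodic monitoring.
func (e *Engine) Statistics(tags map[string]string) []Statistic {
	return []Statistic{{
		Name: "tsm1_engine",
		Tags: tags,
		Values: map[string]interface{}{
			"cacheCompactions":        atomic.LoadInt64(&e.stats.CacheCompactions),
			"cacheCompactionDuration": atomic.LoadInt64(&e.stats.CacheCompactionDuration),
		},
	}}
}

func main() {
	e := &Engine{stats: &EngineStatistics{}}
	atomic.AddInt64(&e.stats.CacheCompactions, 1)
	for _, s := range e.Statistics(map[string]string{"path": "/var/lib/influxdb/data"}) {
		fmt.Println(s.Name, s.Values)
	}
}
```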

CHANGELOG.md

@@ -33,6 +33,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#6938](https://github.com/influxdata/influxdb/issues/6938): Added favicon
- [#6507](https://github.com/influxdata/influxdb/issues/6507): Refactor monitor service to avoid expvar and write monitor statistics on a truncated time interval.
- [#6805](https://github.com/influxdata/influxdb/issues/6805): Allow any variant of the help option to trigger the help.
- [#5499](https://github.com/influxdata/influxdb/issues/5499): Add stats and diagnostics to the TSM engine.
### Bugfixes

tsdb/engine/tsm1/engine.go

@@ -12,6 +12,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/influxdb/influxql"
@@ -36,6 +37,21 @@ const (
keyFieldSeparator = "#!~#"
)
// Statistics gathered by the engine.
const (
statCacheCompactions = "cacheCompactions"
statCacheCompactionError = "cacheCompactionErr"
statCacheCompactionDuration = "cacheCompactionDuration"
statTSMLevel1Compactions = "tsmLevel1Compactions"
statTSMLevel1CompactionDuration = "tsmLevel1CompactionDuration"
statTSMLevel2Compactions = "tsmLevel2Compactions"
statTSMLevel2CompactionDuration = "tsmLevel2CompactionDuration"
statTSMLevel3Compactions = "tsmLevel3Compactions"
statTSMLevel3CompactionDuration = "tsmLevel3CompactionDuration"
statTSMFullCompactions = "tsmFullCompactions"
statTSMFullCompactionDuration = "tsmFullCompactionDuration"
)
// Engine represents a storage engine with compressed blocks.
type Engine struct {
mu sync.RWMutex
@@ -70,6 +86,8 @@ type Engine struct {
// Controls whether to enable compactions when the engine is open
enableCompactionsOnOpen bool
stats *EngineStatistics
}
// NewEngine returns a new instance of Engine.
@@ -105,6 +123,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
enableCompactionsOnOpen: true,
stats: &EngineStatistics{},
}
e.SetLogOutput(os.Stderr)
@@ -198,9 +217,38 @@ func (e *Engine) Format() tsdb.EngineFormat {
return tsdb.TSM1Format
}
// EngineStatistics maintains statistics for the engine.
type EngineStatistics struct {
CacheCompactions int64
CacheCompactionErrors int64
CacheCompactionDuration int64
TSMCompactions [3]int64
TSMCompactionErrors [3]int64
TSMCompactionDuration [3]int64
TSMFullCompactions int64
TSMFullCompactionErrors int64
TSMFullCompactionDuration int64
}
// Statistics returns statistics for periodic monitoring.
func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
- statistics := make([]models.Statistic, 0, 3)
+ statistics := make([]models.Statistic, 0, 4)
statistics = append(statistics, models.Statistic{
Name: "tsm1_engine",
Tags: tags,
Values: map[string]interface{}{
statCacheCompactions: atomic.LoadInt64(&e.stats.CacheCompactions),
statCacheCompactionDuration: atomic.LoadInt64(&e.stats.CacheCompactionDuration),
statTSMLevel1Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[0]),
statTSMLevel1CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[0]),
statTSMLevel2Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[1]),
statTSMLevel2CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[1]),
statTSMLevel3Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[2]),
statTSMLevel3CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[2]),
statTSMFullCompactions: atomic.LoadInt64(&e.stats.TSMFullCompactions),
statTSMFullCompactionDuration: atomic.LoadInt64(&e.stats.TSMFullCompactionDuration),
},
})
statistics = append(statistics, e.Cache.Statistics(tags)...)
statistics = append(statistics, e.FileStore.Statistics(tags)...)
statistics = append(statistics, e.WAL.Statistics(tags)...)
@@ -716,10 +764,15 @@ func (e *Engine) compactCache() {
default:
e.Cache.UpdateAge()
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
start := time.Now()
err := e.WriteSnapshot()
if err != nil {
e.logger.Printf("error writing snapshot: %v", err)
atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
} else {
atomic.AddInt64(&e.stats.CacheCompactions, 1)
}
atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())
}
}
time.Sleep(time.Second)
@@ -755,6 +808,9 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {
continue
}
// Keep track of the start time for statistics.
start := time.Now()
var wg sync.WaitGroup
for i, group := range tsmFiles {
wg.Add(1)
@@ -773,6 +829,7 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {
files, err = e.Compactor.CompactFast(group)
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}
@@ -780,6 +837,7 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {
files, err = e.Compactor.CompactFull(group)
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}
@@ -787,6 +845,7 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {
if err := e.FileStore.Replace(group, files); err != nil {
e.logger.Printf("error replacing new TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}
@@ -794,11 +853,15 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {
for i, f := range files {
e.logger.Printf("compacted level %d group (%d) into %s (#%d)", level, groupNum, f, i)
}
atomic.AddInt64(&e.stats.TSMCompactions[level-1], 1)
e.logger.Printf("compacted level %d group %d of %d files into %d files in %s",
level, groupNum, len(group), len(files), time.Since(start))
}(i, group)
}
wg.Wait()
// Track the amount of time spent compacting the groups.
atomic.AddInt64(&e.stats.TSMCompactionDuration[level-1], time.Since(start).Nanoseconds())
}
}
}
@@ -819,6 +882,9 @@ func (e *Engine) compactTSMFull() {
continue
}
// Keep track of the start time for statistics.
start := time.Now()
var wg sync.WaitGroup
for i, group := range tsmFiles {
wg.Add(1)
@@ -833,12 +899,14 @@ func (e *Engine) compactTSMFull() {
files, err := e.Compactor.CompactFull(group)
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)
time.Sleep(time.Second)
return
}
if err := e.FileStore.Replace(group, files); err != nil {
e.logger.Printf("error replacing new TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)
time.Sleep(time.Second)
return
}
@@ -846,11 +914,15 @@ func (e *Engine) compactTSMFull() {
for i, f := range files {
e.logger.Printf("compacted full group (%d) into %s (#%d)", groupNum, f, i)
}
atomic.AddInt64(&e.stats.TSMFullCompactions, 1)
e.logger.Printf("compacted full %d files into %d files in %s",
len(group), len(files), time.Since(start))
}(i, group)
}
wg.Wait()
// Track the amount of time spent compacting the groups.
atomic.AddInt64(&e.stats.TSMFullCompactionDuration, time.Since(start).Nanoseconds())
}
}
}
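
Each compaction path above follows the same accounting pattern: record a start time, increment an error counter when a compaction or file replacement fails, increment a success counter otherwise, and add the elapsed nanoseconds to a duration counter. A self-contained sketch of that pattern follows; trackCompaction is a hypothetical helper used only for illustration, since the real engine inlines this logic in its compaction goroutines.

```go
// Sketch (not engine code) of the success/error/duration accounting
// used by the cache, level, and full compaction loops.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// compactionStats mirrors the counter triple kept per compaction kind.
type compactionStats struct {
	Compactions int64
	Errors      int64
	Duration    int64 // nanoseconds, so values can be summed atomically
}

// trackCompaction is a hypothetical helper; the real engine inlines this
// logic in each compaction goroutine.
func trackCompaction(stats *compactionStats, work func() error) {
	start := time.Now()
	if err := work(); err != nil {
		atomic.AddInt64(&stats.Errors, 1)
	} else {
		atomic.AddInt64(&stats.Compactions, 1)
	}
	// Duration is accumulated whether or not the compaction succeeded.
	atomic.AddInt64(&stats.Duration, time.Since(start).Nanoseconds())
}

func main() {
	var stats compactionStats
	trackCompaction(&stats, func() error { return nil })
	trackCompaction(&stats, func() error { return errors.New("simulated compaction failure") })
	fmt.Printf("ok=%d errors=%d time=%s\n",
		stats.Compactions, stats.Errors, time.Duration(stats.Duration))
}
```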

tsdb/engine/tsm1/file_store.go

@@ -98,6 +98,7 @@ type TSMFile interface {
// Statistics gathered by the FileStore.
const (
statFileStoreBytes = "diskBytes"
statFileStoreCount = "numFiles"
)
type FileStore struct {
@@ -156,6 +157,7 @@ func (f *FileStore) SetLogOutput(w io.Writer) {
// FileStoreStatistics keeps statistics about the file store.
type FileStoreStatistics struct {
DiskBytes int64
FileCount int64
}
// Statistics returns statistics for periodic monitoring.
@@ -165,6 +167,7 @@ func (f *FileStore) Statistics(tags map[string]string) []models.Statistic {
Tags: tags,
Values: map[string]interface{}{
statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes),
statFileStoreCount: atomic.LoadInt64(&f.stats.FileCount),
},
}}
}
@@ -206,6 +209,7 @@ func (f *FileStore) Add(files ...TSMFile) {
}
f.files = append(f.files, files...)
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
}
// Remove removes the files with matching paths from the set of active files. It does
@@ -232,6 +236,7 @@ func (f *FileStore) Remove(paths ...string) {
}
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
}
// WalkKeys calls fn for every key in every TSM file known to the FileStore. If the key
@@ -384,6 +389,7 @@ func (f *FileStore) Open() error {
close(readerC)
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
return nil
}
@@ -396,6 +402,7 @@ func (f *FileStore) Close() error {
}
f.files = nil
atomic.StoreInt64(&f.stats.FileCount, 0)
return nil
}
@@ -506,6 +513,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
// Recalculate the disk size stat
var totalSize int64
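
On the FileStore side, numFiles behaves as a gauge rather than a counter: every place the file set changes (Add, Remove, Open, Close, Replace) overwrites the stat with atomic.StoreInt64(..., int64(len(f.files))), and Statistics reads it back with atomic.LoadInt64. Below is a minimal sketch of that gauge pattern, assuming a simplified fileStore type that is not the real tsm1.FileStore.

```go
// Sketch of keeping a gauge-style stat in sync with mutex-protected state:
// mutate the slice under the lock, then republish the count atomically so
// a stats collector can read it without taking the mutex.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type fileStoreStats struct {
	FileCount int64
}

type fileStore struct {
	mu    sync.RWMutex
	files []string
	stats fileStoreStats
}

// Add appends files and republishes the current count.
func (f *fileStore) Add(files ...string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.files = append(f.files, files...)
	atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
}

// FileCount reads the gauge without acquiring the mutex.
func (f *fileStore) FileCount() int64 {
	return atomic.LoadInt64(&f.stats.FileCount)
}

func main() {
	var fs fileStore
	fs.Add("000001-01.tsm", "000002-01.tsm")
	fmt.Println("numFiles =", fs.FileCount())
}
```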