diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index b4b9f768cf..43f1691d29 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -271,6 +271,8 @@ func (m *Main) run(ctx context.Context) (err error) { m.logger.Error("failed to open engine", zap.Error(err)) return err } + // The Engine's metrics must be registered after it opens. + reg.MustRegister(engine.PrometheusCollectors()...) pointsWriter = m.engine diff --git a/storage/engine.go b/storage/engine.go index 379af0ba81..3722a8c894 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -169,6 +169,7 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector { // TODO(edd): Get prom metrics for TSM. // TODO(edd): Get prom metrics for index. // TODO(edd): Get prom metrics for series file. + metrics = append(metrics, e.engine.PrometheusCollectors()...) metrics = append(metrics, e.retentionEnforcer.PrometheusCollectors()...) return metrics } diff --git a/tsdb/engine.go b/tsdb/engine.go new file mode 100644 index 0000000000..c5e8aeccff --- /dev/null +++ b/tsdb/engine.go @@ -0,0 +1,126 @@ +package tsdb + +import ( + "context" + "errors" + "regexp" + "runtime" + "time" + + "github.com/influxdata/influxql" + "github.com/influxdata/platform/models" + "github.com/influxdata/platform/pkg/limiter" + "go.uber.org/zap" +) + +var ( + // ErrUnknownEngineFormat is returned when the engine format is + // unknown. ErrUnknownEngineFormat is currently returned if a format + // other than tsm1 is encountered. + ErrUnknownEngineFormat = errors.New("unknown engine format") +) + +// Engine represents a swappable storage engine for the shard. 
+type Engine interface { + Open() error + Close() error + SetEnabled(enabled bool) + SetCompactionsEnabled(enabled bool) + ScheduleFullCompaction() error + + WithLogger(*zap.Logger) + + CreateCursorIterator(ctx context.Context) (CursorIterator, error) + WritePoints(points []models.Point) error + + CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error + CreateSeriesListIfNotExists(collection *SeriesCollection) error + DeleteSeriesRange(itr SeriesIterator, min, max int64) error + DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error + + SeriesN() int64 + + MeasurementExists(name []byte) (bool, error) + + MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) + ForEachMeasurementName(fn func(name []byte) error) error + DeleteMeasurement(name []byte) error + + HasTagKey(name, key []byte) (bool, error) + MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) + TagKeyCardinality(name, key []byte) int + + LastModified() time.Time + DiskSize() int64 + IsIdle() bool + Free() error +} + +// SeriesIDSets provides access to the total set of series IDs +type SeriesIDSets interface { + ForEach(f func(ids *SeriesIDSet)) error +} + +// EngineOptions represents the options used to initialize the engine. +type EngineOptions struct { + EngineVersion string + ShardID uint64 + + // Limits the concurrent number of TSM files that can be loaded at once. + OpenLimiter limiter.Fixed + + // CompactionDisabled specifies shards should not schedule compactions. + // This option is intended for offline tooling. + CompactionDisabled bool + CompactionPlannerCreator CompactionPlannerCreator + CompactionLimiter limiter.Fixed + CompactionThroughputLimiter limiter.Rate + WALEnabled bool + MonitorDisabled bool + + // DatabaseFilter is a predicate controlling which databases may be opened. + // If no function is set, all databases will be opened. 
+ DatabaseFilter func(database string) bool + + // RetentionPolicyFilter is a predicate controlling which combination of database and retention policy may be opened. + // nil will allow all combinations to pass. + RetentionPolicyFilter func(database, rp string) bool + + // ShardFilter is a predicate controlling which combination of database, retention policy and shard group may be opened. + // nil will allow all combinations to pass. + ShardFilter func(database, rp string, id uint64) bool + + Config Config + SeriesIDSets SeriesIDSets + + OnNewEngine func(Engine) + + FileStoreObserver FileStoreObserver +} + +// NewEngineOptions constructs an EngineOptions object with safe default values. +// This should only be used in tests; production environments should read from a config file. +func NewEngineOptions() EngineOptions { + return EngineOptions{ + EngineVersion: DefaultEngine, + Config: NewConfig(), + OpenLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)), + CompactionLimiter: limiter.NewFixed(1), + WALEnabled: false, + } +} + +// NewInmemIndex returns a new "inmem" index type. +var NewInmemIndex func(name string, sfile *SeriesFile) (interface{}, error) + +type CompactionPlannerCreator func(cfg Config) interface{} + +// FileStoreObserver is passed notifications before the file store adds or deletes files. In this way, it can +// be sure to observe every file that is added or removed even in the presence of process death. +type FileStoreObserver interface { + // FileFinishing is called before a file is renamed to it's final name. + FileFinishing(path string) error + + // FileUnlinking is called before a file is unlinked. 
+ FileUnlinking(path string) error +} diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 87e1280de1..69df60743a 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "io" "io/ioutil" "math" "os" @@ -26,6 +27,7 @@ import ( "github.com/influxdata/platform/query" "github.com/influxdata/platform/tsdb" "github.com/influxdata/platform/tsdb/tsi1" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -35,7 +37,8 @@ import ( //go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl //go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@reader.gen.go.tmpldata reader.gen.go.tmpl -var ( // Static objects to prevent small allocs. +var ( + // Static objects to prevent small allocs. keyFieldSeparatorBytes = []byte(keyFieldSeparator) emptyBytes = []byte{} ) @@ -70,44 +73,6 @@ const ( MaxPointsPerBlock = 1000 ) -// Statistics gathered by the engine. 
-const ( - statCacheCompactions = "cacheCompactions" - statCacheCompactionsActive = "cacheCompactionsActive" - statCacheCompactionError = "cacheCompactionErr" - statCacheCompactionDuration = "cacheCompactionDuration" - - statTSMLevel1Compactions = "tsmLevel1Compactions" - statTSMLevel1CompactionsActive = "tsmLevel1CompactionsActive" - statTSMLevel1CompactionError = "tsmLevel1CompactionErr" - statTSMLevel1CompactionDuration = "tsmLevel1CompactionDuration" - statTSMLevel1CompactionQueue = "tsmLevel1CompactionQueue" - - statTSMLevel2Compactions = "tsmLevel2Compactions" - statTSMLevel2CompactionsActive = "tsmLevel2CompactionsActive" - statTSMLevel2CompactionError = "tsmLevel2CompactionErr" - statTSMLevel2CompactionDuration = "tsmLevel2CompactionDuration" - statTSMLevel2CompactionQueue = "tsmLevel2CompactionQueue" - - statTSMLevel3Compactions = "tsmLevel3Compactions" - statTSMLevel3CompactionsActive = "tsmLevel3CompactionsActive" - statTSMLevel3CompactionError = "tsmLevel3CompactionErr" - statTSMLevel3CompactionDuration = "tsmLevel3CompactionDuration" - statTSMLevel3CompactionQueue = "tsmLevel3CompactionQueue" - - statTSMOptimizeCompactions = "tsmOptimizeCompactions" - statTSMOptimizeCompactionsActive = "tsmOptimizeCompactionsActive" - statTSMOptimizeCompactionError = "tsmOptimizeCompactionErr" - statTSMOptimizeCompactionDuration = "tsmOptimizeCompactionDuration" - statTSMOptimizeCompactionQueue = "tsmOptimizeCompactionQueue" - - statTSMFullCompactions = "tsmFullCompactions" - statTSMFullCompactionsActive = "tsmFullCompactionsActive" - statTSMFullCompactionError = "tsmFullCompactionErr" - statTSMFullCompactionDuration = "tsmFullCompactionDuration" - statTSMFullCompactionQueue = "tsmFullCompactionQueue" -) - // An EngineOption is a functional option for changing the configuration of // an Engine. 
 type EngineOption func(i *Engine)
@@ -190,7 +155,9 @@ type Engine struct {
 	// Controls whether to enabled compactions when the engine is open
 	enableCompactionsOnOpen bool
 
-	stats *EngineStatistics
+	compactionTracker   *compactionTracker // Used to track state of compactions.
+	blockMetrics        *blockMetrics      // Provides Engine metrics to external systems.
+	defaultMetricLabels prometheus.Labels  // N.B this must not be mutated after Open is called.
 
 	// Limiter for concurrent compactions.
 	compactionLimiter limiter.Fixed
@@ -234,7 +201,6 @@ func NewEngine(path string, idx *tsi1.Index, config Config, options ...EngineOpt
 	}
 
 	logger := zap.NewNop()
-	stats := &EngineStatistics{}
 	e := &Engine{
 		path:  path,
 		index: idx,
@@ -254,7 +220,6 @@ func NewEngine(path string, idx *tsi1.Index, config Config, options ...EngineOpt
 		CacheFlushWriteColdDuration: time.Duration(config.Cache.SnapshotWriteColdDuration),
 		enableCompactionsOnOpen:     true,
 		formatFileName:              DefaultFormatFileName,
-		stats:                       stats,
 		compactionLimiter:           limiter.NewFixed(maxCompactions),
-		scheduler:                   newScheduler(stats, maxCompactions),
+		scheduler:                   newScheduler(maxCompactions),
 	}
@@ -522,81 +487,6 @@ func (e *Engine) MeasurementStats() (MeasurementStats, error) {
 	return e.FileStore.MeasurementStats()
 }
 
-// EngineStatistics maintains statistics for the engine.
-type EngineStatistics struct {
-	CacheCompactions        int64 // Counter of cache compactions that have ever run.
-	CacheCompactionsActive  int64 // Gauge of cache compactions currently running.
-	CacheCompactionErrors   int64 // Counter of cache compactions that have failed due to error.
-	CacheCompactionDuration int64 // Counter of number of wall nanoseconds spent in cache compactions.
-
-	TSMCompactions       [3]int64 // Counter of TSM compactions (by level) that have ever run.
-	TSMCompactionsActive [3]int64 // Gauge of TSM compactions (by level) currently running.
-	TSMCompactionErrors  [3]int64 // Counter of TSM compcations (by level) that have failed due to error.
- TSMCompactionDuration [3]int64 // Counter of number of wall nanoseconds spent in TSM compactions (by level). - TSMCompactionsQueue [3]int64 // Gauge of TSM compactions queues (by level). - - TSMOptimizeCompactions int64 // Counter of optimize compactions that have ever run. - TSMOptimizeCompactionsActive int64 // Gauge of optimize compactions currently running. - TSMOptimizeCompactionErrors int64 // Counter of optimize compactions that have failed due to error. - TSMOptimizeCompactionDuration int64 // Counter of number of wall nanoseconds spent in optimize compactions. - TSMOptimizeCompactionsQueue int64 // Gauge of optimize compactions queue. - - TSMFullCompactions int64 // Counter of full compactions that have ever run. - TSMFullCompactionsActive int64 // Gauge of full compactions currently running. - TSMFullCompactionErrors int64 // Counter of full compactions that have failed due to error. - TSMFullCompactionDuration int64 // Counter of number of wall nanoseconds spent in full compactions. - TSMFullCompactionsQueue int64 // Gauge of full compactions queue. -} - -// Statistics returns statistics for periodic monitoring. 
-func (e *Engine) Statistics(tags map[string]string) []models.Statistic { - statistics := make([]models.Statistic, 0, 4) - statistics = append(statistics, models.Statistic{ - Name: "tsm1_engine", - Tags: tags, - Values: map[string]interface{}{ - statCacheCompactions: atomic.LoadInt64(&e.stats.CacheCompactions), - statCacheCompactionsActive: atomic.LoadInt64(&e.stats.CacheCompactionsActive), - statCacheCompactionError: atomic.LoadInt64(&e.stats.CacheCompactionErrors), - statCacheCompactionDuration: atomic.LoadInt64(&e.stats.CacheCompactionDuration), - - statTSMLevel1Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[0]), - statTSMLevel1CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[0]), - statTSMLevel1CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[0]), - statTSMLevel1CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[0]), - statTSMLevel1CompactionQueue: atomic.LoadInt64(&e.stats.TSMCompactionsQueue[0]), - - statTSMLevel2Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[1]), - statTSMLevel2CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[1]), - statTSMLevel2CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[1]), - statTSMLevel2CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[1]), - statTSMLevel2CompactionQueue: atomic.LoadInt64(&e.stats.TSMCompactionsQueue[1]), - - statTSMLevel3Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[2]), - statTSMLevel3CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[2]), - statTSMLevel3CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[2]), - statTSMLevel3CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[2]), - statTSMLevel3CompactionQueue: atomic.LoadInt64(&e.stats.TSMCompactionsQueue[2]), - - statTSMOptimizeCompactions: atomic.LoadInt64(&e.stats.TSMOptimizeCompactions), - statTSMOptimizeCompactionsActive: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive), - 
statTSMOptimizeCompactionError: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionErrors), - statTSMOptimizeCompactionDuration: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionDuration), - statTSMOptimizeCompactionQueue: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsQueue), - - statTSMFullCompactions: atomic.LoadInt64(&e.stats.TSMFullCompactions), - statTSMFullCompactionsActive: atomic.LoadInt64(&e.stats.TSMFullCompactionsActive), - statTSMFullCompactionError: atomic.LoadInt64(&e.stats.TSMFullCompactionErrors), - statTSMFullCompactionDuration: atomic.LoadInt64(&e.stats.TSMFullCompactionDuration), - statTSMFullCompactionQueue: atomic.LoadInt64(&e.stats.TSMFullCompactionsQueue), - }, - }) - - statistics = append(statistics, e.Cache.Statistics(tags)...) - statistics = append(statistics, e.FileStore.Statistics(tags)...) - return statistics -} - // DiskSize returns the total size in bytes of all TSM and WAL segments on disk. func (e *Engine) DiskSize() int64 { walDiskSizeBytes := e.WAL.DiskSizeBytes() @@ -605,6 +495,11 @@ func (e *Engine) DiskSize() int64 { // Open opens and initializes the engine. func (e *Engine) Open() error { + // Initialise metrics... + e.blockMetrics = newBlockMetrics(e.defaultMetricLabels) + e.compactionTracker = newCompactionTracker(e.blockMetrics) + e.scheduler.setCompactionTracker(e.compactionTracker) + if err := os.MkdirAll(e.path, 0777); err != nil { return err } @@ -649,6 +544,18 @@ func (e *Engine) Close() error { return e.WAL.Close() } +// PrometheusCollectors returns all the prometheus collectors associated with +// the engine and its components. +func (e *Engine) PrometheusCollectors() []prometheus.Collector { + var metrics []prometheus.Collector + metrics = append(metrics, e.blockMetrics.PrometheusCollectors()...) + + // TODO(edd): Add Filestore metrics + // TODO(edd): Add Cache metrics + // TODO(edd): Add WAL metrics + return metrics +} + // WithLogger sets the logger for the engine. 
func (e *Engine) WithLogger(log *zap.Logger) { e.logger = log.With(zap.String("engine", "tsm1")) @@ -668,15 +575,7 @@ func (e *Engine) WithLogger(log *zap.Logger) { // shard is fully compacted. func (e *Engine) IsIdle() bool { cacheEmpty := e.Cache.Size() == 0 - - runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive) - runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0]) - runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1]) - runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2]) - runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive) - runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive) - - return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted() + return cacheEmpty && e.compactionTracker.AllActive() == 0 && e.CompactionPlan.FullyCompacted() } // Free releases any resources held by the engine to free up memory or CPU. @@ -1106,6 +1005,149 @@ func (e *Engine) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollection) return e.index.CreateSeriesListIfNotExists(collection) } +// WriteTo is not implemented. +func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") } + +// compactionLevel describes a snapshot or levelled compaction. +type compactionLevel int + +func (l compactionLevel) String() string { + switch l { + case 0: + return "snapshot" + case 1, 2, 3: + return fmt.Sprint(int(l)) + case 4: + return "optimize" + case 5: + return "full" + default: + panic("unsupported compaction level") + } +} + +// compactionTracker tracks compactions and snapshots within the Engine. +// +// As well as being responsible for providing atomic reads and writes to the +// statistics tracking the various compaction operations, compactionTracker also +// mirrors any writes to the prometheus block metrics, which the Engine exposes. +// +// *NOTE* - compactionTracker fields should not be directory modified. 
Doing so +// could result in the Engine exposing inaccurate metrics. +type compactionTracker struct { + metrics *blockMetrics + + // Note: Compactions are levelled as follows: + // 0 – Snapshots + // 1-3 – Levelled compactions + // 4 – Optimize compactions + // 5 – Full compactions + + ok [6]uint64 // Counter of TSM compactions (by level) that have successfully completed. + active [6]uint64 // Gauge of TSM compactions (by level) currently running. + errors [6]uint64 // Counter of TSM compcations (by level) that have failed due to error. + queue [6]uint64 // Gauge of TSM compactions queues (by level). +} + +func newCompactionTracker(blockMetrics *blockMetrics) *compactionTracker { + return &compactionTracker{metrics: blockMetrics} +} + +// Completed returns the total number of compactions for the provided level. +func (t *compactionTracker) Completed(level int) uint64 { return atomic.LoadUint64(&t.ok[level]) } + +// Active returns the number of active snapshots (level 0), +// level 1, 2 or 3 compactions, optimize compactions (level 4), or full +// compactions (level 5). +func (t *compactionTracker) Active(level int) uint64 { + return atomic.LoadUint64(&t.active[level]) +} + +// AllActive returns the number of active snapshots and compactions. +func (t *compactionTracker) AllActive() uint64 { + var total uint64 + for i := 0; i < len(t.active); i++ { + total += atomic.LoadUint64(&t.active[i]) + } + return total +} + +// ActiveOptimise returns the number of active Optimise compactions. +// +// ActiveOptimise is a helper for Active(4). +func (t *compactionTracker) ActiveOptimise() uint64 { return t.Active(4) } + +// ActiveFull returns the number of active Full compactions. +// +// ActiveFull is a helper for Active(5). +func (t *compactionTracker) ActiveFull() uint64 { return t.Active(5) } + +// Errors returns the total number of errors encountered attempting compactions +// for the provided level. 
+func (t *compactionTracker) Errors(level int) uint64 { return atomic.LoadUint64(&t.errors[level]) } + +// IncActive increments the number of active compactions for the provided level. +func (t *compactionTracker) IncActive(level compactionLevel) { + atomic.AddUint64(&t.active[level], 1) + + labels := t.metrics.CompactionLabels(level) + t.metrics.CompactionsActive.With(labels).Inc() +} + +// IncFullActive increments the number of active Full compactions. +func (t *compactionTracker) IncFullActive() { t.IncActive(5) } + +// DecActive decrements the number of active compactions for the provided level. +func (t *compactionTracker) DecActive(level compactionLevel) { + atomic.AddUint64(&t.active[level], ^uint64(0)) + + labels := t.metrics.CompactionLabels(level) + t.metrics.CompactionsActive.With(labels).Dec() +} + +// DecFullActive decrements the number of active Full compactions. +func (t *compactionTracker) DecFullActive() { t.DecActive(5) } + +// Attempted updates the number of compactions attempted for the provided level. +func (t *compactionTracker) Attempted(level compactionLevel, success bool, duration time.Duration) { + if success { + atomic.AddUint64(&t.ok[level], 1) + + labels := t.metrics.CompactionLabels(level) + t.metrics.CompactionDuration.With(labels).Observe(duration.Seconds()) + + labels["status"] = "ok" + t.metrics.Compactions.With(labels).Inc() + return + } + + atomic.AddUint64(&t.errors[level], 1) + + labels := t.metrics.CompactionLabels(level) + labels["status"] = "error" + t.metrics.Compactions.With(labels).Inc() +} + +// SnapshotAttempted updates the number of snapshots attempted. +func (t *compactionTracker) SnapshotAttempted(success bool, duration time.Duration) { + t.Attempted(0, success, duration) +} + +// SetQueue sets the compaction queue depth for the provided level. 
+func (t *compactionTracker) SetQueue(level compactionLevel, length uint64) {
+	atomic.StoreUint64(&t.queue[level], length)
+
+	labels := t.metrics.CompactionLabels(level)
+	t.metrics.CompactionQueue.With(labels).Set(float64(length))
+}
+
+// SetOptimiseQueue sets the queue depth for Optimisation compactions.
+func (t *compactionTracker) SetOptimiseQueue(length uint64) { t.SetQueue(4, length) }
+
+// SetFullQueue sets the queue depth for Full compactions.
+func (t *compactionTracker) SetFullQueue(length uint64) { t.SetQueue(5, length) }
+
+
 // WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
 func (e *Engine) WriteSnapshot() error {
 	// Lock and grab the cache snapshot along with all the closed WAL
@@ -1216,11 +1258,8 @@ func (e *Engine) compactCache() {
 			err := e.WriteSnapshot()
 			if err != nil && err != errCompactionsDisabled {
 				e.logger.Info("Error writing snapshot", zap.Error(err))
-				atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
-			} else {
-				atomic.AddInt64(&e.stats.CacheCompactions, 1)
 			}
-			atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())
+			e.compactionTracker.SnapshotAttempted(err == nil || err == errCompactionsDisabled, time.Since(start))
 		}
 	}
 }
@@ -1262,18 +1301,18 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
 	level2Groups := e.CompactionPlan.PlanLevel(2)
 	level3Groups := e.CompactionPlan.PlanLevel(3)
 	level4Groups := e.CompactionPlan.Plan(e.FileStore.LastModified())
-	atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))
+	e.compactionTracker.SetOptimiseQueue(uint64(len(level4Groups)))
 
 	// If no full compactions are need, see if an optimize is needed
 	if len(level4Groups) == 0 {
 		level4Groups = e.CompactionPlan.PlanOptimize()
-		atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))
+		e.compactionTracker.SetOptimiseQueue(uint64(len(level4Groups)))
 	}
 
 	//
Update the level plan queue stats - atomic.StoreInt64(&e.stats.TSMCompactionsQueue[0], int64(len(level1Groups))) - atomic.StoreInt64(&e.stats.TSMCompactionsQueue[1], int64(len(level2Groups))) - atomic.StoreInt64(&e.stats.TSMCompactionsQueue[2], int64(len(level3Groups))) + e.compactionTracker.SetQueue(1, uint64(len(level1Groups))) + e.compactionTracker.SetQueue(2, uint64(len(level2Groups))) + e.compactionTracker.SetQueue(3, uint64(len(level3Groups))) // Set the queue depths on the scheduler e.scheduler.setDepth(1, len(level1Groups)) @@ -1314,7 +1353,7 @@ func (e *Engine) compact(wg *sync.WaitGroup) { // compactHiPriorityLevel kicks off compactions using the high priority policy. It returns // true if the compaction was started -func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool, wg *sync.WaitGroup) bool { +func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level compactionLevel, fast bool, wg *sync.WaitGroup) bool { s := e.levelCompactionStrategy(grp, fast, level) if s == nil { return false @@ -1322,13 +1361,12 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast boo // Try hi priority limiter, otherwise steal a little from the low priority if we can. if e.compactionLimiter.TryTake() { - atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1) + e.compactionTracker.IncActive(level) wg.Add(1) go func() { defer wg.Done() - defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1) - + defer e.compactionTracker.DecActive(level) defer e.compactionLimiter.Release() s.Apply() // Release the files in the compaction plan @@ -1343,7 +1381,7 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast boo // compactLoPriorityLevel kicks off compactions using the lo priority policy. 
It returns // the plans that were not able to be started -func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool, wg *sync.WaitGroup) bool { +func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level compactionLevel, fast bool, wg *sync.WaitGroup) bool { s := e.levelCompactionStrategy(grp, fast, level) if s == nil { return false @@ -1351,11 +1389,11 @@ func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast boo // Try the lo priority limiter, otherwise steal a little from the high priority if we can. if e.compactionLimiter.TryTake() { - atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1) + e.compactionTracker.IncActive(level) wg.Add(1) go func() { defer wg.Done() - defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1) + defer e.compactionTracker.DecActive(level) defer e.compactionLimiter.Release() s.Apply() // Release the files in the compaction plan @@ -1376,11 +1414,11 @@ func (e *Engine) compactFull(grp CompactionGroup, wg *sync.WaitGroup) bool { // Try the lo priority limiter, otherwise steal a little from the high priority if we can. if e.compactionLimiter.TryTake() { - atomic.AddInt64(&e.stats.TSMFullCompactionsActive, 1) + e.compactionTracker.IncFullActive() wg.Add(1) go func() { defer wg.Done() - defer atomic.AddInt64(&e.stats.TSMFullCompactionsActive, -1) + defer e.compactionTracker.DecFullActive() defer e.compactionLimiter.Release() s.Apply() // Release the files in the compaction plan @@ -1396,12 +1434,9 @@ type compactionStrategy struct { group CompactionGroup fast bool - level int + level compactionLevel - durationStat *int64 - activeStat *int64 - successStat *int64 - errorStat *int64 + tracker *compactionTracker logger *zap.Logger compactor *Compactor @@ -1412,13 +1447,12 @@ type compactionStrategy struct { // Apply concurrently compacts all the groups in a compaction strategy. 
func (s *compactionStrategy) Apply() { - start := time.Now() s.compactGroup() - atomic.AddInt64(s.durationStat, time.Since(start).Nanoseconds()) } // compactGroup executes the compaction strategy against a single CompactionGroup. func (s *compactionStrategy) compactGroup() { + now := time.Now() group := s.group log, logEnd := logger.NewOperation(s.logger, "TSM compaction", "tsm1_compact_group") defer logEnd() @@ -1451,14 +1485,14 @@ func (s *compactionStrategy) compactGroup() { } log.Info("Error compacting TSM files", zap.Error(err)) - atomic.AddInt64(s.errorStat, 1) + s.tracker.Attempted(s.level, false, 0) time.Sleep(time.Second) return } if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil { log.Info("Error replacing new TSM files", zap.Error(err)) - atomic.AddInt64(s.errorStat, 1) + s.tracker.Attempted(s.level, false, 0) time.Sleep(time.Second) return } @@ -1466,27 +1500,22 @@ func (s *compactionStrategy) compactGroup() { for i, f := range files { log.Info("Compacted file", zap.Int("tsm1_index", i), zap.String("tsm1_file", f)) } - log.Info("Finished compacting files", - zap.Int("tsm1_files_n", len(files))) - atomic.AddInt64(s.successStat, 1) + log.Info("Finished compacting files", zap.Int("tsm1_files_n", len(files))) + s.tracker.Attempted(s.level, true, time.Since(now)) } // levelCompactionStrategy returns a compactionStrategy for the given level. // It returns nil if there are no TSM files to compact. 
-func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy { +func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level compactionLevel) *compactionStrategy { return &compactionStrategy{ group: group, - logger: e.logger.With(zap.Int("tsm1_level", level), zap.String("tsm1_strategy", "level")), + logger: e.logger.With(zap.Int("tsm1_level", int(level)), zap.String("tsm1_strategy", "level")), fileStore: e.FileStore, compactor: e.Compactor, fast: fast, engine: e, level: level, - - activeStat: &e.stats.TSMCompactionsActive[level-1], - successStat: &e.stats.TSMCompactions[level-1], - errorStat: &e.stats.TSMCompactionErrors[level-1], - durationStat: &e.stats.TSMCompactionDuration[level-1], + tracker: e.compactionTracker, } } @@ -1500,21 +1529,12 @@ func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *c compactor: e.Compactor, fast: optimize, engine: e, - level: 4, + level: 5, } if optimize { - s.activeStat = &e.stats.TSMOptimizeCompactionsActive - s.successStat = &e.stats.TSMOptimizeCompactions - s.errorStat = &e.stats.TSMOptimizeCompactionErrors - s.durationStat = &e.stats.TSMOptimizeCompactionDuration - } else { - s.activeStat = &e.stats.TSMFullCompactionsActive - s.successStat = &e.stats.TSMFullCompactions - s.errorStat = &e.stats.TSMFullCompactionErrors - s.durationStat = &e.stats.TSMFullCompactionDuration + s.level = 4 } - return s } diff --git a/tsdb/tsm1/metrics.go b/tsdb/tsm1/metrics.go new file mode 100644 index 0000000000..6f64ce95cb --- /dev/null +++ b/tsdb/tsm1/metrics.go @@ -0,0 +1,84 @@ +package tsm1 + +import ( + "fmt" + "sort" + + "github.com/prometheus/client_golang/prometheus" +) + +// namespace is the leading part of all published metrics for the Storage service. +const namespace = "storage" + +const blockSubsystem = "block" // sub-system associated with metrics for block storage. 
+
+// blockMetrics are a set of metrics concerned with tracking data about block storage.
+type blockMetrics struct {
+	labels prometheus.Labels // Read only.
+
+	Compactions        *prometheus.CounterVec
+	CompactionsActive  *prometheus.GaugeVec
+	CompactionDuration *prometheus.HistogramVec
+	CompactionQueue    *prometheus.GaugeVec
+}
+
+// newBlockMetrics initialises the prometheus metrics for the block subsystem.
+func newBlockMetrics(labels prometheus.Labels) *blockMetrics {
+	compactionNames := []string{"level"} // All compaction metrics have a `level` label.
+	for k := range labels {
+		compactionNames = append(compactionNames, k)
+	}
+	sort.Strings(compactionNames)
+	totalCompactionsNames := append(append([]string(nil), compactionNames...), "status")
+	sort.Strings(totalCompactionsNames)
+
+	return &blockMetrics{
+		labels: labels,
+		Compactions: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: blockSubsystem,
+			Name:      "compactions_total",
+			Help:      "Number of times cache snapshotted or TSM compaction attempted.",
+		}, totalCompactionsNames),
+		CompactionsActive: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: blockSubsystem,
+			Name:      "compactions_active",
+			Help:      "Number of active compactions.",
+		}, compactionNames),
+		CompactionDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: blockSubsystem,
+			Name:      "compaction_duration_seconds",
+			Help:      "Time taken for a successful compaction or snapshot.",
+			// 30 buckets spaced exponentially between 5s and ~53 minutes.
+			Buckets: prometheus.ExponentialBuckets(5.0, 1.25, 30),
+		}, compactionNames),
+		CompactionQueue: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: blockSubsystem,
+			Name:      "compactions_queued",
+			Help:      "Number of queued compactions.",
+		}, compactionNames),
+	}
+}
+
+// CompactionLabels returns a copy of labels for use with compaction metrics.
+func (b *blockMetrics) CompactionLabels(level compactionLevel) prometheus.Labels { + l := make(map[string]string, len(b.labels)) + for k, v := range b.labels { + l[k] = v + } + l["level"] = fmt.Sprint(level) + return l +} + +// PrometheusCollectors satisfies the prom.PrometheusCollector interface. +func (b *blockMetrics) PrometheusCollectors() []prometheus.Collector { + return []prometheus.Collector{ + b.Compactions, + b.CompactionsActive, + b.CompactionDuration, + b.CompactionQueue, + } +} diff --git a/tsdb/tsm1/scheduler.go b/tsdb/tsm1/scheduler.go index d360afc3e7..833f98c817 100644 --- a/tsdb/tsm1/scheduler.go +++ b/tsdb/tsm1/scheduler.go @@ -1,28 +1,28 @@ package tsm1 -import ( - "sync/atomic" -) - var defaultWeights = [4]float64{0.4, 0.3, 0.2, 0.1} type scheduler struct { - maxConcurrency int - stats *EngineStatistics + maxConcurrency int + compactionTracker *compactionTracker // queues is the depth of work pending for each compaction level queues [4]int weights [4]float64 } -func newScheduler(stats *EngineStatistics, maxConcurrency int) *scheduler { +func newScheduler(maxConcurrency int) *scheduler { return &scheduler{ - stats: stats, maxConcurrency: maxConcurrency, weights: defaultWeights, } } +// setCompactionTracker sets the metrics on the scheduler. It must be called before next. 
+func (s *scheduler) setCompactionTracker(tracker *compactionTracker) { + s.compactionTracker = tracker +} + func (s *scheduler) setDepth(level, depth int) { level = level - 1 if level < 0 || level > len(s.queues) { @@ -33,10 +33,10 @@ func (s *scheduler) setDepth(level, depth int) { } func (s *scheduler) next() (int, bool) { - level1Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[0])) - level2Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[1])) - level3Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[2])) - level4Running := int(atomic.LoadInt64(&s.stats.TSMFullCompactionsActive) + atomic.LoadInt64(&s.stats.TSMOptimizeCompactionsActive)) + level1Running := int(s.compactionTracker.Active(1)) + level2Running := int(s.compactionTracker.Active(2)) + level3Running := int(s.compactionTracker.Active(3)) + level4Running := int(s.compactionTracker.ActiveFull() + s.compactionTracker.ActiveOptimise()) if level1Running+level2Running+level3Running+level4Running >= s.maxConcurrency { return 0, false