Convert TSM compaction stats to Prom metrics

This commit converts all the 1.x TSM compaction statistics, which
previously were written to an _internal db, to Prometheus metrics.
pull/10616/head
Edd Robinson 2018-10-31 17:41:56 +00:00
parent 8d546ff689
commit f56bc0853f
6 changed files with 419 additions and 186 deletions

View File

@ -271,6 +271,8 @@ func (m *Main) run(ctx context.Context) (err error) {
m.logger.Error("failed to open engine", zap.Error(err))
return err
}
// The Engine's metrics must be registered after it opens.
reg.MustRegister(engine.PrometheusCollectors()...)
pointsWriter = m.engine

View File

@ -169,6 +169,7 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector {
// TODO(edd): Get prom metrics for TSM.
// TODO(edd): Get prom metrics for index.
// TODO(edd): Get prom metrics for series file.
metrics = append(metrics, e.engine.PrometheusCollectors()...)
metrics = append(metrics, e.retentionEnforcer.PrometheusCollectors()...)
return metrics
}

126
tsdb/engine.go Normal file
View File

@ -0,0 +1,126 @@
package tsdb
import (
"context"
"errors"
"regexp"
"runtime"
"time"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/limiter"
"go.uber.org/zap"
)
var (
// ErrUnknownEngineFormat is returned when the engine format is
// unknown. ErrUnknownEngineFormat is currently returned if a format
// other than tsm1 is encountered.
ErrUnknownEngineFormat = errors.New("unknown engine format")
)
// Engine represents a swappable storage engine for the shard.
type Engine interface {
	// Lifecycle.
	Open() error
	Close() error
	SetEnabled(enabled bool)

	// Compaction control.
	SetCompactionsEnabled(enabled bool)
	ScheduleFullCompaction() error

	WithLogger(*zap.Logger)

	// Reads and writes.
	CreateCursorIterator(ctx context.Context) (CursorIterator, error)
	WritePoints(points []models.Point) error

	// Series creation and deletion.
	CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error
	CreateSeriesListIfNotExists(collection *SeriesCollection) error
	DeleteSeriesRange(itr SeriesIterator, min, max int64) error
	DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error
	SeriesN() int64

	// Measurement and tag metadata.
	MeasurementExists(name []byte) (bool, error)
	MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
	ForEachMeasurementName(fn func(name []byte) error) error
	DeleteMeasurement(name []byte) error
	HasTagKey(name, key []byte) (bool, error)
	MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
	TagKeyCardinality(name, key []byte) int

	// Engine state and maintenance.
	LastModified() time.Time
	DiskSize() int64
	IsIdle() bool
	Free() error
}
// SeriesIDSets provides access to the total set of series IDs
type SeriesIDSets interface {
ForEach(f func(ids *SeriesIDSet)) error
}
// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
	// EngineVersion selects the engine format; formats other than tsm1
	// result in ErrUnknownEngineFormat.
	EngineVersion string
	// ShardID is the ID of the shard the engine belongs to.
	ShardID uint64

	// Limits the concurrent number of TSM files that can be loaded at once.
	OpenLimiter limiter.Fixed

	// CompactionDisabled specifies shards should not schedule compactions.
	// This option is intended for offline tooling.
	CompactionDisabled bool
	// CompactionPlannerCreator, when set, constructs the compaction planner
	// from the engine config.
	CompactionPlannerCreator CompactionPlannerCreator
	// CompactionLimiter bounds the number of concurrent compactions.
	CompactionLimiter limiter.Fixed
	// CompactionThroughputLimiter rate-limits compaction I/O throughput.
	CompactionThroughputLimiter limiter.Rate
	// WALEnabled controls whether the write-ahead log is used.
	WALEnabled bool
	// MonitorDisabled disables the engine monitor when true.
	MonitorDisabled bool

	// DatabaseFilter is a predicate controlling which databases may be opened.
	// If no function is set, all databases will be opened.
	DatabaseFilter func(database string) bool
	// RetentionPolicyFilter is a predicate controlling which combination of database and retention policy may be opened.
	// nil will allow all combinations to pass.
	RetentionPolicyFilter func(database, rp string) bool
	// ShardFilter is a predicate controlling which combination of database, retention policy and shard group may be opened.
	// nil will allow all combinations to pass.
	ShardFilter func(database, rp string, id uint64) bool

	// Config holds the engine configuration.
	Config Config
	// SeriesIDSets provides access to the total set of series IDs.
	SeriesIDSets SeriesIDSets

	// OnNewEngine is a callback invoked with the newly created Engine.
	// NOTE(review): exact invocation point not visible here — confirm with callers.
	OnNewEngine func(Engine)

	// FileStoreObserver is notified before file store adds/deletes files.
	FileStoreObserver FileStoreObserver
}
// NewEngineOptions constructs an EngineOptions object with safe default
// values. This should only be used in tests; production environments should
// read from a config file.
func NewEngineOptions() EngineOptions {
	opts := EngineOptions{
		EngineVersion:     DefaultEngine,
		Config:            NewConfig(),
		WALEnabled:        false, // the WAL is off by default
		OpenLimiter:       limiter.NewFixed(runtime.GOMAXPROCS(0)),
		CompactionLimiter: limiter.NewFixed(1), // one compaction at a time
	}
	return opts
}
// NewInmemIndex returns a new "inmem" index type.
var NewInmemIndex func(name string, sfile *SeriesFile) (interface{}, error)
type CompactionPlannerCreator func(cfg Config) interface{}
// FileStoreObserver is passed notifications before the file store adds or deletes files. In this way, it can
// be sure to observe every file that is added or removed even in the presence of process death.
type FileStoreObserver interface {
	// FileFinishing is called before a file is renamed to its final name.
	FileFinishing(path string) error

	// FileUnlinking is called before a file is unlinked.
	FileUnlinking(path string) error
}

View File

@ -6,6 +6,7 @@ import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"os"
@ -26,6 +27,7 @@ import (
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/tsdb"
"github.com/influxdata/platform/tsdb/tsi1"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -35,7 +37,8 @@ import (
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@reader.gen.go.tmpldata reader.gen.go.tmpl
var ( // Static objects to prevent small allocs.
var (
// Static objects to prevent small allocs.
keyFieldSeparatorBytes = []byte(keyFieldSeparator)
emptyBytes = []byte{}
)
@ -70,44 +73,6 @@ const (
MaxPointsPerBlock = 1000
)
// Statistics gathered by the engine.
const (
statCacheCompactions = "cacheCompactions"
statCacheCompactionsActive = "cacheCompactionsActive"
statCacheCompactionError = "cacheCompactionErr"
statCacheCompactionDuration = "cacheCompactionDuration"
statTSMLevel1Compactions = "tsmLevel1Compactions"
statTSMLevel1CompactionsActive = "tsmLevel1CompactionsActive"
statTSMLevel1CompactionError = "tsmLevel1CompactionErr"
statTSMLevel1CompactionDuration = "tsmLevel1CompactionDuration"
statTSMLevel1CompactionQueue = "tsmLevel1CompactionQueue"
statTSMLevel2Compactions = "tsmLevel2Compactions"
statTSMLevel2CompactionsActive = "tsmLevel2CompactionsActive"
statTSMLevel2CompactionError = "tsmLevel2CompactionErr"
statTSMLevel2CompactionDuration = "tsmLevel2CompactionDuration"
statTSMLevel2CompactionQueue = "tsmLevel2CompactionQueue"
statTSMLevel3Compactions = "tsmLevel3Compactions"
statTSMLevel3CompactionsActive = "tsmLevel3CompactionsActive"
statTSMLevel3CompactionError = "tsmLevel3CompactionErr"
statTSMLevel3CompactionDuration = "tsmLevel3CompactionDuration"
statTSMLevel3CompactionQueue = "tsmLevel3CompactionQueue"
statTSMOptimizeCompactions = "tsmOptimizeCompactions"
statTSMOptimizeCompactionsActive = "tsmOptimizeCompactionsActive"
statTSMOptimizeCompactionError = "tsmOptimizeCompactionErr"
statTSMOptimizeCompactionDuration = "tsmOptimizeCompactionDuration"
statTSMOptimizeCompactionQueue = "tsmOptimizeCompactionQueue"
statTSMFullCompactions = "tsmFullCompactions"
statTSMFullCompactionsActive = "tsmFullCompactionsActive"
statTSMFullCompactionError = "tsmFullCompactionErr"
statTSMFullCompactionDuration = "tsmFullCompactionDuration"
statTSMFullCompactionQueue = "tsmFullCompactionQueue"
)
// An EngineOption is a functional option for changing the configuration of
// an Engine.
type EngineOption func(i *Engine)
@ -190,7 +155,9 @@ type Engine struct {
// Controls whether to enabled compactions when the engine is open
enableCompactionsOnOpen bool
stats *EngineStatistics
compactionTracker *compactionTracker // Used to track state of compactions.
blockMetrics *blockMetrics // Provides Engine metrics to external systems.
defaultMetricLabels prometheus.Labels // N.B this must not be mutated after Open is called.
// Limiter for concurrent compactions.
compactionLimiter limiter.Fixed
@ -234,7 +201,6 @@ func NewEngine(path string, idx *tsi1.Index, config Config, options ...EngineOpt
}
logger := zap.NewNop()
stats := &EngineStatistics{}
e := &Engine{
path: path,
index: idx,
@ -254,7 +220,6 @@ func NewEngine(path string, idx *tsi1.Index, config Config, options ...EngineOpt
CacheFlushWriteColdDuration: time.Duration(config.Cache.SnapshotWriteColdDuration),
enableCompactionsOnOpen: true,
formatFileName: DefaultFormatFileName,
stats: stats,
compactionLimiter: limiter.NewFixed(maxCompactions),
scheduler: newScheduler(stats, maxCompactions),
}
@ -522,81 +487,6 @@ func (e *Engine) MeasurementStats() (MeasurementStats, error) {
return e.FileStore.MeasurementStats()
}
// EngineStatistics maintains statistics for the engine.
type EngineStatistics struct {
CacheCompactions int64 // Counter of cache compactions that have ever run.
CacheCompactionsActive int64 // Gauge of cache compactions currently running.
CacheCompactionErrors int64 // Counter of cache compactions that have failed due to error.
CacheCompactionDuration int64 // Counter of number of wall nanoseconds spent in cache compactions.
TSMCompactions [3]int64 // Counter of TSM compactions (by level) that have ever run.
TSMCompactionsActive [3]int64 // Gauge of TSM compactions (by level) currently running.
TSMCompactionErrors [3]int64 // Counter of TSM compcations (by level) that have failed due to error.
TSMCompactionDuration [3]int64 // Counter of number of wall nanoseconds spent in TSM compactions (by level).
TSMCompactionsQueue [3]int64 // Gauge of TSM compactions queues (by level).
TSMOptimizeCompactions int64 // Counter of optimize compactions that have ever run.
TSMOptimizeCompactionsActive int64 // Gauge of optimize compactions currently running.
TSMOptimizeCompactionErrors int64 // Counter of optimize compactions that have failed due to error.
TSMOptimizeCompactionDuration int64 // Counter of number of wall nanoseconds spent in optimize compactions.
TSMOptimizeCompactionsQueue int64 // Gauge of optimize compactions queue.
TSMFullCompactions int64 // Counter of full compactions that have ever run.
TSMFullCompactionsActive int64 // Gauge of full compactions currently running.
TSMFullCompactionErrors int64 // Counter of full compactions that have failed due to error.
TSMFullCompactionDuration int64 // Counter of number of wall nanoseconds spent in full compactions.
TSMFullCompactionsQueue int64 // Gauge of full compactions queue.
}
// Statistics returns statistics for periodic monitoring.
func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
statistics := make([]models.Statistic, 0, 4)
statistics = append(statistics, models.Statistic{
Name: "tsm1_engine",
Tags: tags,
Values: map[string]interface{}{
statCacheCompactions: atomic.LoadInt64(&e.stats.CacheCompactions),
statCacheCompactionsActive: atomic.LoadInt64(&e.stats.CacheCompactionsActive),
statCacheCompactionError: atomic.LoadInt64(&e.stats.CacheCompactionErrors),
statCacheCompactionDuration: atomic.LoadInt64(&e.stats.CacheCompactionDuration),
statTSMLevel1Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[0]),
statTSMLevel1CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[0]),
statTSMLevel1CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[0]),
statTSMLevel1CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[0]),
statTSMLevel1CompactionQueue: atomic.LoadInt64(&e.stats.TSMCompactionsQueue[0]),
statTSMLevel2Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[1]),
statTSMLevel2CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[1]),
statTSMLevel2CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[1]),
statTSMLevel2CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[1]),
statTSMLevel2CompactionQueue: atomic.LoadInt64(&e.stats.TSMCompactionsQueue[1]),
statTSMLevel3Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[2]),
statTSMLevel3CompactionsActive: atomic.LoadInt64(&e.stats.TSMCompactionsActive[2]),
statTSMLevel3CompactionError: atomic.LoadInt64(&e.stats.TSMCompactionErrors[2]),
statTSMLevel3CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[2]),
statTSMLevel3CompactionQueue: atomic.LoadInt64(&e.stats.TSMCompactionsQueue[2]),
statTSMOptimizeCompactions: atomic.LoadInt64(&e.stats.TSMOptimizeCompactions),
statTSMOptimizeCompactionsActive: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive),
statTSMOptimizeCompactionError: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionErrors),
statTSMOptimizeCompactionDuration: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionDuration),
statTSMOptimizeCompactionQueue: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsQueue),
statTSMFullCompactions: atomic.LoadInt64(&e.stats.TSMFullCompactions),
statTSMFullCompactionsActive: atomic.LoadInt64(&e.stats.TSMFullCompactionsActive),
statTSMFullCompactionError: atomic.LoadInt64(&e.stats.TSMFullCompactionErrors),
statTSMFullCompactionDuration: atomic.LoadInt64(&e.stats.TSMFullCompactionDuration),
statTSMFullCompactionQueue: atomic.LoadInt64(&e.stats.TSMFullCompactionsQueue),
},
})
statistics = append(statistics, e.Cache.Statistics(tags)...)
statistics = append(statistics, e.FileStore.Statistics(tags)...)
return statistics
}
// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
func (e *Engine) DiskSize() int64 {
walDiskSizeBytes := e.WAL.DiskSizeBytes()
@ -605,6 +495,11 @@ func (e *Engine) DiskSize() int64 {
// Open opens and initializes the engine.
func (e *Engine) Open() error {
// Initialise metrics...
e.blockMetrics = newBlockMetrics(e.defaultMetricLabels)
e.compactionTracker = newCompactionTracker(e.blockMetrics)
e.scheduler.setCompactionTracker(e.compactionTracker)
if err := os.MkdirAll(e.path, 0777); err != nil {
return err
}
@ -649,6 +544,18 @@ func (e *Engine) Close() error {
return e.WAL.Close()
}
// PrometheusCollectors returns all the prometheus collectors associated with
// the engine and its components.
func (e *Engine) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
metrics = append(metrics, e.blockMetrics.PrometheusCollectors()...)
// TODO(edd): Add Filestore metrics
// TODO(edd): Add Cache metrics
// TODO(edd): Add WAL metrics
return metrics
}
// WithLogger sets the logger for the engine.
func (e *Engine) WithLogger(log *zap.Logger) {
e.logger = log.With(zap.String("engine", "tsm1"))
@ -668,15 +575,7 @@ func (e *Engine) WithLogger(log *zap.Logger) {
// shard is fully compacted.
func (e *Engine) IsIdle() bool {
cacheEmpty := e.Cache.Size() == 0
runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive)
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive)
runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive)
return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted()
return cacheEmpty && e.compactionTracker.AllActive() == 0 && e.CompactionPlan.FullyCompacted()
}
// Free releases any resources held by the engine to free up memory or CPU.
@ -1106,6 +1005,149 @@ func (e *Engine) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollection)
return e.index.CreateSeriesListIfNotExists(collection)
}
// WriteTo is not implemented.
func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
// compactionLevel describes a snapshot or levelled compaction.
type compactionLevel int

// String returns a human-readable name for the compaction level: "snapshot"
// for level 0, the numeric level for 1-3, "optimize" for level 4 and "full"
// for level 5. It panics on any other value.
func (l compactionLevel) String() string {
	switch l {
	case 0:
		return "snapshot"
	case 4:
		return "optimize"
	case 5:
		return "full"
	case 1, 2, 3:
		return fmt.Sprint(int(l))
	}
	panic("unsupported compaction level")
}
// compactionTracker tracks compactions and snapshots within the Engine.
//
// As well as being responsible for providing atomic reads and writes to the
// statistics tracking the various compaction operations, compactionTracker also
// mirrors any writes to the prometheus block metrics, which the Engine exposes.
//
// *NOTE* - compactionTracker fields should not be directly modified. Doing so
// could result in the Engine exposing inaccurate metrics.
type compactionTracker struct {
	metrics *blockMetrics // Prometheus metrics mirrored on every write.

	// Note: Compactions are levelled as follows:
	// 0   Snapshots
	// 1-3 Levelled compactions
	// 4   Optimize compactions
	// 5   Full compactions
	ok     [6]uint64 // Counter of TSM compactions (by level) that have successfully completed.
	active [6]uint64 // Gauge of TSM compactions (by level) currently running.
	errors [6]uint64 // Counter of TSM compactions (by level) that have failed due to error.
	queue  [6]uint64 // Gauge of TSM compactions queues (by level).
}

// newCompactionTracker returns a tracker that mirrors its updates to the
// provided block metrics.
func newCompactionTracker(blockMetrics *blockMetrics) *compactionTracker {
	return &compactionTracker{metrics: blockMetrics}
}
// Completed returns the total number of compactions that have successfully
// completed for the provided level.
func (t *compactionTracker) Completed(level int) uint64 {
	return atomic.LoadUint64(&t.ok[level])
}

// Active returns the number of active snapshots (level 0),
// level 1, 2 or 3 compactions, optimize compactions (level 4), or full
// compactions (level 5).
func (t *compactionTracker) Active(level int) uint64 {
	return atomic.LoadUint64(&t.active[level])
}

// AllActive returns the number of active snapshots and compactions across
// all levels.
func (t *compactionTracker) AllActive() uint64 {
	var total uint64
	for i := range t.active {
		total += atomic.LoadUint64(&t.active[i])
	}
	return total
}

// ActiveOptimise returns the number of active Optimise compactions.
// It is shorthand for Active(4).
func (t *compactionTracker) ActiveOptimise() uint64 { return t.Active(4) }

// ActiveFull returns the number of active Full compactions.
// It is shorthand for Active(5).
func (t *compactionTracker) ActiveFull() uint64 { return t.Active(5) }

// Errors returns the total number of errors encountered attempting compactions
// for the provided level.
func (t *compactionTracker) Errors(level int) uint64 {
	return atomic.LoadUint64(&t.errors[level])
}
// IncActive increments the number of active compactions for the provided
// level, mirroring the change to the prometheus active-compactions gauge.
func (t *compactionTracker) IncActive(level compactionLevel) {
	atomic.AddUint64(&t.active[level], 1)
	labels := t.metrics.CompactionLabels(level)
	t.metrics.CompactionsActive.With(labels).Inc()
}

// IncFullActive increments the number of active Full compactions (level 5).
func (t *compactionTracker) IncFullActive() { t.IncActive(5) }

// DecActive decrements the number of active compactions for the provided
// level, mirroring the change to the prometheus active-compactions gauge.
func (t *compactionTracker) DecActive(level compactionLevel) {
	// Adding ^uint64(0) (all bits set, i.e. -1 in two's complement) is the
	// standard way to atomically decrement an unsigned counter.
	atomic.AddUint64(&t.active[level], ^uint64(0))
	labels := t.metrics.CompactionLabels(level)
	t.metrics.CompactionsActive.With(labels).Dec()
}

// DecFullActive decrements the number of active Full compactions (level 5).
func (t *compactionTracker) DecFullActive() { t.DecActive(5) }
// Attempted updates the number of compactions attempted for the provided
// level, incrementing either the success or error counter and mirroring the
// outcome to the prometheus compactions counter. The duration is only
// observed in the duration histogram for successful compactions.
func (t *compactionTracker) Attempted(level compactionLevel, success bool, duration time.Duration) {
	labels := t.metrics.CompactionLabels(level)

	if !success {
		atomic.AddUint64(&t.errors[level], 1)
		labels["status"] = "error"
		t.metrics.Compactions.With(labels).Inc()
		return
	}

	atomic.AddUint64(&t.ok[level], 1)
	// Observe the duration before "status" is added: the duration histogram
	// is not partitioned by status.
	t.metrics.CompactionDuration.With(labels).Observe(duration.Seconds())
	labels["status"] = "ok"
	t.metrics.Compactions.With(labels).Inc()
}

// SnapshotAttempted updates the number of snapshots (level 0) attempted.
func (t *compactionTracker) SnapshotAttempted(success bool, duration time.Duration) {
	t.Attempted(0, success, duration)
}
// SetQueue sets the compaction queue depth for the provided level, mirroring
// the value to the prometheus queue gauge.
func (t *compactionTracker) SetQueue(level compactionLevel, length uint64) {
	atomic.StoreUint64(&t.queue[level], length)
	labels := t.metrics.CompactionLabels(level)
	t.metrics.CompactionQueue.With(labels).Set(float64(length))
}

// SetOptimiseQueue sets the queue depth for Optimisation compactions (level 4).
func (t *compactionTracker) SetOptimiseQueue(length uint64) { t.SetQueue(4, length) }

// SetFullQueue sets the queue depth for Full compactions (level 5).
func (t *compactionTracker) SetFullQueue(length uint64) { t.SetQueue(5, length) }
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
func (e *Engine) WriteSnapshot() error {
// Lock and grab the cache snapshot along with all the closed WAL
@ -1216,11 +1258,8 @@ func (e *Engine) compactCache() {
err := e.WriteSnapshot()
if err != nil && err != errCompactionsDisabled {
e.logger.Info("Error writing snapshot", zap.Error(err))
atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
} else {
atomic.AddInt64(&e.stats.CacheCompactions, 1)
}
atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())
e.compactionTracker.SnapshotAttempted(err == nil || err == errCompactionsDisabled, time.Since(start))
}
}
}
@ -1262,18 +1301,18 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
level2Groups := e.CompactionPlan.PlanLevel(2)
level3Groups := e.CompactionPlan.PlanLevel(3)
level4Groups := e.CompactionPlan.Plan(e.FileStore.LastModified())
atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))
e.compactionTracker.SetOptimiseQueue(uint64(len(level4Groups)))
// If no full compactions are need, see if an optimize is needed
if len(level4Groups) == 0 {
level4Groups = e.CompactionPlan.PlanOptimize()
atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))
e.compactionTracker.SetOptimiseQueue(uint64(len(level4Groups)))
}
// Update the level plan queue stats
atomic.StoreInt64(&e.stats.TSMCompactionsQueue[0], int64(len(level1Groups)))
atomic.StoreInt64(&e.stats.TSMCompactionsQueue[1], int64(len(level2Groups)))
atomic.StoreInt64(&e.stats.TSMCompactionsQueue[2], int64(len(level3Groups)))
e.compactionTracker.SetQueue(1, uint64(len(level1Groups)))
e.compactionTracker.SetQueue(2, uint64(len(level2Groups)))
e.compactionTracker.SetQueue(3, uint64(len(level3Groups)))
// Set the queue depths on the scheduler
e.scheduler.setDepth(1, len(level1Groups))
@ -1314,7 +1353,7 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
// compactHiPriorityLevel kicks off compactions using the high priority policy. It returns
// true if the compaction was started
func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool, wg *sync.WaitGroup) bool {
func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level compactionLevel, fast bool, wg *sync.WaitGroup) bool {
s := e.levelCompactionStrategy(grp, fast, level)
if s == nil {
return false
@ -1322,13 +1361,12 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast boo
// Try hi priority limiter, otherwise steal a little from the low priority if we can.
if e.compactionLimiter.TryTake() {
atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1)
e.compactionTracker.IncActive(level)
wg.Add(1)
go func() {
defer wg.Done()
defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
defer e.compactionTracker.DecActive(level)
defer e.compactionLimiter.Release()
s.Apply()
// Release the files in the compaction plan
@ -1343,7 +1381,7 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast boo
// compactLoPriorityLevel kicks off compactions using the lo priority policy. It returns
// the plans that were not able to be started
func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool, wg *sync.WaitGroup) bool {
func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level compactionLevel, fast bool, wg *sync.WaitGroup) bool {
s := e.levelCompactionStrategy(grp, fast, level)
if s == nil {
return false
@ -1351,11 +1389,11 @@ func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast boo
// Try the lo priority limiter, otherwise steal a little from the high priority if we can.
if e.compactionLimiter.TryTake() {
atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1)
e.compactionTracker.IncActive(level)
wg.Add(1)
go func() {
defer wg.Done()
defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
defer e.compactionTracker.DecActive(level)
defer e.compactionLimiter.Release()
s.Apply()
// Release the files in the compaction plan
@ -1376,11 +1414,11 @@ func (e *Engine) compactFull(grp CompactionGroup, wg *sync.WaitGroup) bool {
// Try the lo priority limiter, otherwise steal a little from the high priority if we can.
if e.compactionLimiter.TryTake() {
atomic.AddInt64(&e.stats.TSMFullCompactionsActive, 1)
e.compactionTracker.IncFullActive()
wg.Add(1)
go func() {
defer wg.Done()
defer atomic.AddInt64(&e.stats.TSMFullCompactionsActive, -1)
defer e.compactionTracker.DecFullActive()
defer e.compactionLimiter.Release()
s.Apply()
// Release the files in the compaction plan
@ -1396,12 +1434,9 @@ type compactionStrategy struct {
group CompactionGroup
fast bool
level int
level compactionLevel
durationStat *int64
activeStat *int64
successStat *int64
errorStat *int64
tracker *compactionTracker
logger *zap.Logger
compactor *Compactor
@ -1412,13 +1447,12 @@ type compactionStrategy struct {
// Apply concurrently compacts all the groups in a compaction strategy.
func (s *compactionStrategy) Apply() {
start := time.Now()
s.compactGroup()
atomic.AddInt64(s.durationStat, time.Since(start).Nanoseconds())
}
// compactGroup executes the compaction strategy against a single CompactionGroup.
func (s *compactionStrategy) compactGroup() {
now := time.Now()
group := s.group
log, logEnd := logger.NewOperation(s.logger, "TSM compaction", "tsm1_compact_group")
defer logEnd()
@ -1451,14 +1485,14 @@ func (s *compactionStrategy) compactGroup() {
}
log.Info("Error compacting TSM files", zap.Error(err))
atomic.AddInt64(s.errorStat, 1)
s.tracker.Attempted(s.level, false, 0)
time.Sleep(time.Second)
return
}
if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil {
log.Info("Error replacing new TSM files", zap.Error(err))
atomic.AddInt64(s.errorStat, 1)
s.tracker.Attempted(s.level, false, 0)
time.Sleep(time.Second)
return
}
@ -1466,27 +1500,22 @@ func (s *compactionStrategy) compactGroup() {
for i, f := range files {
log.Info("Compacted file", zap.Int("tsm1_index", i), zap.String("tsm1_file", f))
}
log.Info("Finished compacting files",
zap.Int("tsm1_files_n", len(files)))
atomic.AddInt64(s.successStat, 1)
log.Info("Finished compacting files", zap.Int("tsm1_files_n", len(files)))
s.tracker.Attempted(s.level, true, time.Since(now))
}
// levelCompactionStrategy returns a compactionStrategy for the given level.
// It returns nil if there are no TSM files to compact.
func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy {
func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level compactionLevel) *compactionStrategy {
return &compactionStrategy{
group: group,
logger: e.logger.With(zap.Int("tsm1_level", level), zap.String("tsm1_strategy", "level")),
logger: e.logger.With(zap.Int("tsm1_level", int(level)), zap.String("tsm1_strategy", "level")),
fileStore: e.FileStore,
compactor: e.Compactor,
fast: fast,
engine: e,
level: level,
activeStat: &e.stats.TSMCompactionsActive[level-1],
successStat: &e.stats.TSMCompactions[level-1],
errorStat: &e.stats.TSMCompactionErrors[level-1],
durationStat: &e.stats.TSMCompactionDuration[level-1],
tracker: e.compactionTracker,
}
}
@ -1500,21 +1529,12 @@ func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *c
compactor: e.Compactor,
fast: optimize,
engine: e,
level: 4,
level: 5,
}
if optimize {
s.activeStat = &e.stats.TSMOptimizeCompactionsActive
s.successStat = &e.stats.TSMOptimizeCompactions
s.errorStat = &e.stats.TSMOptimizeCompactionErrors
s.durationStat = &e.stats.TSMOptimizeCompactionDuration
} else {
s.activeStat = &e.stats.TSMFullCompactionsActive
s.successStat = &e.stats.TSMFullCompactions
s.errorStat = &e.stats.TSMFullCompactionErrors
s.durationStat = &e.stats.TSMFullCompactionDuration
s.level = 4
}
return s
}

84
tsdb/tsm1/metrics.go Normal file
View File

@ -0,0 +1,84 @@
package tsm1
import (
"fmt"
"sort"
"github.com/prometheus/client_golang/prometheus"
)
// namespace is the leading part of all published metrics for the Storage service.
const namespace = "storage"
const blockSubsystem = "block" // sub-system associated with metrics for block storage.
// blockMetrics are a set of metrics concerned with tracking data about block storage.
type blockMetrics struct {
	labels prometheus.Labels // Read only. Static labels applied to every metric.

	// Compactions counts compaction/snapshot attempts, partitioned by level and status (ok/error).
	Compactions *prometheus.CounterVec
	// CompactionsActive gauges currently running compactions, partitioned by level.
	CompactionsActive *prometheus.GaugeVec
	// CompactionDuration observes wall-clock time of successful compactions, partitioned by level.
	CompactionDuration *prometheus.HistogramVec
	// CompactionQueue gauges queued compactions, partitioned by level.
	CompactionQueue *prometheus.GaugeVec
}
// newBlockMetrics initialises the prometheus metrics for the block subsystem.
//
// Every compaction metric carries a `level` label plus whatever static labels
// the caller provides; the total-compactions counter additionally carries a
// `status` label ("ok" or "error").
func newBlockMetrics(labels prometheus.Labels) *blockMetrics {
	compactionNames := make([]string, 0, len(labels)+1)
	compactionNames = append(compactionNames, "level") // All compaction metrics have a `level` label.
	for k := range labels {
		compactionNames = append(compactionNames, k)
	}
	sort.Strings(compactionNames)

	// Copy before appending "status": a plain append could share the backing
	// array with compactionNames, and the subsequent sort would then reorder
	// compactionNames' elements through the alias.
	totalCompactionsNames := append(append([]string(nil), compactionNames...), "status")
	sort.Strings(totalCompactionsNames)

	return &blockMetrics{
		labels: labels,
		Compactions: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: blockSubsystem,
			Name:      "compactions_total",
			Help:      "Number of times cache snapshotted or TSM compaction attempted.",
		}, totalCompactionsNames),
		CompactionsActive: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: blockSubsystem,
			Name:      "compactions_active",
			Help:      "Number of active compactions.",
		}, compactionNames),
		CompactionDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: blockSubsystem,
			Name:      "compaction_duration_seconds",
			Help:      "Time taken for a successful compaction or snapshot.",
			// 30 buckets spaced exponentially between 5s and ~53 minutes.
			Buckets: prometheus.ExponentialBuckets(5.0, 1.25, 30),
		}, compactionNames),
		CompactionQueue: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: blockSubsystem,
			Name:      "compactions_queued",
			Help:      "Number of queued compactions.",
		}, compactionNames),
	}
}
// CompactionLabels returns a copy of the base labels with a `level` label
// added, suitable for use with the compaction metric vectors. The receiver's
// label map is never mutated.
func (b *blockMetrics) CompactionLabels(level compactionLevel) prometheus.Labels {
	labels := make(prometheus.Labels, len(b.labels)+1)
	for name, value := range b.labels {
		labels[name] = value
	}
	labels["level"] = fmt.Sprint(level) // uses compactionLevel.String, e.g. "snapshot", "1", "full"
	return labels
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface,
// returning every collector owned by the block metrics.
func (b *blockMetrics) PrometheusCollectors() []prometheus.Collector {
	collectors := make([]prometheus.Collector, 0, 4)
	collectors = append(collectors,
		b.Compactions,
		b.CompactionsActive,
		b.CompactionDuration,
		b.CompactionQueue,
	)
	return collectors
}

View File

@ -1,28 +1,28 @@
package tsm1
import (
"sync/atomic"
)
var defaultWeights = [4]float64{0.4, 0.3, 0.2, 0.1}
type scheduler struct {
maxConcurrency int
stats *EngineStatistics
maxConcurrency int
compactionTracker *compactionTracker
// queues is the depth of work pending for each compaction level
queues [4]int
weights [4]float64
}
func newScheduler(stats *EngineStatistics, maxConcurrency int) *scheduler {
func newScheduler(maxConcurrency int) *scheduler {
return &scheduler{
stats: stats,
maxConcurrency: maxConcurrency,
weights: defaultWeights,
}
}
// setCompactionTracker sets the metrics on the scheduler. It must be called before next.
func (s *scheduler) setCompactionTracker(tracker *compactionTracker) {
s.compactionTracker = tracker
}
func (s *scheduler) setDepth(level, depth int) {
level = level - 1
if level < 0 || level > len(s.queues) {
@ -33,10 +33,10 @@ func (s *scheduler) setDepth(level, depth int) {
}
func (s *scheduler) next() (int, bool) {
level1Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[0]))
level2Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[1]))
level3Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[2]))
level4Running := int(atomic.LoadInt64(&s.stats.TSMFullCompactionsActive) + atomic.LoadInt64(&s.stats.TSMOptimizeCompactionsActive))
level1Running := int(s.compactionTracker.Active(1))
level2Running := int(s.compactionTracker.Active(2))
level3Running := int(s.compactionTracker.Active(3))
level4Running := int(s.compactionTracker.ActiveFull() + s.compactionTracker.ActiveOptimise())
if level1Running+level2Running+level3Running+level4Running >= s.maxConcurrency {
return 0, false