diff --git a/cmd/influxd/launcher/remote_to_test.go b/cmd/influxd/launcher/remote_to_test.go
index 38846a5708..9ea54cddb1 100644
--- a/cmd/influxd/launcher/remote_to_test.go
+++ b/cmd/influxd/launcher/remote_to_test.go
@@ -35,7 +35,6 @@ array.from(rows)
     |> to(bucket: "%s", host: "%s", token: "%s", org: "%s")
 `, l2.Bucket.Name, l2.URL().String(), l2.Auth.Token, l2.Org.Name)
     _ = l1.FluxQueryOrFail(t, l1.Org, l1.Auth.Token, q1)
-
     // Query the 2nd server and check that the points landed.
     q2 := fmt.Sprintf(`from(bucket:"%s")
     |> range(start: 2030-01-01T00:00:00Z, stop: 2030-01-02T00:00:00Z)
@@ -88,7 +87,6 @@ union(tables: [testTable, test2Table])
     |> group(columns: ["_measurement"])
 `, l2.Bucket.Name, l2.URL().String(), l2.Auth.Token, l2.Org.Name)
     _ = l1.FluxQueryOrFail(t, l1.Org, l1.Auth.Token, q1)
-
     // Query the 2nd server and check that the points landed.
     q2 := fmt.Sprintf(`from(bucket:"%s")
     |> range(start: 2030-01-01T00:00:00Z, stop: 2030-01-02T00:00:00Z)
diff --git a/models/statistic.go b/models/statistic.go
index 553e9d09fb..9107d9025a 100644
--- a/models/statistic.go
+++ b/models/statistic.go
@@ -7,15 +7,6 @@ type Statistic struct {
     Values map[string]interface{} `json:"values"`
 }
 
-// NewStatistic returns an initialized Statistic.
-func NewStatistic(name string) Statistic {
-    return Statistic{
-        Name:   name,
-        Tags:   make(map[string]string),
-        Values: make(map[string]interface{}),
-    }
-}
-
 // StatisticTags is a map that can be merged with others without causing
 // mutations to either map.
 type StatisticTags map[string]string
diff --git a/query/stdlib/influxdata/influxdb/rules_test.go b/query/stdlib/influxdata/influxdb/rules_test.go
index 0d99eb98bc..78bb87a9b1 100644
--- a/query/stdlib/influxdata/influxdb/rules_test.go
+++ b/query/stdlib/influxdata/influxdb/rules_test.go
@@ -492,7 +492,7 @@ func TestPushDownFilterRule(t *testing.T) {
 }
 
 func TestPushDownGroupRule(t *testing.T) {
-    createRangeSpec := func() *influxdb.ReadRangePhysSpec{
+    createRangeSpec := func() *influxdb.ReadRangePhysSpec {
         return &influxdb.ReadRangePhysSpec{
             Bucket: "my-bucket",
             Bounds: flux.Bounds{
@@ -2363,9 +2363,9 @@ func TestTransposeGroupToWindowAggregateRule(t *testing.T) {
                 plan.CreateLogicalNode("group", group(flux.GroupModeBy, "host")),
                 plan.CreateLogicalNode("window", &universe.WindowProcedureSpec{
                     Window: plan.WindowSpec{
-                        Every:  dur2m,
-                        Period: dur2m,
-                        Offset: dur1m,
+                        Every:    dur2m,
+                        Period:   dur2m,
+                        Offset:   dur1m,
                         Location: plan.Location{
                             Name: "UTC",
                         },
diff --git a/storage/engine.go b/storage/engine.go
index 4e7e627a76..51b4316190 100644
--- a/storage/engine.go
+++ b/storage/engine.go
@@ -16,6 +16,7 @@ import (
     "github.com/influxdata/influxdb/v2/models"
     "github.com/influxdata/influxdb/v2/tsdb"
     _ "github.com/influxdata/influxdb/v2/tsdb/engine"
+    "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
     _ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
     "github.com/influxdata/influxdb/v2/v1/coordinator"
     "github.com/influxdata/influxdb/v2/v1/services/meta"
@@ -163,7 +164,9 @@ func (e *Engine) WithLogger(log *zap.Logger) {
 // PrometheusCollectors returns all the prometheus collectors associated with
 // the engine and its components.
 func (e *Engine) PrometheusCollectors() []prometheus.Collector {
-    return nil
+    var metrics []prometheus.Collector
+    metrics = append(metrics, tsm1.PrometheusCollectors()...)
+    return metrics
 }
 
 // Open opens the store and all underlying resources. It returns an error if
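Note: the PrometheusCollectors hook above is the only wiring needed for the new tsm1 metrics to reach an exporter. A minimal sketch of how a caller might consume it, assuming a plain client_golang registry; the helper name and listen address are illustrative, not part of this change:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// serveMetrics registers an arbitrary collector slice (for example the
// result of engine.PrometheusCollectors()) on a fresh registry and
// exposes it over HTTP.
func serveMetrics(collectors []prometheus.Collector) error {
	reg := prometheus.NewRegistry()
	for _, c := range collectors {
		if err := reg.Register(c); err != nil {
			return err // e.g. duplicate registration
		}
	}
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	return http.ListenAndServe(":2112", nil)
}

func main() {
	_ = serveMetrics(nil) // pass engine.PrometheusCollectors() in real use
}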
diff --git a/task/backend/coordinator/coordinator.go b/task/backend/coordinator/coordinator.go
index 77ba83ecee..ee5fe6a2b5 100644
--- a/task/backend/coordinator/coordinator.go
+++ b/task/backend/coordinator/coordinator.go
@@ -5,13 +5,12 @@ import (
     "errors"
     "time"
 
-    "go.uber.org/zap"
-
     "github.com/influxdata/influxdb/v2/kit/platform"
     "github.com/influxdata/influxdb/v2/task/backend/executor"
     "github.com/influxdata/influxdb/v2/task/backend/middleware"
     "github.com/influxdata/influxdb/v2/task/backend/scheduler"
     "github.com/influxdata/influxdb/v2/task/taskmodel"
+    "go.uber.org/zap"
 )
 
 var _ middleware.Coordinator = (*Coordinator)(nil)
diff --git a/task/backend/coordinator/coordinator_test.go b/task/backend/coordinator/coordinator_test.go
index f2e2804883..712ebac675 100644
--- a/task/backend/coordinator/coordinator_test.go
+++ b/task/backend/coordinator/coordinator_test.go
@@ -8,11 +8,10 @@ import (
 
     "github.com/google/go-cmp/cmp"
     "github.com/google/go-cmp/cmp/cmpopts"
-    "go.uber.org/zap/zaptest"
-
     "github.com/influxdata/influxdb/v2/kit/platform"
     "github.com/influxdata/influxdb/v2/task/backend/scheduler"
     "github.com/influxdata/influxdb/v2/task/taskmodel"
+    "go.uber.org/zap/zaptest"
 )
 
 func Test_Coordinator_Executor_Methods(t *testing.T) {
diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go
index 666a719671..56307b62e8 100644
--- a/task/backend/executor/executor.go
+++ b/task/backend/executor/executor.go
@@ -11,8 +11,6 @@ import (
     "github.com/influxdata/flux/ast"
     "github.com/influxdata/flux/lang"
     "github.com/influxdata/flux/runtime"
-    "go.uber.org/zap"
-
     "github.com/influxdata/influxdb/v2"
     icontext "github.com/influxdata/influxdb/v2/context"
     "github.com/influxdata/influxdb/v2/kit/feature"
@@ -22,6 +20,7 @@ import (
     "github.com/influxdata/influxdb/v2/task/backend"
     "github.com/influxdata/influxdb/v2/task/backend/scheduler"
     "github.com/influxdata/influxdb/v2/task/taskmodel"
+    "go.uber.org/zap"
 )
 
 const (
diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go
index cb33ca61f6..2c62e1f19c 100644
--- a/task/backend/executor/executor_test.go
+++ b/task/backend/executor/executor_test.go
@@ -12,12 +12,6 @@ import (
 
     "github.com/golang/mock/gomock"
     "github.com/influxdata/flux"
-    "github.com/opentracing/opentracing-go"
-    "github.com/stretchr/testify/assert"
-    "github.com/stretchr/testify/require"
-    "github.com/uber/jaeger-client-go"
-    "go.uber.org/zap/zaptest"
-
     "github.com/influxdata/influxdb/v2"
     "github.com/influxdata/influxdb/v2/authorization"
     icontext "github.com/influxdata/influxdb/v2/context"
@@ -35,6 +29,11 @@ import (
     "github.com/influxdata/influxdb/v2/task/backend/scheduler"
     "github.com/influxdata/influxdb/v2/task/taskmodel"
     "github.com/influxdata/influxdb/v2/tenant"
+    "github.com/opentracing/opentracing-go"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+    "github.com/uber/jaeger-client-go"
+    "go.uber.org/zap/zaptest"
 )
 
 func TestMain(m *testing.M) {
diff --git a/tsdb/engine.go b/tsdb/engine.go
index 4ac396a9dc..004e1d362d 100644
--- a/tsdb/engine.go
+++ b/tsdb/engine.go
@@ -20,9 +20,6 @@ import (
 )
 
 var (
-    // ErrFormatNotFound is returned when no format can be determined from a path.
-    ErrFormatNotFound = errors.New("format not found")
-
     // ErrUnknownEngineFormat is returned when the engine format is
     // unknown. ErrUnknownEngineFormat is currently returned if a format
     // other than tsm1 is encountered.
@@ -74,8 +71,6 @@ type Engine interface {
     MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
     TagKeyCardinality(name, key []byte) int
 
-    // Statistics will return statistics relevant to this engine.
-    Statistics(tags map[string]string) []models.Statistic
     LastModified() time.Time
     DiskSize() int64
     IsIdle() (bool, string)
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index bc99ae3c39..a09494f186 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -34,6 +34,7 @@ import (
     _ "github.com/influxdata/influxdb/v2/tsdb/index"
     "github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
     "github.com/influxdata/influxql"
+    "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
 )
 
@@ -70,19 +71,6 @@ var (
     planningTimer = metrics.MustRegisterTimer("planning_time", metrics.WithGroup(tsmGroup))
 )
 
-// NewContextWithMetricsGroup creates a new context with a tsm1 metrics.Group for tracking
-// various metrics when accessing TSM data.
-func NewContextWithMetricsGroup(ctx context.Context) context.Context {
-    group := metrics.NewGroup(tsmGroup)
-    return metrics.NewContextWithGroup(ctx, group)
-}
-
-// MetricsGroupFromContext returns the tsm1 metrics.Group associated with the context
-// or nil if no group has been assigned.
-func MetricsGroupFromContext(ctx context.Context) *metrics.Group {
-    return metrics.GroupFromContext(ctx)
-}
-
 const (
     // keyFieldSeparator separates the series key from the field name in the composite key
     // that identifies a specific field in series
@@ -92,44 +80,6 @@ const (
     deleteFlushThreshold = 50 * 1024 * 1024
 )
 
-// Statistics gathered by the engine.
-const (
-    statCacheCompactions        = "cacheCompactions"
-    statCacheCompactionsActive  = "cacheCompactionsActive"
-    statCacheCompactionError    = "cacheCompactionErr"
-    statCacheCompactionDuration = "cacheCompactionDuration"
-
-    statTSMLevel1Compactions        = "tsmLevel1Compactions"
-    statTSMLevel1CompactionsActive  = "tsmLevel1CompactionsActive"
-    statTSMLevel1CompactionError    = "tsmLevel1CompactionErr"
-    statTSMLevel1CompactionDuration = "tsmLevel1CompactionDuration"
-    statTSMLevel1CompactionQueue    = "tsmLevel1CompactionQueue"
-
-    statTSMLevel2Compactions        = "tsmLevel2Compactions"
-    statTSMLevel2CompactionsActive  = "tsmLevel2CompactionsActive"
-    statTSMLevel2CompactionError    = "tsmLevel2CompactionErr"
-    statTSMLevel2CompactionDuration = "tsmLevel2CompactionDuration"
-    statTSMLevel2CompactionQueue    = "tsmLevel2CompactionQueue"
-
-    statTSMLevel3Compactions        = "tsmLevel3Compactions"
-    statTSMLevel3CompactionsActive  = "tsmLevel3CompactionsActive"
-    statTSMLevel3CompactionError    = "tsmLevel3CompactionErr"
-    statTSMLevel3CompactionDuration = "tsmLevel3CompactionDuration"
-    statTSMLevel3CompactionQueue    = "tsmLevel3CompactionQueue"
-
-    statTSMOptimizeCompactions        = "tsmOptimizeCompactions"
-    statTSMOptimizeCompactionsActive  = "tsmOptimizeCompactionsActive"
-    statTSMOptimizeCompactionError    = "tsmOptimizeCompactionErr"
-    statTSMOptimizeCompactionDuration = "tsmOptimizeCompactionDuration"
-    statTSMOptimizeCompactionQueue    = "tsmOptimizeCompactionQueue"
-
-    statTSMFullCompactions        = "tsmFullCompactions"
-    statTSMFullCompactionsActive  = "tsmFullCompactionsActive"
-    statTSMFullCompactionError    = "tsmFullCompactionErr"
-    statTSMFullCompactionDuration = "tsmFullCompactionDuration"
-    statTSMFullCompactionQueue    = "tsmFullCompactionQueue"
-)
-
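Note: the deleted constant block above encoded the compaction level in each metric's name. The Prometheus model replaces that whole family with one labeled vector per measurement, where the level is a label value rather than part of the name. A standalone sketch of the idea (names mirror the diff, but the wiring here is illustrative):

package main

import "github.com/prometheus/client_golang/prometheus"

// One labeled vector covers what previously needed five constants
// (statTSMLevel1CompactionErr through statTSMFullCompactionErr).
var compactionsFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "storage",
	Subsystem: "compactions",
	Name:      "failed",
	Help:      "Counter of TSM compactions (by level) that have failed due to error",
}, []string{"level"})

func main() {
	// Each level selects its own child series at record time.
	compactionsFailed.With(prometheus.Labels{"level": "1"}).Inc()
}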
 // Engine represents a storage engine with compressed blocks.
 type Engine struct {
     mu sync.RWMutex
@@ -187,7 +137,9 @@ type Engine struct {
     // Controls whether to enabled compactions when the engine is open
     enableCompactionsOnOpen bool
 
-    stats *EngineStatistics
+    stats *compactionMetrics
+
+    activeCompactions *compactionCounter
 
     // Limiter for concurrent compactions.
     compactionLimiter limiter.Fixed
@@ -232,7 +184,14 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
         planner.SetFileStore(fs)
     }
 
-    stats := &EngineStatistics{}
+    stats := newEngineMetrics(engineTags{
+        path:          path,
+        walPath:       walPath,
+        id:            fmt.Sprintf("%d", id),
+        bucket:        filepath.Base(filepath.Dir(filepath.Dir(path))), // discard shard & rp, take db
+        engineVersion: opt.EngineVersion,
+    })
+    activeCompactions := &compactionCounter{}
     e := &Engine{
         id:   id,
         path: path,
@@ -249,6 +208,9 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
         Compactor:      c,
         CompactionPlan: planner,
 
+        activeCompactions: activeCompactions,
+        scheduler:         newScheduler(activeCompactions, opt.CompactionLimiter.Capacity()),
+
         CacheFlushMemorySizeThreshold: uint64(opt.Config.CacheSnapshotMemorySize),
         CacheFlushWriteColdDuration:   time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
         enableCompactionsOnOpen:       true,
@@ -256,7 +218,6 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
         formatFileName:    DefaultFormatFileName,
         stats:             stats,
         compactionLimiter: opt.CompactionLimiter,
-        scheduler:         newScheduler(stats, opt.CompactionLimiter.Capacity()),
         seriesIDSets:      opt.SeriesIDSets,
     }
 
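Note: the bucket label in newEngineMetrics is derived purely from the shard path, on the assumption that shards live at .../data/<db>/<rp>/<shard-id>. A standalone sketch of that derivation (the path is made up for illustration):

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// A shard path has the shape .../data/<db>/<rp>/<shard-id>.
	path := "/var/lib/influxdb/engine/data/telegraf/autogen/42"
	// Dir twice strips the shard id and retention policy;
	// Base then yields the database (bucket) segment.
	bucket := filepath.Base(filepath.Dir(filepath.Dir(path)))
	fmt.Println(bucket) // telegraf
}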
@@ -627,82 +588,132 @@ func (e *Engine) LastModified() time.Time {
     return fsTime
 }
 
-// EngineStatistics maintains statistics for the engine.
-type EngineStatistics struct {
-    CacheCompactions        int64 // Counter of cache compactions that have ever run.
-    CacheCompactionsActive  int64 // Gauge of cache compactions currently running.
-    CacheCompactionErrors   int64 // Counter of cache compactions that have failed due to error.
-    CacheCompactionDuration int64 // Counter of number of wall nanoseconds spent in cache compactions.
+var globalCompactionMetrics *compactionMetrics = newAllCompactionMetrics(engineLabelNames())
 
-    TSMCompactions        [3]int64 // Counter of TSM compactions (by level) that have ever run.
-    TSMCompactionsActive  [3]int64 // Gauge of TSM compactions (by level) currently running.
-    TSMCompactionErrors   [3]int64 // Counter of TSM compcations (by level) that have failed due to error.
-    TSMCompactionDuration [3]int64 // Counter of number of wall nanoseconds spent in TSM compactions (by level).
-    TSMCompactionsQueue   [3]int64 // Gauge of TSM compactions queues (by level).
-
-    TSMOptimizeCompactions        int64 // Counter of optimize compactions that have ever run.
-    TSMOptimizeCompactionsActive  int64 // Gauge of optimize compactions currently running.
-    TSMOptimizeCompactionErrors   int64 // Counter of optimize compactions that have failed due to error.
-    TSMOptimizeCompactionDuration int64 // Counter of number of wall nanoseconds spent in optimize compactions.
-    TSMOptimizeCompactionsQueue   int64 // Gauge of optimize compactions queue.
-
-    TSMFullCompactions        int64 // Counter of full compactions that have ever run.
-    TSMFullCompactionsActive  int64 // Gauge of full compactions currently running.
-    TSMFullCompactionErrors   int64 // Counter of full compactions that have failed due to error.
-    TSMFullCompactionDuration int64 // Counter of number of wall nanoseconds spent in full compactions.
-    TSMFullCompactionsQueue   int64 // Gauge of full compactions queue.
+// PrometheusCollectors returns all prometheus metrics for the tsm1 package.
+func PrometheusCollectors() []prometheus.Collector {
+    return []prometheus.Collector{
+        globalCompactionMetrics.Duration,
+        globalCompactionMetrics.Active,
+        globalCompactionMetrics.Failed,
+        globalCompactionMetrics.Queued,
+    }
 }
 
-// Statistics returns statistics for periodic monitoring.
-func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
-    statistics := make([]models.Statistic, 0, 4)
-    statistics = append(statistics, models.Statistic{
-        Name: "tsm1_engine",
-        Tags: tags,
-        Values: map[string]interface{}{
-            statCacheCompactions:        atomic.LoadInt64(&e.stats.CacheCompactions),
-            statCacheCompactionsActive:  atomic.LoadInt64(&e.stats.CacheCompactionsActive),
-            statCacheCompactionError:    atomic.LoadInt64(&e.stats.CacheCompactionErrors),
-            statCacheCompactionDuration: atomic.LoadInt64(&e.stats.CacheCompactionDuration),
+const (
+    storageNamespace = "storage"
+    engineSubsystem  = "compactions"
+    level1           = "1"
+    level2           = "2"
+    level3           = "3"
+    levelOpt         = "opt"
+    levelFull        = "full"
+    levelKey         = "level"
+    levelCache       = "cache"
+)
 
-            statTSMLevel1Compactions:        atomic.LoadInt64(&e.stats.TSMCompactions[0]),
-            statTSMLevel1CompactionsActive:  atomic.LoadInt64(&e.stats.TSMCompactionsActive[0]),
-            statTSMLevel1CompactionError:    atomic.LoadInt64(&e.stats.TSMCompactionErrors[0]),
-            statTSMLevel1CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[0]),
-            statTSMLevel1CompactionQueue:    atomic.LoadInt64(&e.stats.TSMCompactionsQueue[0]),
-
-            statTSMLevel2Compactions:        atomic.LoadInt64(&e.stats.TSMCompactions[1]),
-            statTSMLevel2CompactionsActive:  atomic.LoadInt64(&e.stats.TSMCompactionsActive[1]),
-            statTSMLevel2CompactionError:    atomic.LoadInt64(&e.stats.TSMCompactionErrors[1]),
-            statTSMLevel2CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[1]),
-            statTSMLevel2CompactionQueue:    atomic.LoadInt64(&e.stats.TSMCompactionsQueue[1]),
-
-            statTSMLevel3Compactions:        atomic.LoadInt64(&e.stats.TSMCompactions[2]),
-            statTSMLevel3CompactionsActive:  atomic.LoadInt64(&e.stats.TSMCompactionsActive[2]),
-            statTSMLevel3CompactionError:    atomic.LoadInt64(&e.stats.TSMCompactionErrors[2]),
-            statTSMLevel3CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[2]),
-            statTSMLevel3CompactionQueue:    atomic.LoadInt64(&e.stats.TSMCompactionsQueue[2]),
-
-            statTSMOptimizeCompactions:        atomic.LoadInt64(&e.stats.TSMOptimizeCompactions),
-            statTSMOptimizeCompactionsActive:  atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive),
-            statTSMOptimizeCompactionError:    atomic.LoadInt64(&e.stats.TSMOptimizeCompactionErrors),
-            statTSMOptimizeCompactionDuration: atomic.LoadInt64(&e.stats.TSMOptimizeCompactionDuration),
-            statTSMOptimizeCompactionQueue:    atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsQueue),
-
-            statTSMFullCompactions:        atomic.LoadInt64(&e.stats.TSMFullCompactions),
-            statTSMFullCompactionsActive:  atomic.LoadInt64(&e.stats.TSMFullCompactionsActive),
-            statTSMFullCompactionError:    atomic.LoadInt64(&e.stats.TSMFullCompactionErrors),
-            statTSMFullCompactionDuration: atomic.LoadInt64(&e.stats.TSMFullCompactionDuration),
-            statTSMFullCompactionQueue:    atomic.LoadInt64(&e.stats.TSMFullCompactionsQueue),
-        },
-    })
-
-    statistics = append(statistics, e.Cache.Statistics(tags)...)
-    statistics = append(statistics, e.FileStore.Statistics(tags)...)
-    if e.WALEnabled {
-        statistics = append(statistics, e.WAL.Statistics(tags)...)
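Note: the storageNamespace and engineSubsystem constants above determine the fully qualified series names scrapers will see. client_golang joins Namespace, Subsystem, and Name with underscores, which can be checked directly; a tiny sketch (output names inferred from the diff's opts):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// The vectors in this diff surface as:
	//   storage_compactions_duration_seconds (histogram)
	//   storage_compactions_active           (gauge)
	//   storage_compactions_failed           (counter)
	//   storage_compactions_queued           (gauge)
	fmt.Println(prometheus.BuildFQName("storage", "compactions", "duration_seconds"))
}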
+func labelForLevel(l int) prometheus.Labels {
+    switch l {
+    case 1:
+        return prometheus.Labels{levelKey: level1}
+    case 2:
+        return prometheus.Labels{levelKey: level2}
+    case 3:
+        return prometheus.Labels{levelKey: level3}
+    }
+    panic(fmt.Sprintf("labelForLevel: level out of range %d", l))
+}
+
+func newAllCompactionMetrics(labelNames []string) *compactionMetrics {
+    labelNamesWithLevel := append(labelNames, levelKey)
+    return &compactionMetrics{
+        Duration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+            Namespace: storageNamespace,
+            Subsystem: engineSubsystem,
+            Name:      "duration_seconds",
+            Help:      "Histogram of compactions by level since startup",
+            Buckets:   []float64{0.1, 1, 10, 100, 1000, 10000, 100000},
+        }, labelNamesWithLevel),
+        Active: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+            Namespace: storageNamespace,
+            Subsystem: engineSubsystem,
+            Name:      "active",
+            Help:      "Gauge of compactions (by level) currently running",
+        }, labelNamesWithLevel),
+        Failed: prometheus.NewCounterVec(prometheus.CounterOpts{
+            Namespace: storageNamespace,
+            Subsystem: engineSubsystem,
+            Name:      "failed",
+            Help:      "Counter of TSM compactions (by level) that have failed due to error",
+        }, labelNamesWithLevel),
+        Queued: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+            Namespace: storageNamespace,
+            Subsystem: engineSubsystem,
+            Name:      "queued",
+            Help:      "Gauge of TSM compactions (by level) that are currently queued",
+        }, labelNamesWithLevel),
+    }
+}
+
+type compactionCounter struct {
+    l1       int64
+    l2       int64
+    l3       int64
+    full     int64
+    optimize int64
+}
+
+func (c *compactionCounter) countForLevel(l int) *int64 {
+    switch l {
+    case 1:
+        return &c.l1
+    case 2:
+        return &c.l2
+    case 3:
+        return &c.l3
+    }
+    return nil
+}
+
+// compactionMetrics holds statistics across all instantiated engines
+type compactionMetrics struct {
+    Duration prometheus.ObserverVec
+    Active   *prometheus.GaugeVec
+    Queued   *prometheus.GaugeVec
+    Failed   *prometheus.CounterVec
+}
+
+type engineTags struct {
+    path, walPath, id, bucket, engineVersion string
+}
+
+func (et *engineTags) getLabels() prometheus.Labels {
+    return prometheus.Labels{
+        "path":    et.path,
+        "walPath": et.walPath,
+        "id":      et.id,
+        "bucket":  et.bucket,
+        "engine":  et.engineVersion,
+    }
+}
+
+func engineLabelNames() []string {
+    emptyLabels := (&engineTags{}).getLabels()
+    val := make([]string, 0, len(emptyLabels))
+    for k := range emptyLabels {
+        val = append(val, k)
+    }
+    return val
+}
+
+func newEngineMetrics(tags engineTags) *compactionMetrics {
+    engineLabels := tags.getLabels()
+    return &compactionMetrics{
+        Duration: globalCompactionMetrics.Duration.MustCurryWith(engineLabels),
+        Active:   globalCompactionMetrics.Active.MustCurryWith(engineLabels),
+        Failed:   globalCompactionMetrics.Failed.MustCurryWith(engineLabels),
+        Queued:   globalCompactionMetrics.Queued.MustCurryWith(engineLabels),
    }
-    return statistics
 }
 
 // DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
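Note: MustCurryWith is what lets every engine share the single set of globally registered collectors. The per-engine labels are bound once at construction, and hot paths only supply the remaining "level" label. A minimal standalone sketch of the pattern (names are illustrative, not the diff's):

package main

import "github.com/prometheus/client_golang/prometheus"

// One process-wide vector carries both the per-engine label and the
// per-observation level label.
var failures = prometheus.NewCounterVec(prometheus.CounterOpts{
	Name: "compactions_failed_total", // illustrative name
}, []string{"id", "level"})

func main() {
	// Each engine curries its identity in once at construction time...
	engineFailures := failures.MustCurryWith(prometheus.Labels{"id": "7"})
	// ...and record sites only supply the remaining label.
	engineFailures.With(prometheus.Labels{"level": "1"}).Inc()
}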
@@ -888,12 +899,12 @@ func (e *Engine) IsIdle() (state bool, reason string) {
         ActiveCompactions *int64
         LogMessage        string
     }{
-        {&e.stats.CacheCompactionsActive, "not idle because of active Cache compactions"},
-        {&e.stats.TSMCompactionsActive[0], "not idle because of active Level Zero compactions"},
-        {&e.stats.TSMCompactionsActive[1], "not idle because of active Level One compactions"},
-        {&e.stats.TSMCompactionsActive[2], "not idle because of active Level Two compactions"},
-        {&e.stats.TSMFullCompactionsActive, "not idle because of active Full compactions"},
-        {&e.stats.TSMOptimizeCompactionsActive, "not idle because of active TSM Optimization compactions"},
+        // We don't actually track cache compactions: {&e.status.CacheCompactionsActive, "not idle because of active Cache compactions"},
+        {&e.activeCompactions.l1, "not idle because of active Level1 compactions"},
+        {&e.activeCompactions.l2, "not idle because of active Level2 compactions"},
+        {&e.activeCompactions.l3, "not idle because of active Level3 compactions"},
+        {&e.activeCompactions.full, "not idle because of active Full compactions"},
+        {&e.activeCompactions.optimize, "not idle because of active TSM Optimization compactions"},
     }
 
     for _, compactionState := range c {
@@ -1970,11 +1981,9 @@ func (e *Engine) compactCache() {
             err := e.WriteSnapshot()
             if err != nil && err != errCompactionsDisabled {
                 e.logger.Info("Error writing snapshot", zap.Error(err))
-                atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
-            } else {
-                atomic.AddInt64(&e.stats.CacheCompactions, 1)
+                e.stats.Failed.With(prometheus.Labels{levelKey: levelCache}).Inc()
             }
-            atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())
+            e.stats.Duration.With(prometheus.Labels{levelKey: levelCache}).Observe(time.Since(start).Seconds())
         }
     }
 }
@@ -2016,20 +2025,21 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
             level2Groups, len2 := e.CompactionPlan.PlanLevel(2)
             level3Groups, len3 := e.CompactionPlan.PlanLevel(3)
             level4Groups, len4 := e.CompactionPlan.Plan(e.LastModified())
-            atomic.StoreInt64(&e.stats.TSMFullCompactionsQueue, len4)
+
+            e.stats.Queued.With(prometheus.Labels{levelKey: levelFull}).Set(float64(len4))
 
             // If no full compactions are need, see if an optimize is needed
             if len(level4Groups) == 0 {
                 level4Groups, len4 = e.CompactionPlan.PlanOptimize()
-                atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, len4)
+                e.stats.Queued.With(prometheus.Labels{levelKey: levelOpt}).Set(float64(len4))
             }
 
             // Update the level plan queue stats
             // For stats, use the length needed, even if the lock was
             // not acquired
-            atomic.StoreInt64(&e.stats.TSMCompactionsQueue[0], len1)
-            atomic.StoreInt64(&e.stats.TSMCompactionsQueue[1], len2)
-            atomic.StoreInt64(&e.stats.TSMCompactionsQueue[2], len3)
+            e.stats.Queued.With(prometheus.Labels{levelKey: level1}).Set(float64(len1))
+            e.stats.Queued.With(prometheus.Labels{levelKey: level2}).Set(float64(len2))
+            e.stats.Queued.With(prometheus.Labels{levelKey: level3}).Set(float64(len3))
 
             // Set the queue depths on the scheduler
             // Use the real queue depth, dependent on acquiring
@@ -2079,12 +2089,18 @@ func (e *Engine) compactLevel(grp CompactionGroup, level int, fast bool, wg *syn
     }
 
     if e.compactionLimiter.TryTake() {
-        atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1)
+        {
+            val := atomic.AddInt64(e.activeCompactions.countForLevel(level), 1)
+            e.stats.Active.With(labelForLevel(level)).Set(float64(val))
+        }
 
         wg.Add(1)
         go func() {
            defer wg.Done()
-            defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
+            defer func() {
+                val := atomic.AddInt64(e.activeCompactions.countForLevel(level), -1)
+                e.stats.Active.With(labelForLevel(level)).Set(float64(val))
+            }()
             defer e.compactionLimiter.Release()
 
             s.Apply()
@@ -2108,11 +2124,17 @@ func (e *Engine) compactFull(grp CompactionGroup, wg *sync.WaitGroup) bool {
     // Try the lo priority limiter, otherwise steal a little from the high priority if we can.
     if e.compactionLimiter.TryTake() {
-        atomic.AddInt64(&e.stats.TSMFullCompactionsActive, 1)
+        {
+            val := atomic.AddInt64(&e.activeCompactions.full, 1)
+            e.stats.Active.With(prometheus.Labels{levelKey: levelFull}).Set(float64(val))
+        }
         wg.Add(1)
         go func() {
             defer wg.Done()
-            defer atomic.AddInt64(&e.stats.TSMFullCompactionsActive, -1)
+            defer func() {
+                val := atomic.AddInt64(&e.activeCompactions.full, -1)
+                e.stats.Active.With(prometheus.Labels{levelKey: levelFull}).Set(float64(val))
+            }()
             defer e.compactionLimiter.Release()
 
             s.Apply()
             // Release the files in the compaction plan
@@ -2130,10 +2152,8 @@ type compactionStrategy struct {
     fast  bool
     level int
 
-    durationStat *int64
-    activeStat   *int64
-    successStat  *int64
-    errorStat    *int64
+    durationSecondsStat prometheus.Observer
+    errorStat           prometheus.Counter
 
     logger    *zap.Logger
     compactor *Compactor
@@ -2146,7 +2166,7 @@ type compactionStrategy struct {
 func (s *compactionStrategy) Apply() {
     start := time.Now()
     s.compactGroup()
-    atomic.AddInt64(s.durationStat, time.Since(start).Nanoseconds())
+    s.durationSecondsStat.Observe(time.Since(start).Seconds())
 }
 
 // compactGroup executes the compaction strategy against a single CompactionGroup.
@@ -2194,14 +2214,14 @@ func (s *compactionStrategy) compactGroup() {
             }
         }
 
-        atomic.AddInt64(s.errorStat, 1)
+        s.errorStat.Inc()
         time.Sleep(time.Second)
         return
     }
 
     if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil {
         log.Info("Error replacing new TSM files", zap.Error(err))
-        atomic.AddInt64(s.errorStat, 1)
+        s.errorStat.Inc()
         time.Sleep(time.Second)
 
         // Remove the new snapshot files. We will try again.
@@ -2218,12 +2238,12 @@ func (s *compactionStrategy) compactGroup() {
     }
 
     log.Info("Finished compacting files", zap.Int("tsm1_files_n", len(files)))
-    atomic.AddInt64(s.successStat, 1)
 }
 
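Note: the compactLevel and compactFull changes above keep a plain int64 as the source of truth (the scheduler and IsIdle still read it with atomic loads) and mirror it into a Prometheus gauge. atomic.AddInt64 returns the post-update value, so the gauge can be set without a separate load. A minimal sketch of the pattern, with illustrative names:

package main

import (
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

var active = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "jobs_active", // illustrative name, not the diff's
})

// startJob/endJob mirror the diff's pattern: the int64 stays the
// cheap-to-read source of truth, while the gauge is updated from
// AddInt64's return value so it always reflects the counter.
func startJob(n *int64) { active.Set(float64(atomic.AddInt64(n, 1))) }
func endJob(n *int64)   { active.Set(float64(atomic.AddInt64(n, -1))) }

func main() {
	var running int64
	startJob(&running)
	defer endJob(&running)
}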
 // levelCompactionStrategy returns a compactionStrategy for the given level.
 // It returns nil if there are no TSM files to compact.
 func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy {
+    label := labelForLevel(level)
     return &compactionStrategy{
         group:     group,
         logger:    e.logger.With(zap.Int("tsm1_level", level), zap.String("tsm1_strategy", "level")),
@@ -2233,10 +2253,8 @@ func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level
         engine: e,
         level:  level,
 
-        activeStat:   &e.stats.TSMCompactionsActive[level-1],
-        successStat:  &e.stats.TSMCompactions[level-1],
-        errorStat:    &e.stats.TSMCompactionErrors[level-1],
-        durationStat: &e.stats.TSMCompactionDuration[level-1],
+        errorStat:           e.stats.Failed.With(label),
+        durationSecondsStat: e.stats.Duration.With(label),
     }
 }
 
@@ -2253,18 +2271,12 @@ func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *c
         level: 4,
     }
 
+    plabel := prometheus.Labels{levelKey: levelFull}
     if optimize {
-        s.activeStat = &e.stats.TSMOptimizeCompactionsActive
-        s.successStat = &e.stats.TSMOptimizeCompactions
-        s.errorStat = &e.stats.TSMOptimizeCompactionErrors
-        s.durationStat = &e.stats.TSMOptimizeCompactionDuration
-    } else {
-        s.activeStat = &e.stats.TSMFullCompactionsActive
-        s.successStat = &e.stats.TSMFullCompactions
-        s.errorStat = &e.stats.TSMFullCompactionErrors
-        s.durationStat = &e.stats.TSMFullCompactionDuration
+        plabel = prometheus.Labels{levelKey: levelOpt}
     }
-
+    s.errorStat = e.stats.Failed.With(plabel)
+    s.durationSecondsStat = e.stats.Duration.With(plabel)
     return s
 }
diff --git a/tsdb/engine/tsm1/scheduler.go b/tsdb/engine/tsm1/scheduler.go
index d360afc3e7..774a899806 100644
--- a/tsdb/engine/tsm1/scheduler.go
+++ b/tsdb/engine/tsm1/scheduler.go
@@ -7,19 +7,19 @@ import (
 var defaultWeights = [4]float64{0.4, 0.3, 0.2, 0.1}
 
 type scheduler struct {
-    maxConcurrency int
-    stats          *EngineStatistics
+    maxConcurrency    int
+    activeCompactions *compactionCounter
 
     // queues is the depth of work pending for each compaction level
     queues  [4]int
     weights [4]float64
 }
 
-func newScheduler(stats *EngineStatistics, maxConcurrency int) *scheduler {
+func newScheduler(activeCompactions *compactionCounter, maxConcurrency int) *scheduler {
     return &scheduler{
-        stats:          stats,
-        maxConcurrency: maxConcurrency,
-        weights:        defaultWeights,
+        activeCompactions: activeCompactions,
+        maxConcurrency:    maxConcurrency,
+        weights:           defaultWeights,
     }
 }
 
@@ -33,10 +33,10 @@ func (s *scheduler) setDepth(level, depth int) {
 }
 
 func (s *scheduler) next() (int, bool) {
-    level1Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[0]))
-    level2Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[1]))
-    level3Running := int(atomic.LoadInt64(&s.stats.TSMCompactionsActive[2]))
-    level4Running := int(atomic.LoadInt64(&s.stats.TSMFullCompactionsActive) + atomic.LoadInt64(&s.stats.TSMOptimizeCompactionsActive))
+    level1Running := int(atomic.LoadInt64(&s.activeCompactions.l1))
+    level2Running := int(atomic.LoadInt64(&s.activeCompactions.l2))
+    level3Running := int(atomic.LoadInt64(&s.activeCompactions.l3))
+    level4Running := int(atomic.LoadInt64(&s.activeCompactions.full) + atomic.LoadInt64(&s.activeCompactions.optimize))
 
     if level1Running+level2Running+level3Running+level4Running >= s.maxConcurrency {
         return 0, false
diff --git a/tsdb/engine/tsm1/scheduler_test.go b/tsdb/engine/tsm1/scheduler_test.go
index 9ff40b0e5f..8d14dd3123 100644
--- a/tsdb/engine/tsm1/scheduler_test.go
+++ b/tsdb/engine/tsm1/scheduler_test.go
@@ -3,7 +3,7 @@ package tsm1
 import "testing"
 
 func TestScheduler_Runnable_Empty(t *testing.T) {
-    s := newScheduler(&EngineStatistics{}, 1)
+    s := newScheduler(&compactionCounter{}, 1)
 
     for i := 1; i < 5; i++ {
         s.setDepth(i, 1)
@@ -20,11 +20,11 @@ func TestScheduler_Runnable_Empty(t *testing.T) {
 }
 
 func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
-    s := newScheduler(&EngineStatistics{}, 1)
+    s := newScheduler(&compactionCounter{}, 1)
 
     // level 1
-    s.stats = &EngineStatistics{}
-    s.stats.TSMCompactionsActive[0] = 1
+    s.activeCompactions = &compactionCounter{}
+    s.activeCompactions.l1 = 1
     for i := 0; i <= 4; i++ {
         _, runnable := s.next()
         if exp, got := false, runnable; exp != got {
@@ -33,8 +33,8 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
     }
 
     // level 2
-    s.stats = &EngineStatistics{}
-    s.stats.TSMCompactionsActive[1] = 1
+    s.activeCompactions = &compactionCounter{}
+    s.activeCompactions.l2 = 1
     for i := 0; i <= 4; i++ {
         _, runnable := s.next()
         if exp, got := false, runnable; exp != got {
@@ -43,8 +43,8 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
     }
 
     // level 3
-    s.stats = &EngineStatistics{}
-    s.stats.TSMCompactionsActive[2] = 1
+    s.activeCompactions = &compactionCounter{}
+    s.activeCompactions.l3 = 1
     for i := 0; i <= 4; i++ {
         _, runnable := s.next()
         if exp, got := false, runnable; exp != got {
@@ -53,8 +53,8 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
     }
 
     // optimize
-    s.stats = &EngineStatistics{}
-    s.stats.TSMOptimizeCompactionsActive++
+    s.activeCompactions = &compactionCounter{}
+    s.activeCompactions.optimize++
     for i := 0; i <= 4; i++ {
         _, runnable := s.next()
         if exp, got := false, runnable; exp != got {
@@ -63,8 +63,8 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
     }
 
     // full
-    s.stats = &EngineStatistics{}
-    s.stats.TSMFullCompactionsActive++
+    s.activeCompactions = &compactionCounter{}
+    s.activeCompactions.full++
     for i := 0; i <= 4; i++ {
         _, runnable := s.next()
         if exp, got := false, runnable; exp != got {
diff --git a/tsdb/shard.go b/tsdb/shard.go
index cf3cd750f7..d85c20b098 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -286,8 +286,6 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
         },
     }}
 
-    // Add the index and engine statistics.
-    statistics = append(statistics, engine.Statistics(tags)...)
     return statistics
 }
 
diff --git a/tsdb/store.go b/tsdb/store.go
index fe931e0422..7df7e679f5 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -159,7 +159,7 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
         })
     }
 
-    // Gather allĀ statistics for all shards.
+    // Gather all statistics for all shards.
     for _, shard := range shards {
         statistics = append(statistics, shard.Statistics(tags)...)
     }
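Note: with the Statistics plumbing removed, the compaction counters are now only observable through the collectors exposed by tsm1.PrometheusCollectors(). One way a reviewer might sanity-check that those collectors register cleanly, written against this diff's API (the test name is ours, not part of the change):

package tsm1_test

import (
	"testing"

	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
	"github.com/prometheus/client_golang/prometheus"
)

// If the four vectors returned by PrometheusCollectors ever declared
// inconsistent label sets or were registered twice, Register would
// fail here.
func TestCompactionCollectorsRegister(t *testing.T) {
	reg := prometheus.NewRegistry()
	for _, c := range tsm1.PrometheusCollectors() {
		if err := reg.Register(c); err != nil {
			t.Fatalf("collector failed to register: %v", err)
		}
	}
}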