From d61b9f16458d64d7876551801dd4a4e4bb2b42ca Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 31 Oct 2018 18:36:22 +0000 Subject: [PATCH] Convert Filestore stats --- tsdb/engine.go | 126 ---------------------------------------- tsdb/tsm1/engine.go | 25 ++++---- tsdb/tsm1/file_store.go | 93 ++++++++++++++++++----------- tsdb/tsm1/metrics.go | 117 ++++++++++++++++++++++++++++++------- 4 files changed, 168 insertions(+), 193 deletions(-) delete mode 100644 tsdb/engine.go diff --git a/tsdb/engine.go b/tsdb/engine.go deleted file mode 100644 index c5e8aeccff..0000000000 --- a/tsdb/engine.go +++ /dev/null @@ -1,126 +0,0 @@ -package tsdb - -import ( - "context" - "errors" - "regexp" - "runtime" - "time" - - "github.com/influxdata/influxql" - "github.com/influxdata/platform/models" - "github.com/influxdata/platform/pkg/limiter" - "go.uber.org/zap" -) - -var ( - // ErrUnknownEngineFormat is returned when the engine format is - // unknown. ErrUnknownEngineFormat is currently returned if a format - // other than tsm1 is encountered. - ErrUnknownEngineFormat = errors.New("unknown engine format") -) - -// Engine represents a swappable storage engine for the shard. 
-type Engine interface { - Open() error - Close() error - SetEnabled(enabled bool) - SetCompactionsEnabled(enabled bool) - ScheduleFullCompaction() error - - WithLogger(*zap.Logger) - - CreateCursorIterator(ctx context.Context) (CursorIterator, error) - WritePoints(points []models.Point) error - - CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error - CreateSeriesListIfNotExists(collection *SeriesCollection) error - DeleteSeriesRange(itr SeriesIterator, min, max int64) error - DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error - - SeriesN() int64 - - MeasurementExists(name []byte) (bool, error) - - MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) - ForEachMeasurementName(fn func(name []byte) error) error - DeleteMeasurement(name []byte) error - - HasTagKey(name, key []byte) (bool, error) - MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) - TagKeyCardinality(name, key []byte) int - - LastModified() time.Time - DiskSize() int64 - IsIdle() bool - Free() error -} - -// SeriesIDSets provides access to the total set of series IDs -type SeriesIDSets interface { - ForEach(f func(ids *SeriesIDSet)) error -} - -// EngineOptions represents the options used to initialize the engine. -type EngineOptions struct { - EngineVersion string - ShardID uint64 - - // Limits the concurrent number of TSM files that can be loaded at once. - OpenLimiter limiter.Fixed - - // CompactionDisabled specifies shards should not schedule compactions. - // This option is intended for offline tooling. - CompactionDisabled bool - CompactionPlannerCreator CompactionPlannerCreator - CompactionLimiter limiter.Fixed - CompactionThroughputLimiter limiter.Rate - WALEnabled bool - MonitorDisabled bool - - // DatabaseFilter is a predicate controlling which databases may be opened. - // If no function is set, all databases will be opened. 
- DatabaseFilter func(database string) bool - - // RetentionPolicyFilter is a predicate controlling which combination of database and retention policy may be opened. - // nil will allow all combinations to pass. - RetentionPolicyFilter func(database, rp string) bool - - // ShardFilter is a predicate controlling which combination of database, retention policy and shard group may be opened. - // nil will allow all combinations to pass. - ShardFilter func(database, rp string, id uint64) bool - - Config Config - SeriesIDSets SeriesIDSets - - OnNewEngine func(Engine) - - FileStoreObserver FileStoreObserver -} - -// NewEngineOptions constructs an EngineOptions object with safe default values. -// This should only be used in tests; production environments should read from a config file. -func NewEngineOptions() EngineOptions { - return EngineOptions{ - EngineVersion: DefaultEngine, - Config: NewConfig(), - OpenLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)), - CompactionLimiter: limiter.NewFixed(1), - WALEnabled: false, - } -} - -// NewInmemIndex returns a new "inmem" index type. -var NewInmemIndex func(name string, sfile *SeriesFile) (interface{}, error) - -type CompactionPlannerCreator func(cfg Config) interface{} - -// FileStoreObserver is passed notifications before the file store adds or deletes files. In this way, it can -// be sure to observe every file that is added or removed even in the presence of process death. -type FileStoreObserver interface { - // FileFinishing is called before a file is renamed to it's final name. - FileFinishing(path string) error - - // FileUnlinking is called before a file is unlinked. 
- FileUnlinking(path string) error -} diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 69df60743a..50096ca0a5 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -221,7 +221,7 @@ func NewEngine(path string, idx *tsi1.Index, config Config, options ...EngineOpt enableCompactionsOnOpen: true, formatFileName: DefaultFormatFileName, compactionLimiter: limiter.NewFixed(maxCompactions), - scheduler: newScheduler(stats, maxCompactions), + scheduler: newScheduler(maxCompactions), } for _, option := range options { @@ -497,7 +497,11 @@ func (e *Engine) DiskSize() int64 { func (e *Engine) Open() error { // Initialise metrics... e.blockMetrics = newBlockMetrics(e.defaultMetricLabels) - e.compactionTracker = newCompactionTracker(e.blockMetrics) + + // Propagate prometheus metrics down into trackers. + e.compactionTracker = newCompactionTracker(e.blockMetrics.compactionMetrics) + e.FileStore.fileTracker = newFileTracker(e.blockMetrics.fileMetrics) + e.scheduler.setCompactionTracker(e.compactionTracker) if err := os.MkdirAll(e.path, 0777); err != nil { @@ -550,7 +554,6 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector { var metrics []prometheus.Collector metrics = append(metrics, e.blockMetrics.PrometheusCollectors()...) - // TODO(edd): Add Filestore metrics // TODO(edd): Add Cache metrics // TODO(edd): Add WAL metrics return metrics @@ -1035,7 +1038,7 @@ func (l compactionLevel) String() string { // *NOTE* - compactionTracker fields should not be directory modified. Doing so // could result in the Engine exposing inaccurate metrics. type compactionTracker struct { - metrics *blockMetrics + metrics *compactionMetrics // Note: Compactions are levelled as follows: // 0 – Snapshots @@ -1049,8 +1052,8 @@ type compactionTracker struct { queue [6]uint64 // Gauge of TSM compactions queues (by level). 
} -func newCompactionTracker(blockMetrics *blockMetrics) *compactionTracker { - return &compactionTracker{metrics: blockMetrics} +func newCompactionTracker(metrics *compactionMetrics) *compactionTracker { + return &compactionTracker{metrics: metrics} } // Completed returns the total number of compactions for the provided level. @@ -1090,7 +1093,7 @@ func (t *compactionTracker) Errors(level int) uint64 { return atomic.LoadUint64( func (t *compactionTracker) IncActive(level compactionLevel) { atomic.AddUint64(&t.active[level], 1) - labels := t.metrics.CompactionLabels(level) + labels := t.metrics.Labels(level) t.metrics.CompactionsActive.With(labels).Inc() } @@ -1101,7 +1104,7 @@ func (t *compactionTracker) IncFullActive() { t.IncActive(5) } func (t *compactionTracker) DecActive(level compactionLevel) { atomic.AddUint64(&t.active[level], ^uint64(0)) - labels := t.metrics.CompactionLabels(level) + labels := t.metrics.Labels(level) t.metrics.CompactionsActive.With(labels).Dec() } @@ -1113,7 +1116,7 @@ func (t *compactionTracker) Attempted(level compactionLevel, success bool, durat if success { atomic.AddUint64(&t.ok[level], 1) - labels := t.metrics.CompactionLabels(level) + labels := t.metrics.Labels(level) t.metrics.CompactionDuration.With(labels).Observe(duration.Seconds()) labels["status"] = "ok" @@ -1123,7 +1126,7 @@ func (t *compactionTracker) Attempted(level compactionLevel, success bool, durat atomic.AddUint64(&t.errors[level], 1) - labels := t.metrics.CompactionLabels(level) + labels := t.metrics.Labels(level) labels["status"] = "error" t.metrics.Compactions.With(labels).Inc() } @@ -1137,7 +1140,7 @@ func (t *compactionTracker) SnapshotAttempted(success bool, duration time.Durati func (t *compactionTracker) SetQueue(level compactionLevel, length uint64) { atomic.StoreUint64(&t.queue[level], length) - labels := t.metrics.CompactionLabels(level) + labels := t.metrics.Labels(level) t.metrics.CompactionQueue.With(labels).Set(float64(length)) } diff --git 
a/tsdb/tsm1/file_store.go b/tsdb/tsm1/file_store.go index 8934949731..e25afba80e 100644 --- a/tsdb/tsm1/file_store.go +++ b/tsdb/tsm1/file_store.go @@ -17,7 +17,6 @@ import ( "sync/atomic" "time" - "github.com/influxdata/platform/models" "github.com/influxdata/platform/pkg/file" "github.com/influxdata/platform/pkg/limiter" "github.com/influxdata/platform/pkg/metrics" @@ -160,12 +159,6 @@ type FileStoreObserver interface { FileUnlinking(path string) error } -// Statistics gathered by the FileStore. -const ( - statFileStoreBytes = "diskBytes" - statFileStoreCount = "numFiles" -) - var ( floatBlocksDecodedCounter = metrics.MustRegisterCounter("float_blocks_decoded", metrics.WithGroup(tsmGroup)) floatBlocksSizeCounter = metrics.MustRegisterCounter("float_blocks_size_bytes", metrics.WithGroup(tsmGroup)) @@ -198,8 +191,8 @@ type FileStore struct { traceLogger *zap.Logger // Logger to be used when trace-logging is on. traceLogging bool - stats *FileStoreStatistics - purger *purger + fileTracker *fileTracker + purger *purger currentTempDirID int @@ -242,7 +235,6 @@ func NewFileStore(dir string) *FileStore { logger: logger, traceLogger: logger, openLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)), - stats: &FileStoreStatistics{}, purger: &purger{ files: map[string]TSMFile{}, logger: logger, @@ -290,20 +282,53 @@ func (f *FileStore) WithLogger(log *zap.Logger) { // FileStoreStatistics keeps statistics about the file store. type FileStoreStatistics struct { - DiskBytes int64 - FileCount int64 + SDiskBytes int64 + SFileCount int64 } -// Statistics returns statistics for periodic monitoring. -func (f *FileStore) Statistics(tags map[string]string) []models.Statistic { - return []models.Statistic{{ - Name: "tsm1_filestore", - Tags: tags, - Values: map[string]interface{}{ - statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes), - statFileStoreCount: atomic.LoadInt64(&f.stats.FileCount), - }, - }} +// fileTracker tracks file counts and sizes within the FileStore. 
+// +// As well as being responsible for providing atomic reads and writes to the +// statistics, fileTracker also mirrors any changes to the external prometheus +// metrics, which the Engine exposes. +// +// *NOTE* - fileTracker fields should not be directly modified. Doing so +// could result in the Engine exposing inaccurate metrics. +type fileTracker struct { + metrics *fileMetrics + diskBytes uint64 + fileCount uint64 +} + +func newFileTracker(metrics *fileMetrics) *fileTracker { + return &fileTracker{metrics: metrics} +} + +// Bytes returns the number of bytes in use on disk. +func (t *fileTracker) Bytes() uint64 { return atomic.LoadUint64(&t.diskBytes) } + +// SetBytes sets the number of bytes in use on disk. +func (t *fileTracker) SetBytes(bytes uint64) { + atomic.StoreUint64(&t.diskBytes, bytes) + + labels := t.metrics.Labels() + t.metrics.DiskSize.With(labels).Set(float64(bytes)) +} + +// AddBytes increases the number of bytes. +func (t *fileTracker) AddBytes(bytes uint64) { + atomic.AddUint64(&t.diskBytes, bytes) + + labels := t.metrics.Labels() + t.metrics.DiskSize.With(labels).Add(float64(bytes)) +} + +// SetFileCount sets the number of files in the FileStore. +func (t *fileTracker) SetFileCount(files uint64) { + atomic.StoreUint64(&t.fileCount, files) + + labels := t.metrics.Labels() + t.metrics.Files.With(labels).Set(float64(files)) } // Count returns the number of TSM files currently loaded. 
@@ -581,10 +606,11 @@ func (f *FileStore) Open() error { f.files = append(f.files, res.r) // Accumulate file store size stats - atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size())) + totalSize := uint64(res.r.Size()) for _, ts := range res.r.TombstoneFiles() { - atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size)) + totalSize += uint64(ts.Size) } + f.fileTracker.AddBytes(totalSize) // Re-initialize the lastModified time for the file store if res.r.LastModified() > lm { @@ -596,7 +622,7 @@ func (f *FileStore) Open() error { close(readerC) sort.Sort(tsmReaders(f.files)) - atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files))) + f.fileTracker.SetFileCount(uint64(len(f.files))) return nil } @@ -609,7 +635,7 @@ func (f *FileStore) Close() error { f.lastFileStats = nil f.files = nil - atomic.StoreInt64(&f.stats.FileCount, 0) + f.fileTracker.SetFileCount(uint64(0)) // Let other methods access this closed object while we do the actual closing. f.mu.Unlock() @@ -624,9 +650,8 @@ func (f *FileStore) Close() error { return nil } -func (f *FileStore) DiskSizeBytes() int64 { - return atomic.LoadInt64(&f.stats.DiskBytes) -} +// DiskSizeBytes returns the total number of bytes consumed by the files in the FileStore. +func (f *FileStore) DiskSizeBytes() int64 { return int64(f.fileTracker.Bytes()) } // Read returns the slice of values for the given key and the given timestamp, // if any file matches those constraints. 
@@ -878,18 +903,18 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF f.lastFileStats = nil f.files = active sort.Sort(tsmReaders(f.files)) - atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files))) + f.fileTracker.SetFileCount(uint64(len(f.files))) // Recalculate the disk size stat - var totalSize int64 + var totalSize uint64 for _, file := range f.files { - totalSize += int64(file.Size()) + totalSize += uint64(file.Size()) for _, ts := range file.TombstoneFiles() { - totalSize += int64(ts.Size) + totalSize += uint64(ts.Size) } } - atomic.StoreInt64(&f.stats.DiskBytes, totalSize) + f.fileTracker.SetBytes(totalSize) return nil } diff --git a/tsdb/tsm1/metrics.go b/tsdb/tsm1/metrics.go index 6f64ce95cb..6a90c9990c 100644 --- a/tsdb/tsm1/metrics.go +++ b/tsdb/tsm1/metrics.go @@ -10,20 +10,44 @@ import ( // namespace is the leading part of all published metrics for the Storage service. const namespace = "storage" -const blockSubsystem = "block" // sub-system associated with metrics for block storage. +const compactionSubsystem = "compactions" // sub-system associated with metrics for compactions +const fileStoreSubsystem = "tsm_files" // sub-system associated with metrics for TSM files // blockMetrics are a set of metrics concerned with tracking data about block storage. type blockMetrics struct { - labels prometheus.Labels // Read only. + labels prometheus.Labels + *compactionMetrics + *fileMetrics +} +// newBlockMetrics initialises the prometheus metrics for the block subsystem. +func newBlockMetrics(labels prometheus.Labels) *blockMetrics { + return &blockMetrics{ + labels: labels, + compactionMetrics: newCompactionMetrics(labels), + fileMetrics: newFileMetrics(labels), + } +} + +// PrometheusCollectors satisfies the prom.PrometheusCollector interface. 
+func (m *blockMetrics) PrometheusCollectors() []prometheus.Collector { + var metrics []prometheus.Collector + metrics = append(metrics, m.compactionMetrics.PrometheusCollectors()...) + metrics = append(metrics, m.fileMetrics.PrometheusCollectors()...) + return metrics +} + +// compactionMetrics are a set of metrics concerned with tracking data about compactions. +type compactionMetrics struct { + labels prometheus.Labels // Read Only Compactions *prometheus.CounterVec CompactionsActive *prometheus.GaugeVec CompactionDuration *prometheus.HistogramVec CompactionQueue *prometheus.GaugeVec } -// newBlockMetrics initialises the prometheus metrics for the block subsystem. -func newBlockMetrics(labels prometheus.Labels) *blockMetrics { +// newCompactionMetrics initialises the prometheus metrics for compactions. +func newCompactionMetrics(labels prometheus.Labels) *compactionMetrics { compactionNames := []string{"level"} // All compaction metrics have a `level` label. for k := range labels { compactionNames = append(compactionNames, k) @@ -32,41 +56,41 @@ func newBlockMetrics(labels prometheus.Labels) *blockMetrics { totalCompactionsNames := append(compactionNames, "status") sort.Strings(totalCompactionsNames) - return &blockMetrics{ + return &compactionMetrics{ labels: labels, Compactions: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, - Subsystem: blockSubsystem, - Name: "compactions_total", + Subsystem: compactionSubsystem, + Name: "total", Help: "Number of times cache snapshotted or TSM compaction attempted.", }, totalCompactionsNames), CompactionsActive: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, - Subsystem: blockSubsystem, - Name: "compactions_active", + Subsystem: compactionSubsystem, + Name: "active", Help: "Number of active compactions.", }, compactionNames), CompactionDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, - Subsystem: blockSubsystem, - Name: 
"compaction_duration_seconds", + Subsystem: compactionSubsystem, + Name: "duration_seconds", Help: "Time taken for a successful compaction or snapshot.", // 30 buckets spaced exponentially between 5s and ~53 minutes. Buckets: prometheus.ExponentialBuckets(5.0, 1.25, 30), }, compactionNames), CompactionQueue: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, - Subsystem: blockSubsystem, - Name: "compactions_queued", + Subsystem: compactionSubsystem, + Name: "queued", Help: "Number of queued compactions.", }, compactionNames), } } -// CompactionLabels returns a copy of labels for use with compaction metrics. -func (b *blockMetrics) CompactionLabels(level compactionLevel) prometheus.Labels { - l := make(map[string]string, len(b.labels)) - for k, v := range b.labels { +// Labels returns a copy of labels for use with compaction metrics. +func (m *compactionMetrics) Labels(level compactionLevel) prometheus.Labels { + l := make(map[string]string, len(m.labels)) + for k, v := range m.labels { l[k] = v } l["level"] = fmt.Sprint(level) @@ -74,11 +98,60 @@ func (b *blockMetrics) CompactionLabels(level compactionLevel) prometheus.Labels } // PrometheusCollectors satisfies the prom.PrometheusCollector interface. -func (b *blockMetrics) PrometheusCollectors() []prometheus.Collector { +func (m *compactionMetrics) PrometheusCollectors() []prometheus.Collector { return []prometheus.Collector{ - b.Compactions, - b.CompactionsActive, - b.CompactionDuration, - b.CompactionQueue, + m.Compactions, + m.CompactionsActive, + m.CompactionDuration, + m.CompactionQueue, + } +} + +// fileMetrics are a set of metrics concerned with tracking data about files on disk. +type fileMetrics struct { + labels prometheus.Labels + DiskSize *prometheus.GaugeVec + Files *prometheus.GaugeVec +} + +// newFileMetrics initialises the prometheus metrics for tracking files on disk. 
+func newFileMetrics(labels prometheus.Labels) *fileMetrics { + var names []string + for k := range labels { + names = append(names, k) + } + sort.Strings(names) + + return &fileMetrics{ + labels: labels, + DiskSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: fileStoreSubsystem, + Name: "disk_bytes", + Help: "Number of bytes TSM files using on disk.", + }, names), + Files: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: fileStoreSubsystem, + Name: "total", + Help: "Number of files.", + }, names), + } +} + +// Labels returns a copy of labels for use with file metrics. +func (m *fileMetrics) Labels() prometheus.Labels { + l := make(map[string]string, len(m.labels)) + for k, v := range m.labels { + l[k] = v + } + return l +} + +// PrometheusCollectors satisfies the prom.PrometheusCollector interface. +func (m *fileMetrics) PrometheusCollectors() []prometheus.Collector { + return []prometheus.Collector{ + m.DiskSize, + m.Files, } }