From 3b980ed7e35e7a8985fcb862156901dbbd89fe40 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 1 Nov 2018 18:58:56 +0000 Subject: [PATCH] Convert Cache statistics --- tsdb/tsm1/cache.go | 330 ++++++++++++++++++++++-------------- tsdb/tsm1/cache_test.go | 14 +- tsdb/tsm1/engine.go | 3 +- tsdb/tsm1/file_store.go | 1 + tsdb/tsm1/metrics.go | 110 +++++++++++- tsdb/tsm1/scheduler.go | 5 +- tsdb/tsm1/scheduler_test.go | 19 +-- 7 files changed, 325 insertions(+), 157 deletions(-) diff --git a/tsdb/tsm1/cache.go b/tsdb/tsm1/cache.go index a075ed21e8..455c38d7b6 100644 --- a/tsdb/tsm1/cache.go +++ b/tsdb/tsm1/cache.go @@ -143,25 +143,6 @@ func (e *entry) InfluxQLType() (influxql.DataType, error) { return e.values.InfluxQLType() } -// Statistics gathered by the Cache. -const ( - // levels - point in time measures - - statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes - statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes - statSnapshots = "snapshotCount" // level: Number of active snapshots. - statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since cache was last snapshoted at sample time - - // counters - accumulative measures - - statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots. - statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots - - statCacheWriteOK = "writeOk" - statCacheWriteErr = "writeErr" - statCacheWriteDropped = "writeDropped" -) - // storer is the interface that descibes a cache's store. type storer interface { entry(key []byte) *entry // Get an entry by its key. @@ -178,12 +159,7 @@ type storer interface { // Cache maintains an in-memory store of Values for a set of keys. type Cache struct { - // Due to a bug in atomic size needs to be the first word in the struct, as - // that's the only place where you're guaranteed to be 64-bit aligned on a - // 32 bit system. 
See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG - size uint64 - snapshotSize uint64 - + _ uint64 // Padding for 32 bit struct alignment mu sync.RWMutex store storer maxSize uint64 @@ -194,10 +170,7 @@ type Cache struct { snapshot *Cache snapshotting bool - // This number is the number of pending or failed WriteSnaphot attempts since the last successful one. - snapshotAttempts int - - stats *CacheStatistics + cacheTracker *cacheTracker lastSnapshot time.Time lastWriteTime time.Time @@ -213,50 +186,13 @@ func NewCache(maxSize uint64) *Cache { c := &Cache{ maxSize: maxSize, store: emptyStore{}, - stats: &CacheStatistics{}, lastSnapshot: time.Now(), + cacheTracker: newCacheTracker(newCacheMetrics(nil)), } c.initialize.Store(&sync.Once{}) - c.UpdateAge() - c.UpdateCompactTime(0) - c.updateCachedBytes(0) - c.updateMemSize(0) - c.updateSnapshots() return c } -// CacheStatistics hold statistics related to the cache. -type CacheStatistics struct { - MemSizeBytes int64 - DiskSizeBytes int64 - SnapshotCount int64 - CacheAgeMs int64 - CachedBytes int64 - WALCompactionTimeMs int64 - WriteOK int64 - WriteErr int64 - WriteDropped int64 -} - -// Statistics returns statistics for periodic monitoring. 
-func (c *Cache) Statistics(tags map[string]string) []models.Statistic { - return []models.Statistic{{ - Name: "tsm1_cache", - Tags: tags, - Values: map[string]interface{}{ - statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes), - statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes), - statSnapshots: atomic.LoadInt64(&c.stats.SnapshotCount), - statCacheAgeMs: atomic.LoadInt64(&c.stats.CacheAgeMs), - statCachedBytes: atomic.LoadInt64(&c.stats.CachedBytes), - statWALCompactionTimeMs: atomic.LoadInt64(&c.stats.WALCompactionTimeMs), - statCacheWriteOK: atomic.LoadInt64(&c.stats.WriteOK), - statCacheWriteErr: atomic.LoadInt64(&c.stats.WriteErr), - statCacheWriteDropped: atomic.LoadInt64(&c.stats.WriteDropped), - }, - }} -} - // init initializes the cache and allocates the underlying store. Once initialized, // the store re-used until Freed. func (c *Cache) init() { @@ -291,13 +227,15 @@ func (c *Cache) Write(key []byte, values []Value) error { n := c.Size() + addedSize if limit > 0 && n > limit { - atomic.AddInt64(&c.stats.WriteErr, 1) + c.cacheTracker.IncWritesErr() + c.cacheTracker.AddWrittenBytesDrop(uint64(addedSize)) return ErrCacheMemorySizeLimitExceeded(n, limit) } newKey, err := c.store.write(key, values) if err != nil { - atomic.AddInt64(&c.stats.WriteErr, 1) + c.cacheTracker.IncWritesErr() + c.cacheTracker.AddWrittenBytesErr(uint64(addedSize)) return err } @@ -305,9 +243,10 @@ func (c *Cache) Write(key []byte, values []Value) error { addedSize += uint64(len(key)) } // Update the cache size and the memory size stat. - c.increaseSize(addedSize) - c.updateMemSize(int64(addedSize)) - atomic.AddInt64(&c.stats.WriteOK, 1) + c.cacheTracker.IncCacheSize(addedSize) + c.cacheTracker.AddMemBytes(addedSize) + c.cacheTracker.AddWrittenBytesOK(uint64(addedSize)) + c.cacheTracker.IncWritesOK() return nil } @@ -328,7 +267,8 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { limit := c.maxSize // maxSize is safe for reading without a lock. 
n := c.Size() + addedSize if limit > 0 && n > limit { - atomic.AddInt64(&c.stats.WriteErr, 1) + c.cacheTracker.IncWritesErr() + c.cacheTracker.AddWrittenBytesDrop(uint64(addedSize)) return ErrCacheMemorySizeLimitExceeded(n, limit) } @@ -337,32 +277,36 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { store := c.store c.mu.RUnlock() - // We'll optimistially set size here, and then decrement it for write errors. - c.increaseSize(addedSize) + var bytesWrittenErr uint64 + + // We'll optimistically set size here, and then decrement it for write errors. for k, v := range values { newKey, err := store.write([]byte(k), v) if err != nil { // The write failed, hold onto the error and adjust the size delta. werr = err addedSize -= uint64(Values(v).Size()) - c.decreaseSize(uint64(Values(v).Size())) + bytesWrittenErr += uint64(Values(v).Size()) } + if newKey { addedSize += uint64(len(k)) - c.increaseSize(uint64(len(k))) } } // Some points in the batch were dropped. An error is returned so // error stat is incremented as well. 
if werr != nil { - atomic.AddInt64(&c.stats.WriteDropped, 1) - atomic.AddInt64(&c.stats.WriteErr, 1) + c.cacheTracker.IncWritesErr() + c.cacheTracker.IncWritesDrop() + c.cacheTracker.AddWrittenBytesErr(bytesWrittenErr) } // Update the memory size stat - c.updateMemSize(int64(addedSize)) - atomic.AddInt64(&c.stats.WriteOK, 1) + c.cacheTracker.IncCacheSize(addedSize) + c.cacheTracker.AddMemBytes(addedSize) + c.cacheTracker.IncWritesOK() + c.cacheTracker.AddWrittenBytesOK(addedSize) c.mu.Lock() c.lastWriteTime = time.Now() @@ -384,7 +328,7 @@ func (c *Cache) Snapshot() (*Cache, error) { } c.snapshotting = true - c.snapshotAttempts++ // increment the number of times we tried to do this + c.cacheTracker.IncSnapshotsActive() // increment the number of times we tried to do this // If no snapshot exists, create a new one, otherwise update the existing snapshot if c.snapshot == nil { @@ -393,8 +337,10 @@ func (c *Cache) Snapshot() (*Cache, error) { return nil, err } + newMetrics := newCacheMetrics(c.cacheTracker.metrics.Labels()) c.snapshot = &Cache{ - store: store, + store: store, + cacheTracker: newCacheTracker(newMetrics), } } @@ -407,18 +353,17 @@ func (c *Cache) Snapshot() (*Cache, error) { c.snapshot.store, c.store = c.store, c.snapshot.store snapshotSize := c.Size() - // Save the size of the snapshot on the snapshot cache - atomic.StoreUint64(&c.snapshot.size, snapshotSize) - // Save the size of the snapshot on the live cache - atomic.StoreUint64(&c.snapshotSize, snapshotSize) + c.snapshot.cacheTracker.SetSnapshotSize(snapshotSize) // Save the size of the snapshot on the snapshot cache + c.cacheTracker.SetSnapshotSize(snapshotSize) // Save the size of the snapshot on the live cache // Reset the cache's store. 
c.store.reset() - atomic.StoreUint64(&c.size, 0) + c.cacheTracker.SetCacheSize(0) c.lastSnapshot = time.Now() - c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot - c.updateSnapshots() + c.cacheTracker.AddSnapshottedBytes(snapshotSize) // increment the number of bytes added to the snapshot + c.cacheTracker.SetDiskBytes(0) + c.cacheTracker.SetSnapshotsActive(0) return c.snapshot, nil } @@ -455,33 +400,26 @@ func (c *Cache) ClearSnapshot(success bool) { c.snapshotting = false if success { - c.snapshotAttempts = 0 - c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache + snapshotSize := c.cacheTracker.SnapshotSize() + c.cacheTracker.SetSnapshotsActive(0) + c.cacheTracker.SubMemBytes(snapshotSize) // decrement the number of bytes in cache // Reset the snapshot to a fresh Cache. + newMetrics := newCacheMetrics(c.cacheTracker.metrics.Labels()) c.snapshot = &Cache{ - store: c.snapshot.store, + store: c.snapshot.store, + cacheTracker: newCacheTracker(newMetrics), } - atomic.StoreUint64(&c.snapshotSize, 0) - c.updateSnapshots() + c.cacheTracker.SetSnapshotSize(0) + c.cacheTracker.SetDiskBytes(0) + c.cacheTracker.SetSnapshotsActive(0) } } // Size returns the number of point-calcuated bytes the cache currently uses. func (c *Cache) Size() uint64 { - return atomic.LoadUint64(&c.size) + atomic.LoadUint64(&c.snapshotSize) -} - -// increaseSize increases size by delta. -func (c *Cache) increaseSize(delta uint64) { - atomic.AddUint64(&c.size, delta) -} - -// decreaseSize decreases size by delta. -func (c *Cache) decreaseSize(delta uint64) { - // Per sync/atomic docs, bit-flip delta minus one to perform subtraction within AddUint64. - atomic.AddUint64(&c.size, ^(delta - 1)) + return c.cacheTracker.CacheSize() + c.cacheTracker.SnapshotSize() } // MaxSize returns the maximum number of bytes the cache may consume. 
@@ -623,6 +561,7 @@ func (c *Cache) DeleteRange(keys [][]byte, min, max int64) { c.mu.Lock() defer c.mu.Unlock() + var total uint64 for _, k := range keys { // Make sure key exist in the cache, skip if it does not e := c.store.entry(k) @@ -630,23 +569,28 @@ func (c *Cache) DeleteRange(keys [][]byte, min, max int64) { continue } - origSize := uint64(e.size()) + total += uint64(e.size()) + // Everything is being deleted. if min == math.MinInt64 && max == math.MaxInt64 { - c.decreaseSize(origSize + uint64(len(k))) + total += uint64(len(k)) // all entries and the key. c.store.remove(k) continue } + // Filter what to delete by time range. e.filter(min, max) if e.count() == 0 { + // Nothing left in cache for that key + total += uint64(len(k)) // all entries and the key. c.store.remove(k) - c.decreaseSize(origSize + uint64(len(k))) continue } - c.decreaseSize(origSize - uint64(e.size())) + // Just update what is being deleted by the size of the filtered entries. + total -= uint64(e.size()) } - atomic.StoreInt64(&c.stats.MemSizeBytes, int64(c.Size())) + c.cacheTracker.DecCacheSize(total) // Decrease the live cache size. + c.cacheTracker.SetMemBytes(uint64(c.Size())) } // SetMaxSize updates the memory limit of the cache. @@ -777,23 +721,156 @@ func (c *Cache) LastWriteTime() time.Time { func (c *Cache) UpdateAge() { c.mu.RLock() defer c.mu.RUnlock() - ageStat := int64(time.Since(c.lastSnapshot) / time.Millisecond) - atomic.StoreInt64(&c.stats.CacheAgeMs, ageStat) + c.cacheTracker.SetAge(time.Since(c.lastSnapshot)) } -// UpdateCompactTime updates WAL compaction time statistic based on d. -func (c *Cache) UpdateCompactTime(d time.Duration) { - atomic.AddInt64(&c.stats.WALCompactionTimeMs, int64(d/time.Millisecond)) +// cacheTracker tracks writes to the cache and snapshots. +// +// As well as being responsible for providing atomic reads and writes to the +// statistics, cacheTracker also mirrors any changes to the external prometheus +// metrics, which the Engine exposes. 
+// +// *NOTE* - cacheTracker fields should not be directly modified. Doing so +// could result in the Engine exposing inaccurate metrics. +type cacheTracker struct { + metrics *cacheMetrics + snapshotsActive uint64 + snapshotSize uint64 + cacheSize uint64 + + // Used in testing. + memSizeBytes uint64 + snapshottedBytes uint64 + writesDropped uint64 + writesErr uint64 } -// updateCachedBytes increases the cachedBytes counter by b. -func (c *Cache) updateCachedBytes(b uint64) { - atomic.AddInt64(&c.stats.CachedBytes, int64(b)) +func newCacheTracker(metrics *cacheMetrics) *cacheTracker { + return &cacheTracker{metrics: metrics} } -// updateMemSize updates the memSize level by b. -func (c *Cache) updateMemSize(b int64) { - atomic.AddInt64(&c.stats.MemSizeBytes, b) +// AddMemBytes increases the number of in-memory cache bytes. +func (t *cacheTracker) AddMemBytes(bytes uint64) { + atomic.AddUint64(&t.memSizeBytes, bytes) + + labels := t.metrics.Labels() + t.metrics.MemSize.With(labels).Add(float64(bytes)) +} + +// SubMemBytes decreases the number of in-memory cache bytes. +func (t *cacheTracker) SubMemBytes(bytes uint64) { + atomic.AddUint64(&t.memSizeBytes, ^(bytes - 1)) + + labels := t.metrics.Labels() + t.metrics.MemSize.With(labels).Sub(float64(bytes)) +} + +// SetMemBytes sets the number of in-memory cache bytes. +func (t *cacheTracker) SetMemBytes(bytes uint64) { + atomic.StoreUint64(&t.memSizeBytes, bytes) + + labels := t.metrics.Labels() + t.metrics.MemSize.With(labels).Set(float64(bytes)) +} + +// AddBytesWritten increases the number of bytes written to the cache. +func (t *cacheTracker) AddBytesWritten(bytes uint64) { + labels := t.metrics.Labels() + t.metrics.MemSize.With(labels).Add(float64(bytes)) +} + +// AddSnapshottedBytes increases the number of bytes snapshotted. 
+func (t *cacheTracker) AddSnapshottedBytes(bytes uint64) { + atomic.AddUint64(&t.snapshottedBytes, bytes) + + labels := t.metrics.Labels() + t.metrics.SnapshottedBytes.With(labels).Add(float64(bytes)) +} + +// SetDiskBytes sets the number of bytes on disk used by snapshot data. +func (t *cacheTracker) SetDiskBytes(bytes uint64) { + labels := t.metrics.Labels() + t.metrics.DiskSize.With(labels).Set(float64(bytes)) +} + +// IncSnapshotsActive increases the number of active snapshots. +func (t *cacheTracker) IncSnapshotsActive() { + atomic.AddUint64(&t.snapshotsActive, 1) + + labels := t.metrics.Labels() + t.metrics.SnapshotsActive.With(labels).Inc() +} + +// SetSnapshotsActive sets the number of active snapshots. +func (t *cacheTracker) SetSnapshotsActive(n uint64) { + atomic.StoreUint64(&t.snapshotsActive, n) + + labels := t.metrics.Labels() + t.metrics.SnapshotsActive.With(labels).Set(float64(n)) +} + +// AddWrittenBytes increases the number of bytes written to the cache, with a required status. +func (t *cacheTracker) AddWrittenBytes(status string, bytes uint64) { + labels := t.metrics.Labels() + labels["status"] = status + t.metrics.WrittenBytes.With(labels).Add(float64(bytes)) +} + +// AddWrittenBytesOK adds the number of bytes successfully written to the cache. +func (t *cacheTracker) AddWrittenBytesOK(bytes uint64) { t.AddWrittenBytes("ok", bytes) } + +// AddWrittenBytesErr adds the number of bytes written that encountered an error. +func (t *cacheTracker) AddWrittenBytesErr(bytes uint64) { t.AddWrittenBytes("error", bytes) } + +// AddWrittenBytesDrop adds the number of bytes written that were dropped. +func (t *cacheTracker) AddWrittenBytesDrop(bytes uint64) { t.AddWrittenBytes("dropped", bytes) } + +// IncWrites increments the number of writes to the cache, with a required status. 
+func (t *cacheTracker) IncWrites(status string) { + labels := t.metrics.Labels() + labels["status"] = status + t.metrics.Writes.With(labels).Inc() +} + +// IncWritesOK increments the number of successful writes. +func (t *cacheTracker) IncWritesOK() { t.IncWrites("ok") } + +// IncWritesErr increments the number of writes that encountered an error. +func (t *cacheTracker) IncWritesErr() { + atomic.AddUint64(&t.writesErr, 1) + + t.IncWrites("error") +} + +// IncWritesDrop increments the number of writes that were dropped. +func (t *cacheTracker) IncWritesDrop() { + atomic.AddUint64(&t.writesDropped, 1) + + t.IncWrites("dropped") +} + +// CacheSize returns the live cache size. +func (t *cacheTracker) CacheSize() uint64 { return atomic.LoadUint64(&t.cacheSize) } + +// IncCacheSize increases the live cache size by sz bytes. +func (t *cacheTracker) IncCacheSize(sz uint64) { atomic.AddUint64(&t.cacheSize, sz) } + +// DecCacheSize decreases the live cache size by sz bytes. +func (t *cacheTracker) DecCacheSize(sz uint64) { atomic.AddUint64(&t.cacheSize, ^(sz - 1)) } + +// SetCacheSize sets the live cache size to sz. +func (t *cacheTracker) SetCacheSize(sz uint64) { atomic.StoreUint64(&t.cacheSize, sz) } + +// SetSnapshotSize sets the last successful snapshot size. +func (t *cacheTracker) SetSnapshotSize(sz uint64) { atomic.StoreUint64(&t.snapshotSize, sz) } + +// SnapshotSize returns the last successful snapshot size. +func (t *cacheTracker) SnapshotSize() uint64 { return atomic.LoadUint64(&t.snapshotSize) } + +// SetAge sets the time since the last successful snapshot. +func (t *cacheTracker) SetAge(d time.Duration) { + labels := t.metrics.Labels() + t.metrics.Age.With(labels).Set(d.Seconds()) } func valueType(v Value) byte { @@ -811,13 +888,6 @@ } } -// updateSnapshots updates the snapshotsCount and the diskSize levels. 
-func (c *Cache) updateSnapshots() { - // Update disk stats - atomic.StoreInt64(&c.stats.DiskSizeBytes, int64(atomic.LoadUint64(&c.snapshotSize))) - atomic.StoreInt64(&c.stats.SnapshotCount, int64(c.snapshotAttempts)) -} - type emptyStore struct{} func (e emptyStore) entry(key []byte) *entry { return nil } diff --git a/tsdb/tsm1/cache_test.go b/tsdb/tsm1/cache_test.go index 0f0dff9673..38f243ea92 100644 --- a/tsdb/tsm1/cache_test.go +++ b/tsdb/tsm1/cache_test.go @@ -138,9 +138,9 @@ func TestCache_WriteMulti_Stats(t *testing.T) { } // Write stats updated - if got, exp := c.stats.WriteDropped, int64(1); got != exp { + if got, exp := atomic.LoadUint64(&c.cacheTracker.writesDropped), uint64(1); got != exp { t.Fatalf("got %v, expected %v", got, exp) - } else if got, exp := c.stats.WriteErr, int64(1); got != exp { + } else if got, exp := atomic.LoadUint64(&c.cacheTracker.writesErr), uint64(1); got != exp { t.Fatalf("got %v, expected %v", got, exp) } } @@ -190,11 +190,11 @@ func TestCache_Cache_DeleteRange(t *testing.T) { c.DeleteRange([][]byte{[]byte("bar")}, 2, math.MaxInt64) if exp, keys := [][]byte{[]byte("bar"), []byte("foo")}, c.Keys(); !reflect.DeepEqual(keys, exp) { - t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) + t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys) } if got, exp := c.Size(), valuesSize+uint64(v0.Size())+6; exp != got { - t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got) + t.Fatalf("cache size incorrect after delete, exp %d, got %d", exp, got) } if got, exp := len(c.Values([]byte("bar"))), 1; got != exp { @@ -479,7 +479,7 @@ func TestCache_Snapshot_Stats(t *testing.T) { t.Fatal(err) } - if got, exp := c.stats.MemSizeBytes, int64(16)+3; got != exp { + if got, exp := atomic.LoadUint64(&c.cacheTracker.memSizeBytes), uint64(16)+3; got != exp { t.Fatalf("got %v, expected %v", got, exp) } @@ -494,11 +494,11 @@ func TestCache_Snapshot_Stats(t *testing.T) { } // Cached bytes should 
have been increased. - if got, exp := c.stats.CachedBytes, int64(16)+3; got != exp { + if got, exp := atomic.LoadUint64(&c.cacheTracker.snapshottedBytes), uint64(16)+3; got != exp { t.Fatalf("got %v, expected %v", got, exp) } - if got, exp := c.stats.MemSizeBytes, int64(16)+3; got != exp { + if got, exp := atomic.LoadUint64(&c.cacheTracker.memSizeBytes), uint64(16)+3; got != exp { t.Fatalf("got %v, expected %v", got, exp) } } diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 50096ca0a5..1f99609d24 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -501,6 +501,7 @@ func (e *Engine) Open() error { // Propagate prometheus metrics down into trackers. e.compactionTracker = newCompactionTracker(e.blockMetrics.compactionMetrics) e.FileStore.fileTracker = newFileTracker(e.blockMetrics.fileMetrics) + e.Cache.cacheTracker = newCacheTracker(e.blockMetrics.cacheMetrics) e.scheduler.setCompactionTracker(e.compactionTracker) @@ -554,7 +555,6 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector { var metrics []prometheus.Collector metrics = append(metrics, e.blockMetrics.PrometheusCollectors()...) 
- // TODO(edd): Add Cache metrics // TODO(edd): Add WAL metrics return metrics } @@ -1161,7 +1161,6 @@ func (e *Engine) WriteSnapshot() error { log, logEnd := logger.NewOperation(e.logger, "Cache snapshot", "tsm1_cache_snapshot") defer func() { elapsed := time.Since(started) - e.Cache.UpdateCompactTime(elapsed) log.Info("Snapshot for path written", zap.String("path", e.path), zap.Duration("duration", elapsed)) diff --git a/tsdb/tsm1/file_store.go b/tsdb/tsm1/file_store.go index e25afba80e..733c2dc0c3 100644 --- a/tsdb/tsm1/file_store.go +++ b/tsdb/tsm1/file_store.go @@ -241,6 +241,7 @@ func NewFileStore(dir string) *FileStore { }, obs: noFileStoreObserver{}, parseFileName: DefaultParseFileName, + fileTracker: newFileTracker(newFileMetrics(nil)), } fs.purger.fileStore = fs return fs diff --git a/tsdb/tsm1/metrics.go b/tsdb/tsm1/metrics.go index 6a90c9990c..7d832a6bfd 100644 --- a/tsdb/tsm1/metrics.go +++ b/tsdb/tsm1/metrics.go @@ -10,14 +10,16 @@ import ( // namespace is the leading part of all published metrics for the Storage service. const namespace = "storage" -const compactionSubsystem = "compactions" // sub-system associated with metrics for compactions -const fileStoreSubsystem = "tsm_files" // sub-system associated with metrics for compactions +const compactionSubsystem = "compactions" // sub-system associated with metrics for compactions. +const fileStoreSubsystem = "tsm_files" // sub-system associated with metrics for TSM files. +const cacheSubsystem = "cache" // sub-system associated with metrics for the cache. // blockMetrics are a set of metrics concerned with tracking data about block storage. type blockMetrics struct { labels prometheus.Labels *compactionMetrics *fileMetrics + *cacheMetrics } // newBlockMetrics initialises the prometheus metrics for the block subsystem. 
@@ -26,6 +28,7 @@ func newBlockMetrics(labels prometheus.Labels) *blockMetrics { labels: labels, compactionMetrics: newCompactionMetrics(labels), fileMetrics: newFileMetrics(labels), + cacheMetrics: newCacheMetrics(labels), } } @@ -34,16 +37,20 @@ func (m *blockMetrics) PrometheusCollectors() []prometheus.Collector { var metrics []prometheus.Collector metrics = append(metrics, m.compactionMetrics.PrometheusCollectors()...) metrics = append(metrics, m.fileMetrics.PrometheusCollectors()...) + metrics = append(metrics, m.cacheMetrics.PrometheusCollectors()...) return metrics } // compactionMetrics are a set of metrics concerned with tracking data about compactions. type compactionMetrics struct { - labels prometheus.Labels // Read Only - Compactions *prometheus.CounterVec + labels prometheus.Labels // Read Only + CompactionsActive *prometheus.GaugeVec CompactionDuration *prometheus.HistogramVec CompactionQueue *prometheus.GaugeVec + + // The following metrics include a `"status" = {ok, error, dropped}` label + Compactions *prometheus.CounterVec } // newCompactionMetrics initialises the prometheus metrics for compactions. @@ -155,3 +162,98 @@ func (m *fileMetrics) PrometheusCollectors() []prometheus.Collector { m.Files, } } + +// cacheMetrics are a set of metrics concerned with tracking data about the TSM Cache. +type cacheMetrics struct { + labels prometheus.Labels // Read Only + + MemSize *prometheus.GaugeVec + DiskSize *prometheus.GaugeVec + SnapshotsActive *prometheus.GaugeVec + Age *prometheus.GaugeVec + SnapshottedBytes *prometheus.CounterVec + + // The following metrics include a `"status" = {ok, error, dropped}` label + WrittenBytes *prometheus.CounterVec + Writes *prometheus.CounterVec +} + +// newCacheMetrics initialises the prometheus metrics for the cache. 
+func newCacheMetrics(labels prometheus.Labels) *cacheMetrics { + var names []string + for k := range labels { + names = append(names, k) + } + sort.Strings(names) + + writeNames := append(names, "status") + sort.Strings(writeNames) + + return &cacheMetrics{ + labels: labels, + MemSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: cacheSubsystem, + Name: "inuse_bytes", + Help: "In-memory size of cache.", + }, names), + DiskSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: cacheSubsystem, + Name: "disk_bytes", + Help: "Number of bytes on disk used by snapshot data.", + }, names), + SnapshotsActive: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: cacheSubsystem, + Name: "snapshots_active", + Help: "Number of active concurrent snapshots (>1 when splitting the cache).", + }, names), + Age: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: cacheSubsystem, + Name: "age", + Help: "Age of the current cache (time since last snapshot or initialisation).", + }, names), + SnapshottedBytes: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: cacheSubsystem, + Name: "snapshot_bytes", + Help: "Number of bytes snapshotted.", + }, names), + WrittenBytes: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: cacheSubsystem, + Name: "written_bytes", + Help: "Number of bytes successfully written to the Cache.", + }, writeNames), + Writes: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: cacheSubsystem, + Name: "writes", + Help: "Number of writes to the Cache.", + }, writeNames), + } +} + +// Labels returns a copy of labels for use with cache metrics. 
+func (m *cacheMetrics) Labels() prometheus.Labels { + l := make(map[string]string, len(m.labels)) + for k, v := range m.labels { + l[k] = v + } + return l +} + +// PrometheusCollectors satisfies the prom.PrometheusCollector interface. +func (m *cacheMetrics) PrometheusCollectors() []prometheus.Collector { + return []prometheus.Collector{ + m.MemSize, + m.DiskSize, + m.SnapshotsActive, + m.Age, + m.SnapshottedBytes, + m.WrittenBytes, + m.Writes, + } +} diff --git a/tsdb/tsm1/scheduler.go b/tsdb/tsm1/scheduler.go index 833f98c817..c4beba403c 100644 --- a/tsdb/tsm1/scheduler.go +++ b/tsdb/tsm1/scheduler.go @@ -13,8 +13,9 @@ type scheduler struct { func newScheduler(maxConcurrency int) *scheduler { return &scheduler{ - maxConcurrency: maxConcurrency, - weights: defaultWeights, + maxConcurrency: maxConcurrency, + weights: defaultWeights, + compactionTracker: newCompactionTracker(newCompactionMetrics(nil)), } } diff --git a/tsdb/tsm1/scheduler_test.go b/tsdb/tsm1/scheduler_test.go index 9ff40b0e5f..97871def85 100644 --- a/tsdb/tsm1/scheduler_test.go +++ b/tsdb/tsm1/scheduler_test.go @@ -3,7 +3,7 @@ package tsm1 import "testing" func TestScheduler_Runnable_Empty(t *testing.T) { - s := newScheduler(&EngineStatistics{}, 1) + s := newScheduler(1) for i := 1; i < 5; i++ { s.setDepth(i, 1) @@ -20,11 +20,10 @@ func TestScheduler_Runnable_Empty(t *testing.T) { } func TestScheduler_Runnable_MaxConcurrency(t *testing.T) { - s := newScheduler(&EngineStatistics{}, 1) + s := newScheduler(1) // level 1 - s.stats = &EngineStatistics{} - s.stats.TSMCompactionsActive[0] = 1 + s.compactionTracker.active[1] = 1 for i := 0; i <= 4; i++ { _, runnable := s.next() if exp, got := false, runnable; exp != got { @@ -33,8 +32,7 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) { } // level 2 - s.stats = &EngineStatistics{} - s.stats.TSMCompactionsActive[1] = 1 + s.compactionTracker.active[2] = 1 for i := 0; i <= 4; i++ { _, runnable := s.next() if exp, got := false, runnable; exp != got 
{ @@ -43,8 +41,7 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) { } // level 3 - s.stats = &EngineStatistics{} - s.stats.TSMCompactionsActive[2] = 1 + s.compactionTracker.active[3] = 1 for i := 0; i <= 4; i++ { _, runnable := s.next() if exp, got := false, runnable; exp != got { @@ -53,8 +50,7 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) { } // optimize - s.stats = &EngineStatistics{} - s.stats.TSMOptimizeCompactionsActive++ + s.compactionTracker.active[4] = 1 for i := 0; i <= 4; i++ { _, runnable := s.next() if exp, got := false, runnable; exp != got { @@ -63,8 +59,7 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) { } // full - s.stats = &EngineStatistics{} - s.stats.TSMFullCompactionsActive++ + s.compactionTracker.active[5] = 1 for i := 0; i <= 4; i++ { _, runnable := s.next() if exp, got := false, runnable; exp != got {