Convert Cache statistics

pull/10616/head
Edd Robinson 2018-11-01 18:58:56 +00:00
parent d61b9f1645
commit 3b980ed7e3
7 changed files with 325 additions and 157 deletions

View File

@ -143,25 +143,6 @@ func (e *entry) InfluxQLType() (influxql.DataType, error) {
return e.values.InfluxQLType()
}
// Statistics gathered by the Cache.
const (
// levels - point in time measures
statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes
statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes
statSnapshots = "snapshotCount" // level: Number of active snapshots.
statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since cache was last snapshoted at sample time
// counters - accumulative measures
statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots.
statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots
statCacheWriteOK = "writeOk"
statCacheWriteErr = "writeErr"
statCacheWriteDropped = "writeDropped"
)
// storer is the interface that describes a cache's store.
type storer interface {
entry(key []byte) *entry // Get an entry by its key.
@ -178,12 +159,7 @@ type storer interface {
// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
// Due to a bug in atomic size needs to be the first word in the struct, as
// that's the only place where you're guaranteed to be 64-bit aligned on a
// 32 bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
size uint64
snapshotSize uint64
_ uint64 // Padding for 32 bit struct alignment
mu sync.RWMutex
store storer
maxSize uint64
@ -194,10 +170,7 @@ type Cache struct {
snapshot *Cache
snapshotting bool
// This number is the number of pending or failed WriteSnaphot attempts since the last successful one.
snapshotAttempts int
stats *CacheStatistics
cacheTracker *cacheTracker
lastSnapshot time.Time
lastWriteTime time.Time
@ -213,50 +186,13 @@ func NewCache(maxSize uint64) *Cache {
c := &Cache{
maxSize: maxSize,
store: emptyStore{},
stats: &CacheStatistics{},
lastSnapshot: time.Now(),
cacheTracker: newCacheTracker(newCacheMetrics(nil)),
}
c.initialize.Store(&sync.Once{})
c.UpdateAge()
c.UpdateCompactTime(0)
c.updateCachedBytes(0)
c.updateMemSize(0)
c.updateSnapshots()
return c
}
// CacheStatistics hold statistics related to the cache.
type CacheStatistics struct {
MemSizeBytes int64
DiskSizeBytes int64
SnapshotCount int64
CacheAgeMs int64
CachedBytes int64
WALCompactionTimeMs int64
WriteOK int64
WriteErr int64
WriteDropped int64
}
// Statistics returns statistics for periodic monitoring.
func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "tsm1_cache",
Tags: tags,
Values: map[string]interface{}{
statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes),
statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes),
statSnapshots: atomic.LoadInt64(&c.stats.SnapshotCount),
statCacheAgeMs: atomic.LoadInt64(&c.stats.CacheAgeMs),
statCachedBytes: atomic.LoadInt64(&c.stats.CachedBytes),
statWALCompactionTimeMs: atomic.LoadInt64(&c.stats.WALCompactionTimeMs),
statCacheWriteOK: atomic.LoadInt64(&c.stats.WriteOK),
statCacheWriteErr: atomic.LoadInt64(&c.stats.WriteErr),
statCacheWriteDropped: atomic.LoadInt64(&c.stats.WriteDropped),
},
}}
}
// init initializes the cache and allocates the underlying store. Once initialized,
// the store re-used until Freed.
func (c *Cache) init() {
@ -291,13 +227,15 @@ func (c *Cache) Write(key []byte, values []Value) error {
n := c.Size() + addedSize
if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
c.cacheTracker.IncWritesErr()
c.cacheTracker.AddWrittenBytesDrop(uint64(addedSize))
return ErrCacheMemorySizeLimitExceeded(n, limit)
}
newKey, err := c.store.write(key, values)
if err != nil {
atomic.AddInt64(&c.stats.WriteErr, 1)
c.cacheTracker.IncWritesErr()
c.cacheTracker.AddWrittenBytesErr(uint64(addedSize))
return err
}
@ -305,9 +243,10 @@ func (c *Cache) Write(key []byte, values []Value) error {
addedSize += uint64(len(key))
}
// Update the cache size and the memory size stat.
c.increaseSize(addedSize)
c.updateMemSize(int64(addedSize))
atomic.AddInt64(&c.stats.WriteOK, 1)
c.cacheTracker.IncCacheSize(addedSize)
c.cacheTracker.AddMemBytes(addedSize)
c.cacheTracker.AddWrittenBytesOK(uint64(addedSize))
c.cacheTracker.IncWritesOK()
return nil
}
@ -328,7 +267,8 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
limit := c.maxSize // maxSize is safe for reading without a lock.
n := c.Size() + addedSize
if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
c.cacheTracker.IncWritesErr()
c.cacheTracker.AddWrittenBytesDrop(uint64(addedSize))
return ErrCacheMemorySizeLimitExceeded(n, limit)
}
@ -337,32 +277,36 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
store := c.store
c.mu.RUnlock()
// We'll optimistially set size here, and then decrement it for write errors.
c.increaseSize(addedSize)
var bytesWrittenErr uint64
// We'll optimistically set size here, and then decrement it for write errors.
for k, v := range values {
newKey, err := store.write([]byte(k), v)
if err != nil {
// The write failed, hold onto the error and adjust the size delta.
werr = err
addedSize -= uint64(Values(v).Size())
c.decreaseSize(uint64(Values(v).Size()))
bytesWrittenErr += uint64(Values(v).Size())
}
if newKey {
addedSize += uint64(len(k))
c.increaseSize(uint64(len(k)))
}
}
// Some points in the batch were dropped. An error is returned so
// error stat is incremented as well.
if werr != nil {
atomic.AddInt64(&c.stats.WriteDropped, 1)
atomic.AddInt64(&c.stats.WriteErr, 1)
c.cacheTracker.IncWritesErr()
c.cacheTracker.IncWritesDrop()
c.cacheTracker.AddWrittenBytesErr(bytesWrittenErr)
}
// Update the memory size stat
c.updateMemSize(int64(addedSize))
atomic.AddInt64(&c.stats.WriteOK, 1)
c.cacheTracker.IncCacheSize(addedSize)
c.cacheTracker.AddMemBytes(addedSize)
c.cacheTracker.IncWritesOK()
c.cacheTracker.AddWrittenBytesOK(addedSize)
c.mu.Lock()
c.lastWriteTime = time.Now()
@ -384,7 +328,7 @@ func (c *Cache) Snapshot() (*Cache, error) {
}
c.snapshotting = true
c.snapshotAttempts++ // increment the number of times we tried to do this
c.cacheTracker.IncSnapshotsActive() // increment the number of times we tried to do this
// If no snapshot exists, create a new one, otherwise update the existing snapshot
if c.snapshot == nil {
@ -393,8 +337,10 @@ func (c *Cache) Snapshot() (*Cache, error) {
return nil, err
}
newMetrics := newCacheMetrics(c.cacheTracker.metrics.Labels())
c.snapshot = &Cache{
store: store,
store: store,
cacheTracker: newCacheTracker(newMetrics),
}
}
@ -407,18 +353,17 @@ func (c *Cache) Snapshot() (*Cache, error) {
c.snapshot.store, c.store = c.store, c.snapshot.store
snapshotSize := c.Size()
// Save the size of the snapshot on the snapshot cache
atomic.StoreUint64(&c.snapshot.size, snapshotSize)
// Save the size of the snapshot on the live cache
atomic.StoreUint64(&c.snapshotSize, snapshotSize)
c.snapshot.cacheTracker.SetSnapshotSize(snapshotSize) // Save the size of the snapshot on the snapshot cache
c.cacheTracker.SetSnapshotSize(snapshotSize) // Save the size of the snapshot on the live cache
// Reset the cache's store.
c.store.reset()
atomic.StoreUint64(&c.size, 0)
c.cacheTracker.SetCacheSize(0)
c.lastSnapshot = time.Now()
c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot
c.updateSnapshots()
c.cacheTracker.AddSnapshottedBytes(snapshotSize) // increment the number of bytes added to the snapshot
c.cacheTracker.SetDiskBytes(0)
c.cacheTracker.SetSnapshotsActive(0)
return c.snapshot, nil
}
@ -455,33 +400,26 @@ func (c *Cache) ClearSnapshot(success bool) {
c.snapshotting = false
if success {
c.snapshotAttempts = 0
c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache
snapshotSize := c.cacheTracker.SnapshotSize()
c.cacheTracker.SetSnapshotsActive(0)
c.cacheTracker.SubMemBytes(snapshotSize) // decrement the number of bytes in cache
// Reset the snapshot to a fresh Cache.
newMetrics := newCacheMetrics(c.cacheTracker.metrics.Labels())
c.snapshot = &Cache{
store: c.snapshot.store,
store: c.snapshot.store,
cacheTracker: newCacheTracker(newMetrics),
}
atomic.StoreUint64(&c.snapshotSize, 0)
c.updateSnapshots()
c.cacheTracker.SetSnapshotSize(0)
c.cacheTracker.SetDiskBytes(0)
c.cacheTracker.SetSnapshotsActive(0)
}
}
// Size returns the number of point-calculated bytes the cache currently uses.
func (c *Cache) Size() uint64 {
return atomic.LoadUint64(&c.size) + atomic.LoadUint64(&c.snapshotSize)
}
// increaseSize increases size by delta.
func (c *Cache) increaseSize(delta uint64) {
atomic.AddUint64(&c.size, delta)
}
// decreaseSize decreases size by delta.
func (c *Cache) decreaseSize(delta uint64) {
// Per sync/atomic docs, bit-flip delta minus one to perform subtraction within AddUint64.
atomic.AddUint64(&c.size, ^(delta - 1))
return c.cacheTracker.CacheSize() + c.cacheTracker.SnapshotSize()
}
// MaxSize returns the maximum number of bytes the cache may consume.
@ -623,6 +561,7 @@ func (c *Cache) DeleteRange(keys [][]byte, min, max int64) {
c.mu.Lock()
defer c.mu.Unlock()
var total uint64
for _, k := range keys {
// Make sure key exist in the cache, skip if it does not
e := c.store.entry(k)
@ -630,23 +569,28 @@ func (c *Cache) DeleteRange(keys [][]byte, min, max int64) {
continue
}
origSize := uint64(e.size())
total += uint64(e.size())
// Everything is being deleted.
if min == math.MinInt64 && max == math.MaxInt64 {
c.decreaseSize(origSize + uint64(len(k)))
total += uint64(len(k)) // all entries and the key.
c.store.remove(k)
continue
}
// Filter what to delete by time range.
e.filter(min, max)
if e.count() == 0 {
// Nothing left in cache for that key
total += uint64(len(k)) // all entries and the key.
c.store.remove(k)
c.decreaseSize(origSize + uint64(len(k)))
continue
}
c.decreaseSize(origSize - uint64(e.size()))
// Just update what is being deleted by the size of the filtered entries.
total -= uint64(e.size())
}
atomic.StoreInt64(&c.stats.MemSizeBytes, int64(c.Size()))
c.cacheTracker.DecCacheSize(total) // Decrease the live cache size.
c.cacheTracker.SetMemBytes(uint64(c.Size()))
}
// SetMaxSize updates the memory limit of the cache.
@ -777,23 +721,156 @@ func (c *Cache) LastWriteTime() time.Time {
func (c *Cache) UpdateAge() {
c.mu.RLock()
defer c.mu.RUnlock()
ageStat := int64(time.Since(c.lastSnapshot) / time.Millisecond)
atomic.StoreInt64(&c.stats.CacheAgeMs, ageStat)
c.cacheTracker.SetAge(time.Since(c.lastSnapshot))
}
// UpdateCompactTime updates WAL compaction time statistic based on d.
func (c *Cache) UpdateCompactTime(d time.Duration) {
atomic.AddInt64(&c.stats.WALCompactionTimeMs, int64(d/time.Millisecond))
// cacheTracker tracks writes to the cache and snapshots.
//
// As well as being responsible for providing atomic reads and writes to the
// statistics, cacheTracker also mirrors any changes to the external prometheus
// metrics, which the Engine exposes.
//
// *NOTE* - cacheTracker fields should not be directly modified. Doing so
// could result in the Engine exposing inaccurate metrics.
type cacheTracker struct {
	metrics *cacheMetrics // Prometheus metric vectors mirrored by every update.

	snapshotsActive uint64 // Number of currently active snapshots.
	snapshotSize    uint64 // Size (bytes) of the last snapshot taken.
	cacheSize       uint64 // Live cache size in bytes.

	// Used in testing.
	memSizeBytes     uint64 // In-memory cache size in bytes.
	snapshottedBytes uint64 // Total bytes written into snapshots.
	writesDropped    uint64 // Count of dropped writes.
	writesErr        uint64 // Count of writes that encountered an error.
}
// updateCachedBytes increases the cachedBytes counter by b.
func (c *Cache) updateCachedBytes(b uint64) {
atomic.AddInt64(&c.stats.CachedBytes, int64(b))
// newCacheTracker returns a cacheTracker that mirrors its statistics to the
// provided metrics.
func newCacheTracker(metrics *cacheMetrics) *cacheTracker {
	t := cacheTracker{metrics: metrics}
	return &t
}
// updateMemSize updates the memSize level by b.
func (c *Cache) updateMemSize(b int64) {
atomic.AddInt64(&c.stats.MemSizeBytes, b)
// AddMemBytes increases the number of in-memory cache bytes.
func (t *cacheTracker) AddMemBytes(bytes uint64) {
	atomic.AddUint64(&t.memSizeBytes, bytes)
	t.metrics.MemSize.With(t.metrics.Labels()).Add(float64(bytes))
}
// SubMemBytes decreases the number of in-memory cache bytes.
func (t *cacheTracker) SubMemBytes(bytes uint64) {
	// Per the sync/atomic docs, adding ^(delta - 1) performs an atomic
	// subtraction of delta on a uint64.
	atomic.AddUint64(&t.memSizeBytes, ^(bytes - 1))
	labels := t.metrics.Labels()
	t.metrics.MemSize.With(labels).Sub(float64(bytes))
}
// SetMemBytes sets the number of in-memory cache bytes.
func (t *cacheTracker) SetMemBytes(bytes uint64) {
	atomic.StoreUint64(&t.memSizeBytes, bytes)
	t.metrics.MemSize.With(t.metrics.Labels()).Set(float64(bytes))
}
// AddBytesWritten increases the number of bytes written to the cache.
//
// NOTE(review): unlike the other written-bytes helpers this updates the
// MemSize gauge rather than the WrittenBytes counter, and it updates no
// atomic field — it looks like a copy/paste of AddMemBytes. Callers appear
// to use AddWrittenBytesOK/Err/Drop instead; confirm whether this method is
// still needed and which metric it should update.
func (t *cacheTracker) AddBytesWritten(bytes uint64) {
	labels := t.metrics.Labels()
	t.metrics.MemSize.With(labels).Add(float64(bytes))
}
// AddSnapshottedBytes increases the number of bytes snapshotted.
func (t *cacheTracker) AddSnapshottedBytes(bytes uint64) {
	atomic.AddUint64(&t.snapshottedBytes, bytes)
	t.metrics.SnapshottedBytes.With(t.metrics.Labels()).Add(float64(bytes))
}
// SetDiskBytes sets the number of bytes on disk used by snapshot data.
// Only the metric is updated; no atomic counterpart is tracked.
func (t *cacheTracker) SetDiskBytes(bytes uint64) {
	t.metrics.DiskSize.With(t.metrics.Labels()).Set(float64(bytes))
}
// IncSnapshotsActive increases the number of active snapshots by one.
func (t *cacheTracker) IncSnapshotsActive() {
	atomic.AddUint64(&t.snapshotsActive, 1)
	t.metrics.SnapshotsActive.With(t.metrics.Labels()).Inc()
}
// SetSnapshotsActive sets the number of active snapshots.
func (t *cacheTracker) SetSnapshotsActive(n uint64) {
	atomic.StoreUint64(&t.snapshotsActive, n)
	labels := t.metrics.Labels()
	t.metrics.SnapshotsActive.With(labels).Set(float64(n))
}
// AddWrittenBytes increases the number of bytes written to the cache, with a
// required status ("ok", "error" or "dropped" via the AddWrittenBytes*
// wrappers).
func (t *cacheTracker) AddWrittenBytes(status string, bytes uint64) {
	// Labels() returns a copy, so adding the status key here does not mutate
	// the shared base label set.
	labels := t.metrics.Labels()
	labels["status"] = status
	t.metrics.WrittenBytes.With(labels).Add(float64(bytes))
}
// AddWrittenBytesOK increases the number of successfully written bytes.
func (t *cacheTracker) AddWrittenBytesOK(bytes uint64) { t.AddWrittenBytes("ok", bytes) }

// AddWrittenBytesErr increases the number of written bytes that encountered an error.
func (t *cacheTracker) AddWrittenBytesErr(bytes uint64) { t.AddWrittenBytes("error", bytes) }

// AddWrittenBytesDrop increases the number of written bytes that were dropped.
func (t *cacheTracker) AddWrittenBytesDrop(bytes uint64) { t.AddWrittenBytes("dropped", bytes) }
// IncWrites increments the number of writes to the cache, with a required
// status ("ok", "error" or "dropped" via the IncWrites* wrappers).
func (t *cacheTracker) IncWrites(status string) {
	// Labels() returns a copy, so adding the status key here does not mutate
	// the shared base label set.
	labels := t.metrics.Labels()
	labels["status"] = status
	t.metrics.Writes.With(labels).Inc()
}
// IncWritesOK increments the number of successful writes.
func (t *cacheTracker) IncWritesOK() { t.IncWrites("ok") }

// IncWritesErr increments the number of writes that encountered an error.
func (t *cacheTracker) IncWritesErr() {
	// Also tracked in an atomic field so the value can be read in tests.
	atomic.AddUint64(&t.writesErr, 1)
	t.IncWrites("error")
}

// IncWritesDrop increments the number of writes that were dropped.
func (t *cacheTracker) IncWritesDrop() {
	// Also tracked in an atomic field so the value can be read in tests.
	atomic.AddUint64(&t.writesDropped, 1)
	t.IncWrites("dropped")
}
// The size accessors below update only the atomic fields; no prometheus
// metric is touched here. NOTE(review): confirm that is intentional (MemSize
// and SnapshottedBytes are updated by the callers' other tracker calls).

// CacheSize returns the live cache size.
func (t *cacheTracker) CacheSize() uint64 { return atomic.LoadUint64(&t.cacheSize) }

// IncCacheSize increases the live cache size by sz bytes.
func (t *cacheTracker) IncCacheSize(sz uint64) { atomic.AddUint64(&t.cacheSize, sz) }

// DecCacheSize decreases the live cache size by sz bytes.
// Per the sync/atomic docs, adding ^(sz - 1) performs an atomic subtraction.
func (t *cacheTracker) DecCacheSize(sz uint64) { atomic.AddUint64(&t.cacheSize, ^(sz - 1)) }

// SetCacheSize sets the live cache size to sz.
func (t *cacheTracker) SetCacheSize(sz uint64) { atomic.StoreUint64(&t.cacheSize, sz) }

// SetSnapshotSize sets the last successful snapshot size.
func (t *cacheTracker) SetSnapshotSize(sz uint64) { atomic.StoreUint64(&t.snapshotSize, sz) }

// SnapshotSize returns the last successful snapshot size.
func (t *cacheTracker) SnapshotSize() uint64 { return atomic.LoadUint64(&t.snapshotSize) }
// SetAge sets the time since the last successful snapshot.
func (t *cacheTracker) SetAge(d time.Duration) {
	labels := t.metrics.Labels()
	// The age gauge is exported in seconds.
	t.metrics.Age.With(labels).Set(d.Seconds())
}
func valueType(v Value) byte {
@ -811,13 +888,6 @@ func valueType(v Value) byte {
}
}
// updateSnapshots updates the snapshotsCount and the diskSize levels.
func (c *Cache) updateSnapshots() {
// Update disk stats
atomic.StoreInt64(&c.stats.DiskSizeBytes, int64(atomic.LoadUint64(&c.snapshotSize)))
atomic.StoreInt64(&c.stats.SnapshotCount, int64(c.snapshotAttempts))
}
type emptyStore struct{}
func (e emptyStore) entry(key []byte) *entry { return nil }

View File

@ -138,9 +138,9 @@ func TestCache_WriteMulti_Stats(t *testing.T) {
}
// Write stats updated
if got, exp := c.stats.WriteDropped, int64(1); got != exp {
if got, exp := atomic.LoadUint64(&c.cacheTracker.writesDropped), uint64(1); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
} else if got, exp := c.stats.WriteErr, int64(1); got != exp {
} else if got, exp := atomic.LoadUint64(&c.cacheTracker.writesErr), uint64(1); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
}
@ -190,11 +190,11 @@ func TestCache_Cache_DeleteRange(t *testing.T) {
c.DeleteRange([][]byte{[]byte("bar")}, 2, math.MaxInt64)
if exp, keys := [][]byte{[]byte("bar"), []byte("foo")}, c.Keys(); !reflect.DeepEqual(keys, exp) {
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys)
}
if got, exp := c.Size(), valuesSize+uint64(v0.Size())+6; exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
t.Fatalf("cache size incorrect after delete, exp %d, got %d", exp, got)
}
if got, exp := len(c.Values([]byte("bar"))), 1; got != exp {
@ -479,7 +479,7 @@ func TestCache_Snapshot_Stats(t *testing.T) {
t.Fatal(err)
}
if got, exp := c.stats.MemSizeBytes, int64(16)+3; got != exp {
if got, exp := atomic.LoadUint64(&c.cacheTracker.memSizeBytes), uint64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
@ -494,11 +494,11 @@ func TestCache_Snapshot_Stats(t *testing.T) {
}
// Cached bytes should have been increased.
if got, exp := c.stats.CachedBytes, int64(16)+3; got != exp {
if got, exp := atomic.LoadUint64(&c.cacheTracker.snapshottedBytes), uint64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
if got, exp := c.stats.MemSizeBytes, int64(16)+3; got != exp {
if got, exp := atomic.LoadUint64(&c.cacheTracker.memSizeBytes), uint64(16)+3; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
}

View File

@ -501,6 +501,7 @@ func (e *Engine) Open() error {
// Propagate prometheus metrics down into trackers.
e.compactionTracker = newCompactionTracker(e.blockMetrics.compactionMetrics)
e.FileStore.fileTracker = newFileTracker(e.blockMetrics.fileMetrics)
e.Cache.cacheTracker = newCacheTracker(e.blockMetrics.cacheMetrics)
e.scheduler.setCompactionTracker(e.compactionTracker)
@ -554,7 +555,6 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
metrics = append(metrics, e.blockMetrics.PrometheusCollectors()...)
// TODO(edd): Add Cache metrics
// TODO(edd): Add WAL metrics
return metrics
}
@ -1161,7 +1161,6 @@ func (e *Engine) WriteSnapshot() error {
log, logEnd := logger.NewOperation(e.logger, "Cache snapshot", "tsm1_cache_snapshot")
defer func() {
elapsed := time.Since(started)
e.Cache.UpdateCompactTime(elapsed)
log.Info("Snapshot for path written",
zap.String("path", e.path),
zap.Duration("duration", elapsed))

View File

@ -241,6 +241,7 @@ func NewFileStore(dir string) *FileStore {
},
obs: noFileStoreObserver{},
parseFileName: DefaultParseFileName,
fileTracker: newFileTracker(newFileMetrics(nil)),
}
fs.purger.fileStore = fs
return fs

View File

@ -10,14 +10,16 @@ import (
// namespace is the leading part of all published metrics for the Storage service.
const namespace = "storage"
const compactionSubsystem = "compactions" // sub-system associated with metrics for compactions
const fileStoreSubsystem = "tsm_files" // sub-system associated with metrics for compactions
const compactionSubsystem = "compactions" // sub-system associated with metrics for compactions.
const fileStoreSubsystem = "tsm_files" // sub-system associated with metrics for TSM files.
const cacheSubsystem = "cache" // sub-system associated with metrics for the cache.
// blockMetrics are a set of metrics concerned with tracking data about block storage.
type blockMetrics struct {
labels prometheus.Labels
*compactionMetrics
*fileMetrics
*cacheMetrics
}
// newBlockMetrics initialises the prometheus metrics for the block subsystem.
@ -26,6 +28,7 @@ func newBlockMetrics(labels prometheus.Labels) *blockMetrics {
labels: labels,
compactionMetrics: newCompactionMetrics(labels),
fileMetrics: newFileMetrics(labels),
cacheMetrics: newCacheMetrics(labels),
}
}
@ -34,16 +37,20 @@ func (m *blockMetrics) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
metrics = append(metrics, m.compactionMetrics.PrometheusCollectors()...)
metrics = append(metrics, m.fileMetrics.PrometheusCollectors()...)
metrics = append(metrics, m.cacheMetrics.PrometheusCollectors()...)
return metrics
}
// compactionMetrics are a set of metrics concerned with tracking data about compactions.
type compactionMetrics struct {
labels prometheus.Labels // Read Only
Compactions *prometheus.CounterVec
labels prometheus.Labels // Read Only
CompactionsActive *prometheus.GaugeVec
CompactionDuration *prometheus.HistogramVec
CompactionQueue *prometheus.GaugeVec
// The following metrics include a `"status" = {ok, error, dropped}` label.
Compactions *prometheus.CounterVec
}
// newCompactionMetrics initialises the prometheus metrics for compactions.
@ -155,3 +162,98 @@ func (m *fileMetrics) PrometheusCollectors() []prometheus.Collector {
m.Files,
}
}
// cacheMetrics are a set of metrics concerned with tracking data about the TSM Cache.
type cacheMetrics struct {
	labels prometheus.Labels // Read Only

	MemSize          *prometheus.GaugeVec
	DiskSize         *prometheus.GaugeVec
	SnapshotsActive  *prometheus.GaugeVec
	Age              *prometheus.GaugeVec
	SnapshottedBytes *prometheus.CounterVec

	// The following metrics include a `"status" = {ok, error, dropped}` label.
	WrittenBytes *prometheus.CounterVec
	Writes       *prometheus.CounterVec
}
// newCacheMetrics initialises the prometheus metrics for the cache.
//
// labels become the base label names on every vector; the write-oriented
// vectors additionally carry a "status" label.
func newCacheMetrics(labels prometheus.Labels) *cacheMetrics {
	var names []string
	for k := range labels {
		names = append(names, k)
	}
	sort.Strings(names)

	// Copy names before appending "status": append may reuse names' backing
	// array (when len < cap), in which case sorting writeNames would also
	// reorder the elements visible through names and corrupt the label names
	// used by the other metric vectors.
	writeNames := make([]string, len(names), len(names)+1)
	copy(writeNames, names)
	writeNames = append(writeNames, "status")
	sort.Strings(writeNames)

	return &cacheMetrics{
		labels: labels,
		MemSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: cacheSubsystem,
			Name:      "inuse_bytes",
			Help:      "In-memory size of cache.",
		}, names),
		DiskSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: cacheSubsystem,
			Name:      "disk_bytes",
			Help:      "Number of bytes on disk used by snapshot data.",
		}, names),
		SnapshotsActive: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: cacheSubsystem,
			Name:      "snapshots_active",
			Help:      "Number of active concurrent snapshots (>1 when splitting the cache).",
		}, names),
		Age: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: cacheSubsystem,
			Name:      "age",
			Help:      "Age of the current cache (time since last snapshot or initialisation).",
		}, names),
		SnapshottedBytes: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: cacheSubsystem,
			Name:      "snapshot_bytes",
			Help:      "Number of bytes snapshotted.",
		}, names),
		WrittenBytes: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: cacheSubsystem,
			Name:      "written_bytes",
			Help:      "Number of bytes successfully written to the Cache.",
		}, writeNames),
		Writes: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: cacheSubsystem,
			Name:      "writes",
			Help:      "Number of writes to the Cache.",
		}, writeNames),
	}
}
// Labels returns a copy of labels for use with cache metrics.
func (m *cacheMetrics) Labels() prometheus.Labels {
	labels := make(prometheus.Labels, len(m.labels))
	for key, value := range m.labels {
		labels[key] = value
	}
	return labels
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
// It returns every metric vector held by cacheMetrics.
func (m *cacheMetrics) PrometheusCollectors() []prometheus.Collector {
	return []prometheus.Collector{
		m.MemSize,
		m.DiskSize,
		m.SnapshotsActive,
		m.Age,
		m.SnapshottedBytes,
		m.WrittenBytes,
		m.Writes,
	}
}

View File

@ -13,8 +13,9 @@ type scheduler struct {
func newScheduler(maxConcurrency int) *scheduler {
return &scheduler{
maxConcurrency: maxConcurrency,
weights: defaultWeights,
maxConcurrency: maxConcurrency,
weights: defaultWeights,
compactionTracker: newCompactionTracker(newCompactionMetrics(nil)),
}
}

View File

@ -3,7 +3,7 @@ package tsm1
import "testing"
func TestScheduler_Runnable_Empty(t *testing.T) {
s := newScheduler(&EngineStatistics{}, 1)
s := newScheduler(1)
for i := 1; i < 5; i++ {
s.setDepth(i, 1)
@ -20,11 +20,10 @@ func TestScheduler_Runnable_Empty(t *testing.T) {
}
func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
s := newScheduler(&EngineStatistics{}, 1)
s := newScheduler(1)
// level 1
s.stats = &EngineStatistics{}
s.stats.TSMCompactionsActive[0] = 1
s.compactionTracker.active[1] = 1
for i := 0; i <= 4; i++ {
_, runnable := s.next()
if exp, got := false, runnable; exp != got {
@ -33,8 +32,7 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
}
// level 2
s.stats = &EngineStatistics{}
s.stats.TSMCompactionsActive[1] = 1
s.compactionTracker.active[2] = 1
for i := 0; i <= 4; i++ {
_, runnable := s.next()
if exp, got := false, runnable; exp != got {
@ -43,8 +41,7 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
}
// level 3
s.stats = &EngineStatistics{}
s.stats.TSMCompactionsActive[2] = 1
s.compactionTracker.active[3] = 1
for i := 0; i <= 4; i++ {
_, runnable := s.next()
if exp, got := false, runnable; exp != got {
@ -53,8 +50,7 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
}
// optimize
s.stats = &EngineStatistics{}
s.stats.TSMOptimizeCompactionsActive++
s.compactionTracker.active[4] = 1
for i := 0; i <= 4; i++ {
_, runnable := s.next()
if exp, got := false, runnable; exp != got {
@ -63,8 +59,7 @@ func TestScheduler_Runnable_MaxConcurrency(t *testing.T) {
}
// full
s.stats = &EngineStatistics{}
s.stats.TSMFullCompactionsActive++
s.compactionTracker.active[5] = 1
for i := 0; i <= 4; i++ {
_, runnable := s.next()
if exp, got := false, runnable; exp != got {