From 97572ee878684b4125ada1d2adf3dfebc11fdd3e Mon Sep 17 00:00:00 2001
From: Tanya Gordeeva
Date: Fri, 12 Apr 2019 06:28:12 -0700
Subject: [PATCH] feat(storage): add tsm level metrics

Adds a "level" label to the Prometheus file store metrics so that TSM
disk size and file counts are reported per compaction level.
---
 tsdb/tsm1/file_store.go   | 76 ++++++++++++++++++++++++++++-----------
 tsdb/tsm1/metrics.go      |  1 +
 tsdb/tsm1/metrics_test.go | 16 ++++-----
 3 files changed, 65 insertions(+), 28 deletions(-)

diff --git a/tsdb/tsm1/file_store.go b/tsdb/tsm1/file_store.go
index 6e70f05dd9..454c0be161 100644
--- a/tsdb/tsm1/file_store.go
+++ b/tsdb/tsm1/file_store.go
@@ -303,42 +303,67 @@ type fileTracker struct {
 	metrics   *fileMetrics
 	labels    prometheus.Labels
 	diskBytes uint64
-	fileCount uint64
+	levels    uint64
 }
 
 func newFileTracker(metrics *fileMetrics, defaultLabels prometheus.Labels) *fileTracker {
 	return &fileTracker{metrics: metrics, labels: defaultLabels}
 }
 
+// Labels returns a copy of the default labels used by the tracker's metrics.
+// The returned map is safe for modification.
 func (t *fileTracker) Labels() prometheus.Labels {
-	return t.labels
+	labels := make(prometheus.Labels, len(t.labels))
+	for k, v := range t.labels {
+		labels[k] = v
+	}
+	return labels
 }
 
 // Bytes returns the number of bytes in use on disk.
 func (t *fileTracker) Bytes() uint64 { return atomic.LoadUint64(&t.diskBytes) }
 
 // SetBytes sets the number of bytes in use on disk.
-func (t *fileTracker) SetBytes(bytes uint64) {
-	atomic.StoreUint64(&t.diskBytes, bytes)
-
+func (t *fileTracker) SetBytes(bytes map[int]uint64) {
+	total := uint64(0)
 	labels := t.Labels()
-	t.metrics.DiskSize.With(labels).Set(float64(bytes))
+	for k, v := range bytes {
+		labels["level"] = fmt.Sprintf("%d", k)
+		t.metrics.DiskSize.With(labels).Set(float64(v))
+		total += v
+	}
+	atomic.StoreUint64(&t.diskBytes, total)
 }
 
 // AddBytes increases the number of bytes.
-func (t *fileTracker) AddBytes(bytes uint64) {
+func (t *fileTracker) AddBytes(bytes uint64, level int) {
 	atomic.AddUint64(&t.diskBytes, bytes)
 
 	labels := t.Labels()
+	labels["level"] = fmt.Sprintf("%d", level)
 	t.metrics.DiskSize.With(labels).Add(float64(bytes))
 }
 
 // SetFileCount sets the number of files in the FileStore.
-func (t *fileTracker) SetFileCount(files uint64) {
-	atomic.StoreUint64(&t.fileCount, files)
-
+func (t *fileTracker) SetFileCount(files map[int]uint64) {
 	labels := t.Labels()
-	t.metrics.Files.With(labels).Set(float64(files))
+	level := uint64(0)
+	for k, v := range files {
+		labels["level"] = fmt.Sprintf("%d", k)
+		if uint64(k) > level {
+			level = uint64(k)
+		}
+		t.metrics.Files.With(labels).Set(float64(v))
+	}
+	atomic.StoreUint64(&t.levels, level)
+}
+
+// ClearFileCounts zeroes the file count metric for every level reported by
+// the most recent SetFileCount call.
+func (t *fileTracker) ClearFileCounts() {
+	labels := t.Labels()
+	for i := uint64(0); i <= atomic.LoadUint64(&t.levels); i++ {
+		labels["level"] = fmt.Sprintf("%d", i)
+		t.metrics.Files.With(labels).Set(float64(0))
+	}
+	atomic.StoreUint64(&t.levels, uint64(0))
 }
 
 // Count returns the number of TSM files currently loaded.
@@ -623,6 +648,7 @@ func (f *FileStore) Open(ctx context.Context) error {
 	}
 
 	var lm int64
+	counts := make(map[int]uint64, 5)
 	for range files {
 		res := <-readerC
 		if res.err != nil {
@@ -631,13 +657,19 @@ func (f *FileStore) Open(ctx context.Context) error {
 			continue
 		}
 		f.files = append(f.files, res.r)
+		name := filepath.Base(res.r.Stats().Path)
+		_, seq, err := f.parseFileName(name)
+		if err != nil {
+			return err
+		}
+		counts[seq]++
 
 		// Accumulate file store size stats
 		totalSize := uint64(res.r.Size())
 		for _, ts := range res.r.TombstoneFiles() {
 			totalSize += uint64(ts.Size)
 		}
-		f.tracker.AddBytes(totalSize)
+		f.tracker.AddBytes(totalSize, seq)
 
 		// Re-initialize the lastModified time for the file store
 		if res.r.LastModified() > lm {
@@ -649,7 +681,7 @@ func (f *FileStore) Open(ctx context.Context) error {
 	close(readerC)
 
 	sort.Sort(tsmReaders(f.files))
-	f.tracker.SetFileCount(uint64(len(f.files)))
+	f.tracker.SetFileCount(counts)
 
 	return nil
 }
@@ -662,7 +694,7 @@ func (f *FileStore) Close() error {
 	f.lastFileStats = nil
 	f.files = nil
-	f.tracker.SetFileCount(uint64(0))
+	f.tracker.ClearFileCounts()
 
 	// Let other methods access this closed object while we do the actual closing.
 	f.mu.Unlock()
@@ -948,18 +980,22 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
 	f.lastFileStats = nil
 	f.files = active
 	sort.Sort(tsmReaders(f.files))
-	f.tracker.SetFileCount(uint64(len(f.files)))
+	f.tracker.ClearFileCounts()
 
 	// Recalculate the disk size stat
-	var totalSize uint64
+	sizes := make(map[int]uint64, 5)
 	for _, file := range f.files {
-		totalSize += uint64(file.Size())
+		size := uint64(file.Size())
 		for _, ts := range file.TombstoneFiles() {
-			totalSize += uint64(ts.Size)
+			size += uint64(ts.Size)
 		}
-
+		_, seq, err := f.parseFileName(file.Path())
+		if err != nil {
+			return err
+		}
+		sizes[seq] += size
 	}
-	f.tracker.SetBytes(totalSize)
+	f.tracker.SetBytes(sizes)
 
 	return nil
 }
diff --git a/tsdb/tsm1/metrics.go b/tsdb/tsm1/metrics.go
index fb7de95429..dab8387b5f 100644
--- a/tsdb/tsm1/metrics.go
+++ b/tsdb/tsm1/metrics.go
@@ -136,6 +136,7 @@ func newFileMetrics(labels prometheus.Labels) *fileMetrics {
 	for k := range labels {
 		names = append(names, k)
 	}
+	names = append(names, "level")
 	sort.Strings(names)
 
 	return &fileMetrics{
diff --git a/tsdb/tsm1/metrics_test.go b/tsdb/tsm1/metrics_test.go
index 6729affa60..51d871630f 100644
--- a/tsdb/tsm1/metrics_test.go
+++ b/tsdb/tsm1/metrics_test.go
@@ -18,11 +18,11 @@ func TestMetrics_Filestore(t *testing.T) {
 	reg.MustRegister(metrics.PrometheusCollectors()...)
 
 	// Generate some measurements.
-	t1.AddBytes(100)
-	t1.SetFileCount(3)
+	t1.AddBytes(100, 0)
+	t1.SetFileCount(map[int]uint64{0: 3})
 
-	t2.AddBytes(200)
-	t2.SetFileCount(4)
+	t2.AddBytes(200, 0)
+	t2.SetFileCount(map[int]uint64{0: 4})
 
 	// Test that all the correct metrics are present.
 	mfs, err := reg.Gather()
@@ -31,10 +31,10 @@ func TestMetrics_Filestore(t *testing.T) {
 	}
 
 	base := namespace + "_" + fileStoreSubsystem + "_"
-	m1Bytes := promtest.MustFindMetric(t, mfs, base+"disk_bytes", prometheus.Labels{"engine_id": "0", "node_id": "0"})
-	m2Bytes := promtest.MustFindMetric(t, mfs, base+"disk_bytes", prometheus.Labels{"engine_id": "1", "node_id": "0"})
-	m1Files := promtest.MustFindMetric(t, mfs, base+"total", prometheus.Labels{"engine_id": "0", "node_id": "0"})
-	m2Files := promtest.MustFindMetric(t, mfs, base+"total", prometheus.Labels{"engine_id": "1", "node_id": "0"})
+	m1Bytes := promtest.MustFindMetric(t, mfs, base+"disk_bytes", prometheus.Labels{"engine_id": "0", "node_id": "0", "level": "0"})
+	m2Bytes := promtest.MustFindMetric(t, mfs, base+"disk_bytes", prometheus.Labels{"engine_id": "1", "node_id": "0", "level": "0"})
+	m1Files := promtest.MustFindMetric(t, mfs, base+"total", prometheus.Labels{"engine_id": "0", "node_id": "0", "level": "0"})
+	m2Files := promtest.MustFindMetric(t, mfs, base+"total", prometheus.Labels{"engine_id": "1", "node_id": "0", "level": "0"})
 
 	if m, got, exp := m1Bytes, m1Bytes.GetGauge().GetValue(), 100.0; got != exp {
 		t.Errorf("[%s] got %v, expected %v", m, got, exp)
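
For reference, below is a minimal standalone sketch of the labelling pattern the patch introduces: a Prometheus gauge vector with a "level" label fed from a per-level map, the same shape SetFileCount receives after this change. The metric, namespace, and label values here are illustrative placeholders, not the exact names defined in metrics.go.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Hypothetical gauge mirroring the file-count metric: one time series per
	// engine/node/level combination.
	files := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "example",
		Subsystem: "tsm_files",
		Name:      "total",
		Help:      "Number of TSM files per compaction level.",
	}, []string{"engine_id", "node_id", "level"})

	reg := prometheus.NewRegistry()
	reg.MustRegister(files)

	// Per-level counts, shaped like the map passed to SetFileCount.
	counts := map[int]uint64{1: 4, 2: 2, 3: 1}
	labels := prometheus.Labels{"engine_id": "0", "node_id": "0"}
	for level, n := range counts {
		labels["level"] = fmt.Sprintf("%d", level)
		files.With(labels).Set(float64(n))
	}

	// Each level becomes its own series; level="2" reports 2 files here.
	fmt.Println(testutil.ToFloat64(files.With(prometheus.Labels{
		"engine_id": "0", "node_id": "0", "level": "2",
	})))
}

Because the levels are encoded as a label rather than separate metrics, per-level series can still be summed (for example with sum without(level)) to recover the old aggregate view.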