Merge pull request #13353 from influxdata/tg-tsm-metrics
feat(storage): add tsm level metricspull/13314/head
commit
8de4517ed3
|
@ -303,42 +303,67 @@ type fileTracker struct {
|
||||||
metrics *fileMetrics
|
metrics *fileMetrics
|
||||||
labels prometheus.Labels
|
labels prometheus.Labels
|
||||||
diskBytes uint64
|
diskBytes uint64
|
||||||
fileCount uint64
|
levels uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFileTracker(metrics *fileMetrics, defaultLabels prometheus.Labels) *fileTracker {
|
func newFileTracker(metrics *fileMetrics, defaultLabels prometheus.Labels) *fileTracker {
|
||||||
return &fileTracker{metrics: metrics, labels: defaultLabels}
|
return &fileTracker{metrics: metrics, labels: defaultLabels}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Labels returns a copy of the default labels used by the tracker's metrics.
|
||||||
|
// The returned map is safe for modification.
|
||||||
func (t *fileTracker) Labels() prometheus.Labels {
|
func (t *fileTracker) Labels() prometheus.Labels {
|
||||||
return t.labels
|
labels := make(prometheus.Labels, len(t.labels))
|
||||||
|
for k, v := range t.labels {
|
||||||
|
labels[k] = v
|
||||||
|
}
|
||||||
|
return labels
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bytes returns the number of bytes in use on disk.
|
// Bytes returns the number of bytes in use on disk.
|
||||||
func (t *fileTracker) Bytes() uint64 { return atomic.LoadUint64(&t.diskBytes) }
|
func (t *fileTracker) Bytes() uint64 { return atomic.LoadUint64(&t.diskBytes) }
|
||||||
|
|
||||||
// SetBytes sets the number of bytes in use on disk.
|
// SetBytes sets the number of bytes in use on disk.
|
||||||
func (t *fileTracker) SetBytes(bytes uint64) {
|
func (t *fileTracker) SetBytes(bytes map[int]uint64) {
|
||||||
atomic.StoreUint64(&t.diskBytes, bytes)
|
total := uint64(0)
|
||||||
|
|
||||||
labels := t.Labels()
|
labels := t.Labels()
|
||||||
t.metrics.DiskSize.With(labels).Set(float64(bytes))
|
for k, v := range bytes {
|
||||||
|
labels["level"] = fmt.Sprintf("%d", k)
|
||||||
|
t.metrics.DiskSize.With(labels).Set(float64(v))
|
||||||
|
}
|
||||||
|
atomic.StoreUint64(&t.diskBytes, total)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddBytes increases the number of bytes.
|
// AddBytes increases the number of bytes.
|
||||||
func (t *fileTracker) AddBytes(bytes uint64) {
|
func (t *fileTracker) AddBytes(bytes uint64, level int) {
|
||||||
atomic.AddUint64(&t.diskBytes, bytes)
|
atomic.AddUint64(&t.diskBytes, bytes)
|
||||||
|
|
||||||
labels := t.Labels()
|
labels := t.Labels()
|
||||||
|
labels["level"] = fmt.Sprintf("%d", level)
|
||||||
t.metrics.DiskSize.With(labels).Add(float64(bytes))
|
t.metrics.DiskSize.With(labels).Add(float64(bytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetFileCount sets the number of files in the FileStore.
|
// SetFileCount sets the number of files in the FileStore.
|
||||||
func (t *fileTracker) SetFileCount(files uint64) {
|
func (t *fileTracker) SetFileCount(files map[int]uint64) {
|
||||||
atomic.StoreUint64(&t.fileCount, files)
|
|
||||||
|
|
||||||
labels := t.Labels()
|
labels := t.Labels()
|
||||||
t.metrics.Files.With(labels).Set(float64(files))
|
level := uint64(0)
|
||||||
|
for k, v := range files {
|
||||||
|
labels["level"] = fmt.Sprintf("%d", k)
|
||||||
|
if uint64(k) > level {
|
||||||
|
level = uint64(k)
|
||||||
|
}
|
||||||
|
t.metrics.Files.With(labels).Set(float64(v))
|
||||||
|
}
|
||||||
|
atomic.StoreUint64(&t.levels, level)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *fileTracker) ClearFileCounts() {
|
||||||
|
labels := t.Labels()
|
||||||
|
for i := uint64(0); i <= atomic.LoadUint64(&t.levels); i++ {
|
||||||
|
labels["level"] = fmt.Sprintf("%d", i)
|
||||||
|
t.metrics.Files.With(labels).Set(float64(0))
|
||||||
|
}
|
||||||
|
atomic.StoreUint64(&t.levels, uint64(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Count returns the number of TSM files currently loaded.
|
// Count returns the number of TSM files currently loaded.
|
||||||
|
@ -623,6 +648,7 @@ func (f *FileStore) Open(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var lm int64
|
var lm int64
|
||||||
|
counts := make(map[int]uint64, 5)
|
||||||
for range files {
|
for range files {
|
||||||
res := <-readerC
|
res := <-readerC
|
||||||
if res.err != nil {
|
if res.err != nil {
|
||||||
|
@ -631,13 +657,19 @@ func (f *FileStore) Open(ctx context.Context) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
f.files = append(f.files, res.r)
|
f.files = append(f.files, res.r)
|
||||||
|
name := filepath.Base(res.r.Stats().Path)
|
||||||
|
_, seq, err := f.parseFileName(name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
counts[seq]++
|
||||||
|
|
||||||
// Accumulate file store size stats
|
// Accumulate file store size stats
|
||||||
totalSize := uint64(res.r.Size())
|
totalSize := uint64(res.r.Size())
|
||||||
for _, ts := range res.r.TombstoneFiles() {
|
for _, ts := range res.r.TombstoneFiles() {
|
||||||
totalSize += uint64(ts.Size)
|
totalSize += uint64(ts.Size)
|
||||||
}
|
}
|
||||||
f.tracker.AddBytes(totalSize)
|
f.tracker.AddBytes(totalSize, seq)
|
||||||
|
|
||||||
// Re-initialize the lastModified time for the file store
|
// Re-initialize the lastModified time for the file store
|
||||||
if res.r.LastModified() > lm {
|
if res.r.LastModified() > lm {
|
||||||
|
@ -649,7 +681,7 @@ func (f *FileStore) Open(ctx context.Context) error {
|
||||||
close(readerC)
|
close(readerC)
|
||||||
|
|
||||||
sort.Sort(tsmReaders(f.files))
|
sort.Sort(tsmReaders(f.files))
|
||||||
f.tracker.SetFileCount(uint64(len(f.files)))
|
f.tracker.SetFileCount(counts)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -662,7 +694,7 @@ func (f *FileStore) Close() error {
|
||||||
|
|
||||||
f.lastFileStats = nil
|
f.lastFileStats = nil
|
||||||
f.files = nil
|
f.files = nil
|
||||||
f.tracker.SetFileCount(uint64(0))
|
f.tracker.ClearFileCounts()
|
||||||
|
|
||||||
// Let other methods access this closed object while we do the actual closing.
|
// Let other methods access this closed object while we do the actual closing.
|
||||||
f.mu.Unlock()
|
f.mu.Unlock()
|
||||||
|
@ -948,18 +980,22 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
|
||||||
f.lastFileStats = nil
|
f.lastFileStats = nil
|
||||||
f.files = active
|
f.files = active
|
||||||
sort.Sort(tsmReaders(f.files))
|
sort.Sort(tsmReaders(f.files))
|
||||||
f.tracker.SetFileCount(uint64(len(f.files)))
|
f.tracker.ClearFileCounts()
|
||||||
|
|
||||||
// Recalculate the disk size stat
|
// Recalculate the disk size stat
|
||||||
var totalSize uint64
|
sizes := make(map[int]uint64, 5)
|
||||||
for _, file := range f.files {
|
for _, file := range f.files {
|
||||||
totalSize += uint64(file.Size())
|
size := uint64(file.Size())
|
||||||
for _, ts := range file.TombstoneFiles() {
|
for _, ts := range file.TombstoneFiles() {
|
||||||
totalSize += uint64(ts.Size)
|
size += uint64(ts.Size)
|
||||||
}
|
}
|
||||||
|
_, seq, err := f.parseFileName(file.Path())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
f.tracker.SetBytes(totalSize)
|
sizes[seq] += size
|
||||||
|
}
|
||||||
|
f.tracker.SetBytes(sizes)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -136,6 +136,7 @@ func newFileMetrics(labels prometheus.Labels) *fileMetrics {
|
||||||
for k := range labels {
|
for k := range labels {
|
||||||
names = append(names, k)
|
names = append(names, k)
|
||||||
}
|
}
|
||||||
|
names = append(names, "level")
|
||||||
sort.Strings(names)
|
sort.Strings(names)
|
||||||
|
|
||||||
return &fileMetrics{
|
return &fileMetrics{
|
||||||
|
|
|
@ -18,11 +18,11 @@ func TestMetrics_Filestore(t *testing.T) {
|
||||||
reg.MustRegister(metrics.PrometheusCollectors()...)
|
reg.MustRegister(metrics.PrometheusCollectors()...)
|
||||||
|
|
||||||
// Generate some measurements.
|
// Generate some measurements.
|
||||||
t1.AddBytes(100)
|
t1.AddBytes(100, 0)
|
||||||
t1.SetFileCount(3)
|
t1.SetFileCount(map[int]uint64{0: 3})
|
||||||
|
|
||||||
t2.AddBytes(200)
|
t2.AddBytes(200, 0)
|
||||||
t2.SetFileCount(4)
|
t2.SetFileCount(map[int]uint64{0: 4})
|
||||||
|
|
||||||
// Test that all the correct metrics are present.
|
// Test that all the correct metrics are present.
|
||||||
mfs, err := reg.Gather()
|
mfs, err := reg.Gather()
|
||||||
|
@ -31,10 +31,10 @@ func TestMetrics_Filestore(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
base := namespace + "_" + fileStoreSubsystem + "_"
|
base := namespace + "_" + fileStoreSubsystem + "_"
|
||||||
m1Bytes := promtest.MustFindMetric(t, mfs, base+"disk_bytes", prometheus.Labels{"engine_id": "0", "node_id": "0"})
|
m1Bytes := promtest.MustFindMetric(t, mfs, base+"disk_bytes", prometheus.Labels{"engine_id": "0", "node_id": "0", "level": "0"})
|
||||||
m2Bytes := promtest.MustFindMetric(t, mfs, base+"disk_bytes", prometheus.Labels{"engine_id": "1", "node_id": "0"})
|
m2Bytes := promtest.MustFindMetric(t, mfs, base+"disk_bytes", prometheus.Labels{"engine_id": "1", "node_id": "0", "level": "0"})
|
||||||
m1Files := promtest.MustFindMetric(t, mfs, base+"total", prometheus.Labels{"engine_id": "0", "node_id": "0"})
|
m1Files := promtest.MustFindMetric(t, mfs, base+"total", prometheus.Labels{"engine_id": "0", "node_id": "0", "level": "0"})
|
||||||
m2Files := promtest.MustFindMetric(t, mfs, base+"total", prometheus.Labels{"engine_id": "1", "node_id": "0"})
|
m2Files := promtest.MustFindMetric(t, mfs, base+"total", prometheus.Labels{"engine_id": "1", "node_id": "0", "level": "0"})
|
||||||
|
|
||||||
if m, got, exp := m1Bytes, m1Bytes.GetGauge().GetValue(), 100.0; got != exp {
|
if m, got, exp := m1Bytes, m1Bytes.GetGauge().GetValue(), 100.0; got != exp {
|
||||||
t.Errorf("[%s] got %v, expected %v", m, got, exp)
|
t.Errorf("[%s] got %v, expected %v", m, got, exp)
|
||||||
|
|
Loading…
Reference in New Issue