From b970e359dca7f139a43ae2e9ddff189e65b946f9 Mon Sep 17 00:00:00 2001 From: Sam Arnold Date: Thu, 2 Dec 2021 09:01:46 -0500 Subject: [PATCH] feat: remaining storage metrics from OSS engine (#22938) * fix: simplify disk size tracking * refactor: EngineTags in tsdb package * fix: fewer compaction buckets and dead code removal * feat: shard metrics * chore: formatting * feat: tsdb store metrics * feat: retention check metrics * chore: fix go vet * fix: review comments --- cmd/influxd/inspect/build_tsi/build_tsi.go | 2 +- .../inspect/verify_wal/verify_wal_test.go | 3 +- cmd/influxd/launcher/launcher.go | 1 + internal/tsdb_store.go | 4 - storage/engine.go | 24 +- tsdb/engine.go | 1 + tsdb/engine/tsm1/array_cursor_test.go | 13 +- tsdb/engine/tsm1/cache.go | 6 +- tsdb/engine/tsm1/cache_race_test.go | 7 +- tsdb/engine/tsm1/cache_test.go | 51 +-- tsdb/engine/tsm1/compact_test.go | 8 +- tsdb/engine/tsm1/engine.go | 45 +-- tsdb/engine/tsm1/file_store.go | 6 +- tsdb/engine/tsm1/file_store_array_test.go | 2 +- tsdb/engine/tsm1/file_store_test.go | 3 +- tsdb/engine/tsm1/wal.go | 7 +- tsdb/engine/tsm1/wal_test.go | 3 +- tsdb/enginetags.go | 29 ++ tsdb/shard.go | 315 ++++++++++++------ tsdb/store.go | 100 ++++-- v1/services/retention/service.go | 30 ++ 21 files changed, 416 insertions(+), 244 deletions(-) create mode 100644 tsdb/enginetags.go diff --git a/cmd/influxd/inspect/build_tsi/build_tsi.go b/cmd/influxd/inspect/build_tsi/build_tsi.go index e24a65efe5..e835b6770b 100644 --- a/cmd/influxd/inspect/build_tsi/build_tsi.go +++ b/cmd/influxd/inspect/build_tsi/build_tsi.go @@ -441,7 +441,7 @@ func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize i } } else { log.Debug("Building cache from wal files") - cache := tsm1.NewCache(maxCacheSize, tsm1.EngineTags{}) // tags are for metrics only + cache := tsm1.NewCache(maxCacheSize, tsdb.EngineTags{}) // tags are for metrics only loader := tsm1.NewCacheLoader(walPaths) loader.WithLogger(log) if err := loader.Load(cache); err != nil { diff --git a/cmd/influxd/inspect/verify_wal/verify_wal_test.go b/cmd/influxd/inspect/verify_wal/verify_wal_test.go index 5aaf0cc4e5..0b231290e0 100644 --- a/cmd/influxd/inspect/verify_wal/verify_wal_test.go +++ b/cmd/influxd/inspect/verify_wal/verify_wal_test.go @@ -7,6 +7,7 @@ import ( "os" "testing" + "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" "github.com/stretchr/testify/require" ) @@ -110,7 +111,7 @@ func newTempWALValid(t *testing.T) string { dir, err := os.MkdirTemp("", "verify-wal") require.NoError(t, err) - w := tsm1.NewWAL(dir, 0, 0, tsm1.EngineTags{}) + w := tsm1.NewWAL(dir, 0, 0, tsdb.EngineTags{}) defer w.Close() require.NoError(t, w.Open()) diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index facfe9e418..1f64c97721 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -324,6 +324,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { m.engine = storage.NewEngine( opts.EnginePath, opts.StorageConfig, + storage.WithMetricsDisabled(opts.MetricsDisabled), storage.WithMetaClient(metaClient), ) } diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go index 4da03ee7e5..8e2f13d8b4 100644 --- a/internal/tsdb_store.go +++ b/internal/tsdb_store.go @@ -29,7 +29,6 @@ type TSDBStoreMock struct { DiskSizeFn func() (int64, error) ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error) ImportShardFn func(id uint64, r io.Reader) error - 
MeasurementSeriesCountsFn func(database string) (measurements int, series int) MeasurementsCardinalityFn func(database string) (int64, error) MeasurementNamesFn func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) OpenFn func() error @@ -95,9 +94,6 @@ func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error { func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { return s.MeasurementNamesFn(ctx, auth, database, cond) } -func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measurements int, series int) { - return s.MeasurementSeriesCountsFn(database) -} func (s *TSDBStoreMock) MeasurementsCardinality(database string) (int64, error) { return s.MeasurementsCardinalityFn(database) } diff --git a/storage/engine.go b/storage/engine.go index 5415680cca..80546c802b 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -54,11 +54,10 @@ type Engine struct { retentionService *retention.Service precreatorService *precreator.Service - defaultMetricLabels prometheus.Labels - writePointsValidationEnabled bool - logger *zap.Logger + logger *zap.Logger + metricsDisabled bool } // Option provides a set @@ -70,6 +69,12 @@ func WithMetaClient(c MetaClient) Option { } } +func WithMetricsDisabled(m bool) Option { + return func(e *Engine) { + e.metricsDisabled = m + } +} + type MetaClient interface { CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) @@ -109,11 +114,10 @@ func NewEngine(path string, c Config, options ...Option) *Engine { c.Data.WALDir = filepath.Join(path, "wal") e := &Engine{ - config: c, - path: path, - defaultMetricLabels: prometheus.Labels{}, - tsdbStore: tsdb.NewStore(c.Data.Dir), - logger: zap.NewNop(), + config: c, + path: path, + tsdbStore: tsdb.NewStore(c.Data.Dir), + logger: zap.NewNop(), writePointsValidationEnabled: true, } @@ -127,6 +131,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine { // Copy TSDB configuration. e.tsdbStore.EngineOptions.EngineVersion = c.Data.Engine e.tsdbStore.EngineOptions.IndexVersion = c.Data.Index + e.tsdbStore.EngineOptions.MetricsDisabled = e.metricsDisabled pw := coordinator.NewPointsWriter(c.WriteTimeout, path) pw.TSDBStore = e.tsdbStore @@ -167,6 +172,9 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector { var metrics []prometheus.Collector metrics = append(metrics, tsm1.PrometheusCollectors()...) metrics = append(metrics, coordinator.PrometheusCollectors()...) + metrics = append(metrics, tsdb.ShardCollectors()...) + metrics = append(metrics, tsdb.BucketCollectors()...) + metrics = append(metrics, retention.PrometheusCollectors()...) return metrics } diff --git a/tsdb/engine.go b/tsdb/engine.go index 004e1d362d..8d87955840 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -184,6 +184,7 @@ type EngineOptions struct { OnNewEngine func(Engine) FileStoreObserver FileStoreObserver + MetricsDisabled bool } // NewEngineOptions constructs an EngineOptions object with safe default values. 
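Note on the wiring above (not part of the patch): WithMetricsDisabled threads the launcher's opts.MetricsDisabled flag through storage.Engine into tsdb.EngineOptions.MetricsDisabled, which the shard and store consult below before starting their periodic collection goroutines. A minimal caller-side sketch, assuming hypothetical enginePath, storageConfig, and metaClient variables (only the storage and prometheus APIs themselves come from the patch):

	engine := storage.NewEngine(
		enginePath,
		storageConfig,
		storage.WithMetaClient(metaClient),
		storage.WithMetricsDisabled(true), // example value: turn off the series/disk-size tickers
	)
	// Collectors for the remaining metrics can still be registered as usual.
	reg := prometheus.NewRegistry()
	reg.MustRegister(engine.PrometheusCollectors()...)
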
diff --git a/tsdb/engine/tsm1/array_cursor_test.go b/tsdb/engine/tsm1/array_cursor_test.go index 722c4144ac..28415bdd60 100644 --- a/tsdb/engine/tsm1/array_cursor_test.go +++ b/tsdb/engine/tsm1/array_cursor_test.go @@ -10,6 +10,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/influxdata/influxdb/v2/pkg/fs" + "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/cursors" "github.com/stretchr/testify/assert" ) @@ -73,7 +74,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) { t.Run("cache", func(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - fs := NewFileStore(dir, EngineTags{}) + fs := NewFileStore(dir, tsdb.EngineTags{}) const START, END = 10, 1 kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) @@ -96,7 +97,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) { t.Run("tsm", func(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - fs := NewFileStore(dir, EngineTags{}) + fs := NewFileStore(dir, tsdb.EngineTags{}) const START, END = 10, 1 @@ -133,7 +134,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) { func TestFileStore_DuplicatePoints(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - fs := NewFileStore(dir, EngineTags{}) + fs := NewFileStore(dir, tsdb.EngineTags{}) makeVals := func(ts ...int64) []Value { vals := make([]Value, len(ts)) @@ -218,7 +219,7 @@ func (p Int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - fs := NewFileStore(dir, EngineTags{}) + fs := NewFileStore(dir, tsdb.EngineTags{}) // makeVals creates count points starting at ts and incrementing by step makeVals := func(ts, count, step int64) []Value { @@ -320,7 +321,7 @@ func (a *FloatArray) Swap(i, j int) { func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - fs := NewFileStore(dir, EngineTags{}) + fs := NewFileStore(dir, tsdb.EngineTags{}) // makeVals creates count points starting at ts and incrementing by step makeVals := func(ts, count, step int64, v float64) []Value { @@ -414,7 +415,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing func TestFileStore_SeekBoundaries(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - fs := NewFileStore(dir, EngineTags{}) + fs := NewFileStore(dir, tsdb.EngineTags{}) // makeVals creates count points starting at ts and incrementing by step makeVals := func(ts, count, step int64, v float64) []Value { diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 9d45588918..8c3a5d3769 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -190,7 +190,7 @@ type Cache struct { // NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory. // Only used for engine caches, never for snapshots. // Note tags are for metrics only, so if metrics are not desired tags do not have to be set. 
-func NewCache(maxSize uint64, tags EngineTags) *Cache { +func NewCache(maxSize uint64, tags tsdb.EngineTags) *Cache { c := &Cache{ maxSize: maxSize, store: emptyStore{}, @@ -224,7 +224,7 @@ type cacheMetrics struct { } func newAllCacheMetrics() *allCacheMetrics { - labels := EngineLabelNames() + labels := tsdb.EngineLabelNames() return &allCacheMetrics{ MemBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: storageNamespace, @@ -276,7 +276,7 @@ func CacheCollectors() []prometheus.Collector { } } -func newCacheMetrics(tags EngineTags) *cacheMetrics { +func newCacheMetrics(tags tsdb.EngineTags) *cacheMetrics { labels := tags.GetLabels() return &cacheMetrics{ MemBytes: globalCacheMetrics.MemBytes.With(labels), diff --git a/tsdb/engine/tsm1/cache_race_test.go b/tsdb/engine/tsm1/cache_race_test.go index 6e6830424e..651d8381ff 100644 --- a/tsdb/engine/tsm1/cache_race_test.go +++ b/tsdb/engine/tsm1/cache_race_test.go @@ -6,6 +6,7 @@ import ( "sync" "testing" + "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" ) @@ -26,7 +27,7 @@ func TestCacheCheckConcurrentReadsAreSafe(t *testing.T) { } wg := sync.WaitGroup{} - c := tsm1.NewCache(1000000, tsm1.EngineTags{}) + c := tsm1.NewCache(1000000, tsdb.EngineTags{}) ch := make(chan struct{}) for _, s := range series { @@ -71,7 +72,7 @@ func TestCacheRace(t *testing.T) { } wg := sync.WaitGroup{} - c := tsm1.NewCache(1000000, tsm1.EngineTags{}) + c := tsm1.NewCache(1000000, tsdb.EngineTags{}) ch := make(chan struct{}) for _, s := range series { @@ -136,7 +137,7 @@ func TestCacheRace2Compacters(t *testing.T) { } wg := sync.WaitGroup{} - c := tsm1.NewCache(1000000, tsm1.EngineTags{}) + c := tsm1.NewCache(1000000, tsdb.EngineTags{}) ch := make(chan struct{}) for _, s := range series { diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index 3dafba46e0..aae854c1d7 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/golang/snappy" + "github.com/influxdata/influxdb/v2/tsdb" ) // Convenience method for testing. @@ -23,7 +24,7 @@ func (c *Cache) Write(key []byte, values []Value) error { } func TestCache_NewCache(t *testing.T) { - c := NewCache(100, EngineTags{}) + c := NewCache(100, tsdb.EngineTags{}) if c == nil { t.Fatalf("failed to create new cache") } @@ -46,7 +47,7 @@ func TestCache_CacheWriteMulti(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(30*valuesSize, EngineTags{}) + c := NewCache(30*valuesSize, tsdb.EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -63,7 +64,7 @@ func TestCache_CacheWriteMulti(t *testing.T) { // Tests that the cache stats and size are correctly maintained during writes. func TestCache_WriteMulti_Stats(t *testing.T) { limit := uint64(1) - c := NewCache(limit, EngineTags{}) + c := NewCache(limit, tsdb.EngineTags{}) ms := NewTestStore() c.store = ms @@ -75,7 +76,7 @@ func TestCache_WriteMulti_Stats(t *testing.T) { } // Fail one of the values in the write. 
- c = NewCache(50, EngineTags{}) + c = NewCache(50, tsdb.EngineTags{}) c.init() c.store = ms @@ -104,7 +105,7 @@ func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(3*valuesSize, EngineTags{}) + c := NewCache(3*valuesSize, tsdb.EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values[:1], "bar": values[1:]}); err == nil { t.Fatalf(" expected field type conflict") @@ -126,7 +127,7 @@ func TestCache_Cache_DeleteRange(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(30*valuesSize, EngineTags{}) + c := NewCache(30*valuesSize, tsdb.EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -165,7 +166,7 @@ func TestCache_DeleteRange_NoValues(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(3*valuesSize, EngineTags{}) + c := NewCache(3*valuesSize, tsdb.EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -200,7 +201,7 @@ func TestCache_DeleteRange_NotSorted(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(3*valuesSize, EngineTags{}) + c := NewCache(3*valuesSize, tsdb.EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -235,7 +236,7 @@ func TestCache_Cache_Delete(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(30*valuesSize, EngineTags{}) + c := NewCache(30*valuesSize, tsdb.EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -268,7 +269,7 @@ func TestCache_Cache_Delete(t *testing.T) { } func TestCache_Cache_Delete_NonExistent(t *testing.T) { - c := NewCache(1024, EngineTags{}) + c := NewCache(1024, tsdb.EngineTags{}) c.Delete([][]byte{[]byte("bar")}) @@ -289,7 +290,7 @@ func TestCache_CacheWriteMulti_Duplicates(t *testing.T) { v5 := NewValue(5, 3.0) values1 := Values{v3, v4, v5} - c := NewCache(0, EngineTags{}) + c := NewCache(0, tsdb.EngineTags{}) if err := c.WriteMulti(map[string][]Value{"foo": values0}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -319,7 +320,7 @@ func TestCache_CacheValues(t *testing.T) { v3 := NewValue(1, 1.0) v4 := NewValue(4, 4.0) - c := NewCache(512, EngineTags{}) + c := NewCache(512, tsdb.EngineTags{}) if deduped := c.Values([]byte("no such key")); deduped != nil { t.Fatalf("Values returned for no such key") } @@ -347,7 +348,7 @@ func TestCache_CacheSnapshot(t *testing.T) { v6 := NewValue(7, 5.0) v7 := NewValue(2, 5.0) - c := NewCache(512, EngineTags{}) + c := NewCache(512, tsdb.EngineTags{}) if err := c.Write([]byte("foo"), Values{v0, v1, v2, v3}); err != nil { t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error()) } @@ -424,7 +425,7 @@ func TestCache_CacheSnapshot(t *testing.T) { // Tests that Snapshot updates statistics correctly. 
func TestCache_Snapshot_Stats(t *testing.T) { limit := uint64(16) - c := NewCache(limit, EngineTags{}) + c := NewCache(limit, tsdb.EngineTags{}) values := map[string][]Value{"foo": {NewValue(1, 1.0)}} if err := c.WriteMulti(values); err != nil { @@ -443,7 +444,7 @@ func TestCache_Snapshot_Stats(t *testing.T) { } func TestCache_CacheEmptySnapshot(t *testing.T) { - c := NewCache(512, EngineTags{}) + c := NewCache(512, tsdb.EngineTags{}) // Grab snapshot, and ensure it's as expected. snapshot, err := c.Snapshot() @@ -470,7 +471,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) { v0 := NewValue(1, 1.0) v1 := NewValue(2, 2.0) - c := NewCache(uint64(v1.Size()), EngineTags{}) + c := NewCache(uint64(v1.Size()), tsdb.EngineTags{}) if err := c.Write([]byte("foo"), Values{v0}); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -516,7 +517,7 @@ func TestCache_Deduplicate_Concurrent(t *testing.T) { } wg := sync.WaitGroup{} - c := NewCache(1000000, EngineTags{}) + c := NewCache(1000000, tsdb.EngineTags{}) wg.Add(1) go func() { @@ -568,7 +569,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) { } // Load the cache using the segment. - cache := NewCache(1024, EngineTags{}) + cache := NewCache(1024, tsdb.EngineTags{}) loader := NewCacheLoader([]string{f.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -591,7 +592,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) { } // Reload the cache using the segment. - cache = NewCache(1024, EngineTags{}) + cache = NewCache(1024, tsdb.EngineTags{}) loader = NewCacheLoader([]string{f.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -653,7 +654,7 @@ func TestCacheLoader_LoadDouble(t *testing.T) { } // Load the cache using the segments. - cache := NewCache(1024, EngineTags{}) + cache := NewCache(1024, tsdb.EngineTags{}) loader := NewCacheLoader([]string{f1.Name(), f2.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -717,7 +718,7 @@ func TestCacheLoader_LoadDeleted(t *testing.T) { } // Load the cache using the segment. - cache := NewCache(1024, EngineTags{}) + cache := NewCache(1024, tsdb.EngineTags{}) loader := NewCacheLoader([]string{f.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -729,7 +730,7 @@ func TestCacheLoader_LoadDeleted(t *testing.T) { } // Reload the cache using the segment. 
- cache = NewCache(1024, EngineTags{}) + cache = NewCache(1024, tsdb.EngineTags{}) loader = NewCacheLoader([]string{f.Name()}) if err := loader.Load(cache); err != nil { t.Fatalf("failed to load cache: %s", err.Error()) @@ -748,7 +749,7 @@ func TestCache_Split(t *testing.T) { values := Values{v0, v1, v2} valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) - c := NewCache(0, EngineTags{}) + c := NewCache(0, tsdb.EngineTags{}) if err := c.Write([]byte("foo"), values); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -835,7 +836,7 @@ func (s *TestStore) count() int { return s.c var fvSize = uint64(NewValue(1, float64(1)).Size()) func BenchmarkCacheFloatEntries(b *testing.B) { - cache := NewCache(uint64(b.N)*fvSize, EngineTags{}) + cache := NewCache(uint64(b.N)*fvSize, tsdb.EngineTags{}) vals := make([][]Value, b.N) for i := 0; i < b.N; i++ { vals[i] = []Value{NewValue(1, float64(i))} @@ -856,7 +857,7 @@ type points struct { func BenchmarkCacheParallelFloatEntries(b *testing.B) { c := b.N * runtime.GOMAXPROCS(0) - cache := NewCache(uint64(c)*fvSize*10, EngineTags{}) + cache := NewCache(uint64(c)*fvSize*10, tsdb.EngineTags{}) vals := make([]points, c) for i := 0; i < c; i++ { v := make([]Value, 10) diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index f62358aaf1..9c3658c633 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -29,7 +29,7 @@ func TestCompactor_Snapshot(t *testing.T) { "cpu,host=B#!~#value": {v2, v3}, } - c := tsm1.NewCache(0, tsm1.EngineTags{}) + c := tsm1.NewCache(0, tsdb.EngineTags{}) for k, v := range points1 { if err := c.Write([]byte(k), v); err != nil { t.Fatalf("failed to write key foo to cache: %s", err.Error()) @@ -1386,7 +1386,7 @@ func TestCacheKeyIterator_Single(t *testing.T) { "cpu,host=A#!~#value": {v0}, } - c := tsm1.NewCache(0, tsm1.EngineTags{}) + c := tsm1.NewCache(0, tsdb.EngineTags{}) for k, v := range writes { if err := c.Write([]byte(k), v); err != nil { @@ -1434,7 +1434,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) { "cpu,host=A#!~#value": {v0, v1}, } - c := tsm1.NewCache(0, tsm1.EngineTags{}) + c := tsm1.NewCache(0, tsdb.EngineTags{}) for k, v := range writes { if err := c.Write([]byte(k), v); err != nil { @@ -1484,7 +1484,7 @@ func TestCacheKeyIterator_Abort(t *testing.T) { "cpu,host=A#!~#value": {v0}, } - c := tsm1.NewCache(0, tsm1.EngineTags{}) + c := tsm1.NewCache(0, tsdb.EngineTags{}) for k, v := range writes { if err := c.Write([]byte(k), v); err != nil { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 64acebebab..e62c19cbfc 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -158,12 +158,12 @@ type Engine struct { // NewEngine returns a new instance of Engine. 
func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine { - etags := EngineTags{ - path: path, - walPath: walPath, - id: fmt.Sprintf("%d", id), - bucket: filepath.Base(filepath.Dir(filepath.Dir(path))), // discard shard & rp, take db - engineVersion: opt.EngineVersion, + etags := tsdb.EngineTags{ + Path: path, + WalPath: walPath, + Id: fmt.Sprintf("%d", id), + Bucket: filepath.Base(filepath.Dir(filepath.Dir(path))), // discard shard & rp, take db + EngineVersion: opt.EngineVersion, } var wal *WAL @@ -590,7 +590,7 @@ func (e *Engine) LastModified() time.Time { return fsTime } -var globalCompactionMetrics *compactionMetrics = newAllCompactionMetrics(EngineLabelNames()) +var globalCompactionMetrics *compactionMetrics = newAllCompactionMetrics(tsdb.EngineLabelNames()) // PrometheusCollectors returns all prometheus metrics for the tsm1 package. func PrometheusCollectors() []prometheus.Collector { @@ -638,7 +638,8 @@ func newAllCompactionMetrics(labelNames []string) *compactionMetrics { Subsystem: engineSubsystem, Name: "duration_seconds", Help: "Histogram of compactions by level since startup", - Buckets: []float64{0.1, 1, 10, 100, 1000, 10000, 100000}, + // 10 minute compactions seem normal, 1h40min is high + Buckets: []float64{60, 600, 6000}, }, labelNamesWithLevel), Active: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: storageNamespace, @@ -689,33 +690,7 @@ type compactionMetrics struct { Failed *prometheus.CounterVec } -// EngineTags holds tags for prometheus -// -// It should not be used for behaviour other than attaching tags to prometheus metrics -type EngineTags struct { - path, walPath, id, bucket, engineVersion string -} - -func (et *EngineTags) GetLabels() prometheus.Labels { - return prometheus.Labels{ - "path": et.path, - "walPath": et.walPath, - "id": et.id, - "bucket": et.bucket, - "engine": et.engineVersion, - } -} - -func EngineLabelNames() []string { - emptyLabels := (&EngineTags{}).GetLabels() - val := make([]string, 0, len(emptyLabels)) - for k := range emptyLabels { - val = append(val, k) - } - return val -} - -func newEngineMetrics(tags EngineTags) *compactionMetrics { +func newEngineMetrics(tags tsdb.EngineTags) *compactionMetrics { engineLabels := tags.GetLabels() return &compactionMetrics{ Duration: globalCompactionMetrics.Duration.MustCurryWith(engineLabels), diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 1a5141b99c..765002422d 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -226,7 +226,7 @@ func (f FileStat) ContainsKey(key []byte) bool { } // NewFileStore returns a new instance of FileStore based on the given directory. 
-func NewFileStore(dir string, tags EngineTags) *FileStore { +func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore { logger := zap.NewNop() fs := &FileStore{ dir: dir, @@ -308,7 +308,7 @@ func (f *fileStoreMetrics) SetFiles(n int64) { } func newAllFileStoreMetrics() *allFileStoreMetrics { - labels := EngineLabelNames() + labels := tsdb.EngineLabelNames() return &allFileStoreMetrics{ files: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: storageNamespace, @@ -332,7 +332,7 @@ func FileStoreCollectors() []prometheus.Collector { } } -func newFileStoreMetrics(tags EngineTags) *fileStoreMetrics { +func newFileStoreMetrics(tags tsdb.EngineTags) *fileStoreMetrics { labels := tags.GetLabels() return &fileStoreMetrics{ files: globalFileStoreMetrics.files.With(labels), diff --git a/tsdb/engine/tsm1/file_store_array_test.go b/tsdb/engine/tsm1/file_store_array_test.go index 56f5d77ded..691cf6aea3 100644 --- a/tsdb/engine/tsm1/file_store_array_test.go +++ b/tsdb/engine/tsm1/file_store_array_test.go @@ -312,7 +312,7 @@ func TestFileStore_Array(t *testing.T) { t.Run(tc.name, func(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - fs := tsm1.NewFileStore(dir, tsm1.EngineTags{}) + fs := tsm1.NewFileStore(dir, tsdb.EngineTags{}) files, err := newFiles(dir, tc.data...) if err != nil { diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index 0b2959efed..b1041fcf6b 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" "go.uber.org/zap/zaptest" ) @@ -2749,7 +2750,7 @@ func TestFileStore_CreateSnapshot(t *testing.T) { } func newTestFileStore(dir string) *tsm1.FileStore { - return tsm1.NewFileStore(dir, tsm1.EngineTags{}) + return tsm1.NewFileStore(dir, tsdb.EngineTags{}) } type mockObserver struct { diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 5f10c1f9ea..0199744fc5 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -21,6 +21,7 @@ import ( "github.com/golang/snappy" "github.com/influxdata/influxdb/v2/pkg/limiter" "github.com/influxdata/influxdb/v2/pkg/pool" + "github.com/influxdata/influxdb/v2/tsdb" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -116,7 +117,7 @@ type WAL struct { } // NewWAL initializes a new WAL at the given directory. 
-func NewWAL(path string, maxConcurrentWrites int, maxWriteDelay time.Duration, tags EngineTags) *WAL { +func NewWAL(path string, maxConcurrentWrites int, maxWriteDelay time.Duration, tags tsdb.EngineTags) *WAL { logger := zap.NewNop() if maxConcurrentWrites == 0 { maxConcurrentWrites = defaultWaitingWALWrites @@ -184,7 +185,7 @@ func (f *walMetrics) SetSize(n int64) { } func newAllWALMetrics() *allWALMetrics { - labels := EngineLabelNames() + labels := tsdb.EngineLabelNames() return &allWALMetrics{ size: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: storageNamespace, @@ -215,7 +216,7 @@ func WALCollectors() []prometheus.Collector { } } -func newWALMetrics(tags EngineTags) *walMetrics { +func newWALMetrics(tags tsdb.EngineTags) *walMetrics { labels := tags.GetLabels() return &walMetrics{ size: globalWALMetrics.size.With(labels), diff --git a/tsdb/engine/tsm1/wal_test.go b/tsdb/engine/tsm1/wal_test.go index ed7437d9da..5e54376a3b 100644 --- a/tsdb/engine/tsm1/wal_test.go +++ b/tsdb/engine/tsm1/wal_test.go @@ -14,13 +14,14 @@ import ( "github.com/golang/snappy" "github.com/influxdata/influxdb/v2/pkg/slices" + "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" "github.com/stretchr/testify/require" ) func NewWAL(path string, maxConcurrentWrites int, maxWriteDelay time.Duration) *tsm1.WAL { // EngineTags is only for metrics, not needed for tests - return tsm1.NewWAL(path, maxConcurrentWrites, maxWriteDelay, tsm1.EngineTags{}) + return tsm1.NewWAL(path, maxConcurrentWrites, maxWriteDelay, tsdb.EngineTags{}) } func TestWALWriter_WriteMulti_Single(t *testing.T) { diff --git a/tsdb/enginetags.go b/tsdb/enginetags.go new file mode 100644 index 0000000000..d141ee815b --- /dev/null +++ b/tsdb/enginetags.go @@ -0,0 +1,29 @@ +package tsdb + +import "github.com/prometheus/client_golang/prometheus" + +// EngineTags holds tags for prometheus +// +// It should not be used for behaviour other than attaching tags to prometheus metrics +type EngineTags struct { + Path, WalPath, Id, Bucket, EngineVersion string +} + +func (et *EngineTags) GetLabels() prometheus.Labels { + return prometheus.Labels{ + "path": et.Path, + "walPath": et.WalPath, + "id": et.Id, + "bucket": et.Bucket, + "engine": et.EngineVersion, + } +} + +func EngineLabelNames() []string { + emptyLabels := (&EngineTags{}).GetLabels() + val := make([]string, 0, len(emptyLabels)) + for k := range emptyLabels { + val = append(val, k) + } + return val +} diff --git a/tsdb/shard.go b/tsdb/shard.go index d85c20b098..ddc475a499 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -27,38 +27,21 @@ import ( "github.com/influxdata/influxdb/v2/pkg/slices" internal "github.com/influxdata/influxdb/v2/tsdb/internal" "github.com/influxdata/influxql" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) const ( - statWriteReq = "writeReq" - statWriteReqOK = "writeReqOk" - statWriteReqErr = "writeReqErr" - statSeriesCreate = "seriesCreate" - statFieldsCreate = "fieldsCreate" - statWritePointsErr = "writePointsErr" - statWritePointsDropped = "writePointsDropped" - statWritePointsOK = "writePointsOk" - statWriteBytes = "writeBytes" - statDiskBytes = "diskBytes" - measurementKey = "_name" + measurementKey = "_name" + DefaultMetricInterval = 10 * time.Second ) var ( - // ErrFieldOverflow is returned when too many fields are created on a measurement. 
- ErrFieldOverflow = errors.New("field overflow") // ErrFieldTypeConflict is returned when a new field already exists with a different type. ErrFieldTypeConflict = errors.New("field type conflict") - // ErrFieldNotFound is returned when a field cannot be found. - ErrFieldNotFound = errors.New("field not found") - - // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID - // there is no mapping for. - ErrFieldUnmappedID = errors.New("field ID not mapped") - // ErrEngineClosed is returned when a caller attempts indirectly to // access the shard's underlying engine. ErrEngineClosed = errors.New("engine is closed") @@ -141,13 +124,13 @@ type Shard struct { index Index enabled bool - // expvar-based stats. - stats *ShardStatistics - defaultTags models.StatisticTags + stats *ShardMetrics baseLogger *zap.Logger logger *zap.Logger + metricUpdater *ticker + EnableOnOpen bool // CompactionDisabled specifies the shard should not schedule compactions. @@ -160,29 +143,26 @@ func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt Eng db, rp := decodeStorePath(path) logger := zap.NewNop() + engineTags := EngineTags{ + Path: path, + WalPath: walPath, + Id: fmt.Sprintf("%d", id), + Bucket: db, + EngineVersion: opt.EngineVersion, + } + s := &Shard{ - id: id, - path: path, - walPath: walPath, - sfile: sfile, - options: opt, - - stats: &ShardStatistics{}, - defaultTags: models.StatisticTags{ - "path": path, - "walPath": walPath, - "id": fmt.Sprintf("%d", id), - "database": db, - "retentionPolicy": rp, - "engine": opt.EngineVersion, - }, - + id: id, + path: path, + walPath: walPath, + sfile: sfile, + options: opt, + stats: newShardMetrics(engineTags), database: db, retentionPolicy: rp, - - logger: logger, - baseLogger: logger, - EnableOnOpen: true, + logger: logger, + baseLogger: logger, + EnableOnOpen: true, } return s } @@ -235,58 +215,140 @@ func (s *Shard) RetentionPolicy() string { return s.retentionPolicy } -// ShardStatistics maintains statistics for a shard. -type ShardStatistics struct { - WriteReq int64 - WriteReqOK int64 - WriteReqErr int64 - FieldsCreated int64 - WritePointsErr int64 - WritePointsDropped int64 - WritePointsOK int64 - BytesWritten int64 - DiskBytes int64 +var globalShardMetrics = newAllShardMetrics() + +type twoCounterObserver struct { + count prometheus.Counter + sum prometheus.Counter } -// Statistics returns statistics for periodic monitoring. 
-func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
-	engine, err := s.Engine()
-	if err != nil {
-		return nil
+func (t twoCounterObserver) Observe(f float64) {
+	t.count.Inc()
+	t.sum.Add(f)
+}
+
+var _ prometheus.Observer = twoCounterObserver{}
+
+type allShardMetrics struct {
+	writes        *prometheus.CounterVec
+	writesSum     *prometheus.CounterVec
+	writesErr     *prometheus.CounterVec
+	writesErrSum  *prometheus.CounterVec
+	writesDropped *prometheus.CounterVec
+	fieldsCreated *prometheus.CounterVec
+	diskSize      *prometheus.GaugeVec
+	series        *prometheus.GaugeVec
+}
+
+type ShardMetrics struct {
+	writes        prometheus.Observer
+	writesErr     prometheus.Observer
+	writesDropped prometheus.Counter
+	fieldsCreated prometheus.Counter
+	diskSize      prometheus.Gauge
+	series        prometheus.Gauge
+}
+
+const storageNamespace = "storage"
+const shardSubsystem = "shard"
+
+func newAllShardMetrics() *allShardMetrics {
+	labels := EngineLabelNames()
+	return &allShardMetrics{
+		writes: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: storageNamespace,
+			Subsystem: shardSubsystem,
+			Name:      "write_count",
+			Help:      "Count of the number of write requests",
+		}, labels),
+		writesSum: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: storageNamespace,
+			Subsystem: shardSubsystem,
+			Name:      "write_sum",
+			Help:      "Counter of the number of points for write requests",
+		}, labels),
+		writesErr: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: storageNamespace,
+			Subsystem: shardSubsystem,
+			Name:      "write_err_count",
+			Help:      "Count of the number of write requests with errors",
+		}, labels),
+		writesErrSum: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: storageNamespace,
+			Subsystem: shardSubsystem,
+			Name:      "write_err_sum",
+			Help:      "Counter of the number of points for write requests with errors",
+		}, labels),
+		writesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: storageNamespace,
+			Subsystem: shardSubsystem,
+			Name:      "write_dropped_sum",
+			Help:      "Counter of the number of points dropped",
+		}, labels),
+		fieldsCreated: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: storageNamespace,
+			Subsystem: shardSubsystem,
+			Name:      "fields_created",
+			Help:      "Counter of the number of fields created",
+		}, labels),
+		diskSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: storageNamespace,
+			Subsystem: shardSubsystem,
+			Name:      "disk_size",
+			Help:      "Gauge of the disk size for the shard",
+		}, labels),
+		series: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: storageNamespace,
+			Subsystem: shardSubsystem,
+			Name:      "series",
+			Help:      "Gauge of the number of series in the shard index",
+		}, labels),
 	}
+}
 
-	// Refresh our disk size stat
-	if _, err := s.DiskSize(); err != nil {
-		return nil
+func ShardCollectors() []prometheus.Collector {
+	return []prometheus.Collector{
+		globalShardMetrics.writes,
+		globalShardMetrics.writesSum,
+		globalShardMetrics.writesErr,
+		globalShardMetrics.writesErrSum,
+		globalShardMetrics.writesDropped,
+		globalShardMetrics.fieldsCreated,
+		globalShardMetrics.diskSize,
+		globalShardMetrics.series,
 	}
-	seriesN := engine.SeriesN()
+}
 
-	tags = s.defaultTags.Merge(tags)
-
-	// Set the index type on the tags. N.B this needs to be checked since it's
-	// only set when the shard is opened.
- if indexType := s.IndexType(); indexType != "" { - tags["indexType"] = indexType - } - - statistics := []models.Statistic{{ - Name: "shard", - Tags: tags, - Values: map[string]interface{}{ - statWriteReq: atomic.LoadInt64(&s.stats.WriteReq), - statWriteReqOK: atomic.LoadInt64(&s.stats.WriteReqOK), - statWriteReqErr: atomic.LoadInt64(&s.stats.WriteReqErr), - statSeriesCreate: seriesN, - statFieldsCreate: atomic.LoadInt64(&s.stats.FieldsCreated), - statWritePointsErr: atomic.LoadInt64(&s.stats.WritePointsErr), - statWritePointsDropped: atomic.LoadInt64(&s.stats.WritePointsDropped), - statWritePointsOK: atomic.LoadInt64(&s.stats.WritePointsOK), - statWriteBytes: atomic.LoadInt64(&s.stats.BytesWritten), - statDiskBytes: atomic.LoadInt64(&s.stats.DiskBytes), +func newShardMetrics(tags EngineTags) *ShardMetrics { + labels := tags.GetLabels() + return &ShardMetrics{ + writes: twoCounterObserver{ + count: globalShardMetrics.writes.With(labels), + sum: globalShardMetrics.writesSum.With(labels), }, - }} + writesErr: twoCounterObserver{ + count: globalShardMetrics.writesErr.With(labels), + sum: globalShardMetrics.writesErrSum.With(labels), + }, + writesDropped: globalShardMetrics.writesDropped.With(labels), + fieldsCreated: globalShardMetrics.fieldsCreated.With(labels), + diskSize: globalShardMetrics.diskSize.With(labels), + series: globalShardMetrics.series.With(labels), + } +} - return statistics +// ticker runs fn periodically, and stops when Stop() is called +// +// Stop waits for the last function run to finish if already running +type ticker struct { + wg sync.WaitGroup + closing chan struct{} +} + +// Stops the ticker and waits for the function to complete +func (t *ticker) Stop() { + close(t.closing) + t.wg.Wait() } // Path returns the path set on the shard when it was created. @@ -353,6 +415,39 @@ func (s *Shard) Open(ctx context.Context) error { } s._engine = e + // Set up metric collection + metricUpdater := &ticker{ + closing: make(chan struct{}), + } + + // We want a way to turn off the series and disk size metrics if they are suspected to cause issues + // This corresponds to the top-level MetricsDisabled argument + if !s.options.MetricsDisabled { + metricUpdater.wg.Add(1) + go func() { + tick := time.NewTicker(DefaultMetricInterval) + defer metricUpdater.wg.Done() + defer tick.Stop() + for { + select { + case <-tick.C: + // Note this takes the engine lock, so we have to be careful not + // to close metricUpdater.closing while holding the engine lock + e, err := s.Engine() + if err != nil { + continue + } + s.stats.series.Set(float64(e.SeriesN())) + s.stats.diskSize.Set(float64(e.DiskSize())) + case <-metricUpdater.closing: + return + } + } + }() + } + + s.metricUpdater = metricUpdater + return nil }(); err != nil { s.close() @@ -369,9 +464,14 @@ func (s *Shard) Open(ctx context.Context) error { // Close shuts down the shard's store. 
 func (s *Shard) Close() error {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.close()
+	err := func() error {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		return s.close()
+	}()
+	// make sure not to hold a lock while waiting for the metric updater to finish;
+	// the updater is nil if the shard was never opened
+	if s.metricUpdater != nil {
+		s.metricUpdater.wg.Wait()
+	}
+	return err
 }
 
 // close closes the shard an removes reference to the shard from associated
@@ -381,6 +481,10 @@ func (s *Shard) close() error {
 		return nil
 	}
 
+	if s.metricUpdater != nil {
+		close(s.metricUpdater.closing)
+	}
+
 	err := s._engine.Close()
 	if err == nil {
 		s._engine = nil
@@ -488,7 +592,6 @@ func (s *Shard) DiskSize() (int64, error) {
 		return 0, ErrEngineClosed
 	}
 	size := s._engine.DiskSize()
-	atomic.StoreInt64(&s.stats.DiskBytes, size)
 	return size, nil
 }
@@ -499,7 +602,7 @@ type FieldCreate struct {
 }
 
 // WritePoints will write the raw data points and any new metadata to the index in the shard.
-func (s *Shard) WritePoints(ctx context.Context, points []models.Point) error {
+func (s *Shard) WritePoints(ctx context.Context, points []models.Point) (rErr error) {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
@@ -509,7 +612,12 @@ func (s *Shard) WritePoints(ctx context.Context, points []models.Point) error {
 	}
 
 	var writeError error
-	atomic.AddInt64(&s.stats.WriteReq, 1)
+	s.stats.writes.Observe(float64(len(points)))
+	defer func() {
+		if rErr != nil {
+			s.stats.writesErr.Observe(float64(len(points)))
+		}
+	}()
 
 	points, fieldsToCreate, err := s.validateSeriesAndFields(points)
 	if err != nil {
@@ -520,7 +628,7 @@ func (s *Shard) WritePoints(ctx context.Context, points []models.Point) error {
 		// to the caller, but continue on writing the remaining points.
 		writeError = err
 	}
-	atomic.AddInt64(&s.stats.FieldsCreated, int64(len(fieldsToCreate)))
+	s.stats.fieldsCreated.Add(float64(len(fieldsToCreate)))
 
 	// add any new fields and keep track of what needs to be saved
 	if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
@@ -529,12 +637,8 @@ func (s *Shard) WritePoints(ctx context.Context, points []models.Point) error {
 
 	// Write to the engine.
 	if err := engine.WritePoints(ctx, points); err != nil {
-		atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points)))
-		atomic.AddInt64(&s.stats.WriteReqErr, 1)
 		return fmt.Errorf("engine: %s", err)
 	}
-	atomic.AddInt64(&s.stats.WritePointsOK, int64(len(points)))
-	atomic.AddInt64(&s.stats.WriteReqOK, 1)
 
 	return writeError
 }
@@ -604,7 +708,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
 			reason = err.Reason
 			dropped += err.Dropped
 			droppedKeys = err.DroppedKeys
-			atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
+			s.stats.writesDropped.Add(float64(err.Dropped))
 		default:
 			return nil, nil, err
 		}
@@ -648,7 +752,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
 				reason = err.Reason
 			}
 			dropped += err.Dropped
-			atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
+			s.stats.writesDropped.Add(float64(err.Dropped))
 		default:
 			return nil, nil, err
 		}
@@ -813,17 +917,6 @@ func (s *Shard) MeasurementExists(name []byte) (bool, error) {
 	return engine.MeasurementExists(name)
 }
 
-// WriteTo writes the shard's data to w.
-func (s *Shard) WriteTo(w io.Writer) (int64, error) {
-	engine, err := s.Engine()
-	if err != nil {
-		return 0, err
-	}
-	n, err := engine.WriteTo(w)
-	atomic.AddInt64(&s.stats.BytesWritten, int64(n))
-	return n, err
-}
-
 // CreateIterator returns an iterator for the data in the shard.
func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { engine, err := s.Engine() diff --git a/tsdb/store.go b/tsdb/store.go index 7df7e679f5..b2c2f687b5 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -25,6 +25,7 @@ import ( "github.com/influxdata/influxdb/v2/pkg/estimator/hll" "github.com/influxdata/influxdb/v2/pkg/limiter" "github.com/influxdata/influxql" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -41,12 +42,6 @@ var ( ErrMultipleIndexTypes = errors.New("cannot delete data. DB contains shards using multiple indexes. Please convert all shards to use the same index type to delete data") ) -// Statistics gathered by the store. -const ( - statDatabaseSeries = "numSeries" // number of series in a database - statDatabaseMeasurements = "numMeasurements" // number of measurements in a database -) - // SeriesFileDirectory is the name of the directory containing series files for // a database. const SeriesFileDirectory = "_series" @@ -126,16 +121,12 @@ func (s *Store) WithLogger(log *zap.Logger) { } } -// Statistics returns statistics for period monitoring. -func (s *Store) Statistics(tags map[string]string) []models.Statistic { - s.mu.RLock() - shards := s.shardsSlice() - s.mu.RUnlock() - - // Add all the series and measurements cardinality estimations. +// CollectBucketMetrics sets prometheus metrics for each bucket +func (s *Store) CollectBucketMetrics() { + // Collect all the bucket cardinality estimations databases := s.Databases() - statistics := make([]models.Statistic, 0, len(databases)) for _, database := range databases { + log := s.Logger.With(logger.Database(database)) sc, err := s.SeriesCardinality(context.Background(), database) if err != nil { @@ -149,21 +140,48 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { continue } - statistics = append(statistics, models.Statistic{ - Name: "database", - Tags: models.StatisticTags{"database": database}.Merge(tags), - Values: map[string]interface{}{ - statDatabaseSeries: sc, - statDatabaseMeasurements: mc, - }, - }) - } + labels := prometheus.Labels{bucketLabel: database} + seriesCardinality := globalBucketMetrics.seriesCardinality.With(labels) + measureCardinality := globalBucketMetrics.measureCardinality.With(labels) - // Gather all statistics for all shards. - for _, shard := range shards { - statistics = append(statistics, shard.Statistics(tags)...) 
+		seriesCardinality.Set(float64(sc))
+		measureCardinality.Set(float64(mc))
+	}
+}
+
+var globalBucketMetrics = newAllBucketMetrics()
+
+const bucketSubsystem = "bucket"
+const bucketLabel = "bucket"
+
+type allBucketMetrics struct {
+	seriesCardinality  *prometheus.GaugeVec
+	measureCardinality *prometheus.GaugeVec
+}
+
+func newAllBucketMetrics() *allBucketMetrics {
+	labels := []string{bucketLabel}
+	return &allBucketMetrics{
+		seriesCardinality: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: storageNamespace,
+			Subsystem: bucketSubsystem,
+			Name:      "series_num",
+			Help:      "Gauge of series cardinality per bucket",
+		}, labels),
+		measureCardinality: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: storageNamespace,
+			Subsystem: bucketSubsystem,
+			Name:      "measurement_num",
+			Help:      "Gauge of measurement cardinality per bucket",
+		}, labels),
+	}
+}
+
+func BucketCollectors() []prometheus.Collector {
+	return []prometheus.Collector{
+		globalBucketMetrics.seriesCardinality,
+		globalBucketMetrics.measureCardinality,
 	}
-	return statistics
 }
 
 func (s *Store) IndexBytes() int {
@@ -229,6 +247,14 @@ func (s *Store) Open(ctx context.Context) error {
 		}()
 	}
 
+	if !s.EngineOptions.MetricsDisabled {
+		s.wg.Add(1)
+		go func() {
+			defer s.wg.Done()
+			s.collectMetrics()
+		}()
+	}
+
 	return nil
 }
@@ -1499,13 +1525,6 @@ func (s *Store) MeasurementNames(ctx context.Context, auth query.Authorizer, dat
 	return is.MeasurementNamesByExpr(auth, cond)
 }
 
-// MeasurementSeriesCounts returns the number of measurements and series in all
-// the shards' indices.
-func (s *Store) MeasurementSeriesCounts(database string) (measuments int, series int) {
-	// TODO: implement me
-	return 0, 0
-}
-
 type TagKeys struct {
 	Measurement string
 	Keys        []string
@@ -1944,6 +1963,19 @@ func (s *Store) monitorShards() {
 	}
 }
 
+func (s *Store) collectMetrics() {
+	t := time.NewTicker(10 * time.Second)
+	defer t.Stop()
+	for {
+		select {
+		case <-s.closing:
+			return
+		case <-t.C:
+			s.CollectBucketMetrics()
+		}
+	}
+}
+
 // KeyValue holds a string key and a string value.
type KeyValue struct { Key, Value string diff --git a/v1/services/retention/service.go b/v1/services/retention/service.go index a8de45bf8f..b263e3b7a9 100644 --- a/v1/services/retention/service.go +++ b/v1/services/retention/service.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/v1/services/meta" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -78,6 +79,32 @@ func (s *Service) WithLogger(log *zap.Logger) { s.logger = log.With(zap.String("service", "retention")) } +var globalRetentionMetrics = newRetentionMetrics() + +const storageNamespace = "storage" +const retentionSubsystem = "retention" + +type retentionMetrics struct { + checkDuration prometheus.Histogram +} + +func newRetentionMetrics() *retentionMetrics { + return &retentionMetrics{ + checkDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: storageNamespace, + Subsystem: retentionSubsystem, + Name: "check_duration", + Help: "Histogram of duration of retention check (in seconds)", + }), + } +} + +func PrometheusCollectors() []prometheus.Collector { + return []prometheus.Collector{ + globalRetentionMetrics.checkDuration, + } +} + func (s *Service) run(ctx context.Context) { ticker := time.NewTicker(time.Duration(s.config.CheckInterval)) defer ticker.Stop() @@ -87,6 +114,7 @@ func (s *Service) run(ctx context.Context) { return case <-ticker.C: + startTime := time.Now() log, logEnd := logger.NewOperation(context.Background(), s.logger, "Retention policy deletion check", "retention_delete_check") type deletionInfo struct { @@ -164,6 +192,8 @@ func (s *Service) run(ctx context.Context) { } logEnd() + elapsed := time.Since(startTime) + globalRetentionMetrics.checkDuration.Observe(elapsed.Seconds()) } } }
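
One pattern in tsdb/shard.go above is worth a standalone illustration: twoCounterObserver satisfies prometheus.Observer so the write path can record a request count and a point count with a single Observe call, which is what lets WritePoints account for errors with one deferred Observe. A self-contained sketch of the same idea, using only the real prometheus client API; the metric names and observed values here are illustrative, not the shard-labeled metrics registered by the patch:

	package main

	import (
		"fmt"

		"github.com/prometheus/client_golang/prometheus"
		"github.com/prometheus/client_golang/prometheus/testutil"
	)

	// twoCounterObserver pairs two counters behind the Observer interface:
	// count increments once per call, sum accumulates the observed value.
	type twoCounterObserver struct {
		count prometheus.Counter
		sum   prometheus.Counter
	}

	func (t twoCounterObserver) Observe(v float64) {
		t.count.Inc() // one more write request
		t.sum.Add(v)  // v more points
	}

	var _ prometheus.Observer = twoCounterObserver{}

	func main() {
		writes := twoCounterObserver{
			count: prometheus.NewCounter(prometheus.CounterOpts{Name: "write_count"}),
			sum:   prometheus.NewCounter(prometheus.CounterOpts{Name: "write_sum"}),
		}
		writes.Observe(128) // one request carrying 128 points
		writes.Observe(64)  // one request carrying 64 points
		// Prints "2 192": two requests, 192 points in total.
		fmt.Println(testutil.ToFloat64(writes.count), testutil.ToFloat64(writes.sum))
	}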