feat: remaining storage metrics from OSS engine (#22938)

* fix: simplify disk size tracking

* refactor: EngineTags in tsdb package

* fix: fewer compaction buckets and dead code removal

* feat: shard metrics

* chore: formatting

* feat: tsdb store metrics

* feat: retention check metrics

* chore: fix go vet

* fix: review comments
pull/22962/head
Sam Arnold 2021-12-02 09:01:46 -05:00 committed by GitHub
parent 3460f1cc52
commit b970e359dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 416 additions and 244 deletions

View File

@ -441,7 +441,7 @@ func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize i
}
} else {
log.Debug("Building cache from wal files")
cache := tsm1.NewCache(maxCacheSize, tsm1.EngineTags{}) // tags are for metrics only
cache := tsm1.NewCache(maxCacheSize, tsdb.EngineTags{}) // tags are for metrics only
loader := tsm1.NewCacheLoader(walPaths)
loader.WithLogger(log)
if err := loader.Load(cache); err != nil {

View File

@ -7,6 +7,7 @@ import (
"os"
"testing"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/require"
)
@ -110,7 +111,7 @@ func newTempWALValid(t *testing.T) string {
dir, err := os.MkdirTemp("", "verify-wal")
require.NoError(t, err)
w := tsm1.NewWAL(dir, 0, 0, tsm1.EngineTags{})
w := tsm1.NewWAL(dir, 0, 0, tsdb.EngineTags{})
defer w.Close()
require.NoError(t, w.Open())

View File

@ -324,6 +324,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
m.engine = storage.NewEngine(
opts.EnginePath,
opts.StorageConfig,
storage.WithMetricsDisabled(opts.MetricsDisabled),
storage.WithMetaClient(metaClient),
)
}

View File

@ -29,7 +29,6 @@ type TSDBStoreMock struct {
DiskSizeFn func() (int64, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
ImportShardFn func(id uint64, r io.Reader) error
MeasurementSeriesCountsFn func(database string) (measurements int, series int)
MeasurementsCardinalityFn func(database string) (int64, error)
MeasurementNamesFn func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
OpenFn func() error
@ -95,9 +94,6 @@ func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error {
func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
return s.MeasurementNamesFn(ctx, auth, database, cond)
}
func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measurements int, series int) {
return s.MeasurementSeriesCountsFn(database)
}
func (s *TSDBStoreMock) MeasurementsCardinality(database string) (int64, error) {
return s.MeasurementsCardinalityFn(database)
}

View File

@ -54,11 +54,10 @@ type Engine struct {
retentionService *retention.Service
precreatorService *precreator.Service
defaultMetricLabels prometheus.Labels
writePointsValidationEnabled bool
logger *zap.Logger
logger *zap.Logger
metricsDisabled bool
}
// Option provides a set
@ -70,6 +69,12 @@ func WithMetaClient(c MetaClient) Option {
}
}
func WithMetricsDisabled(m bool) Option {
return func(e *Engine) {
e.metricsDisabled = m
}
}
type MetaClient interface {
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
@ -109,11 +114,10 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
c.Data.WALDir = filepath.Join(path, "wal")
e := &Engine{
config: c,
path: path,
defaultMetricLabels: prometheus.Labels{},
tsdbStore: tsdb.NewStore(c.Data.Dir),
logger: zap.NewNop(),
config: c,
path: path,
tsdbStore: tsdb.NewStore(c.Data.Dir),
logger: zap.NewNop(),
writePointsValidationEnabled: true,
}
@ -127,6 +131,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
// Copy TSDB configuration.
e.tsdbStore.EngineOptions.EngineVersion = c.Data.Engine
e.tsdbStore.EngineOptions.IndexVersion = c.Data.Index
e.tsdbStore.EngineOptions.MetricsDisabled = e.metricsDisabled
pw := coordinator.NewPointsWriter(c.WriteTimeout, path)
pw.TSDBStore = e.tsdbStore
@ -167,6 +172,9 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
metrics = append(metrics, tsm1.PrometheusCollectors()...)
metrics = append(metrics, coordinator.PrometheusCollectors()...)
metrics = append(metrics, tsdb.ShardCollectors()...)
metrics = append(metrics, tsdb.BucketCollectors()...)
metrics = append(metrics, retention.PrometheusCollectors()...)
return metrics
}

View File

@ -184,6 +184,7 @@ type EngineOptions struct {
OnNewEngine func(Engine)
FileStoreObserver FileStoreObserver
MetricsDisabled bool
}
// NewEngineOptions constructs an EngineOptions object with safe default values.

View File

@ -10,6 +10,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2/pkg/fs"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
"github.com/stretchr/testify/assert"
)
@ -73,7 +74,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
t.Run("cache", func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})
const START, END = 10, 1
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false)
@ -96,7 +97,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
t.Run("tsm", func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})
const START, END = 10, 1
@ -133,7 +134,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
func TestFileStore_DuplicatePoints(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})
makeVals := func(ts ...int64) []Value {
vals := make([]Value, len(ts))
@ -218,7 +219,7 @@ func (p Int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})
// makeVals creates count points starting at ts and incrementing by step
makeVals := func(ts, count, step int64) []Value {
@ -320,7 +321,7 @@ func (a *FloatArray) Swap(i, j int) {
func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})
// makeVals creates count points starting at ts and incrementing by step
makeVals := func(ts, count, step int64, v float64) []Value {
@ -414,7 +415,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing
func TestFileStore_SeekBoundaries(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})
// makeVals creates count points starting at ts and incrementing by step
makeVals := func(ts, count, step int64, v float64) []Value {

View File

@ -190,7 +190,7 @@ type Cache struct {
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
// Only used for engine caches, never for snapshots.
// Note tags are for metrics only, so if metrics are not desired tags do not have to be set.
func NewCache(maxSize uint64, tags EngineTags) *Cache {
func NewCache(maxSize uint64, tags tsdb.EngineTags) *Cache {
c := &Cache{
maxSize: maxSize,
store: emptyStore{},
@ -224,7 +224,7 @@ type cacheMetrics struct {
}
func newAllCacheMetrics() *allCacheMetrics {
labels := EngineLabelNames()
labels := tsdb.EngineLabelNames()
return &allCacheMetrics{
MemBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
@ -276,7 +276,7 @@ func CacheCollectors() []prometheus.Collector {
}
}
func newCacheMetrics(tags EngineTags) *cacheMetrics {
func newCacheMetrics(tags tsdb.EngineTags) *cacheMetrics {
labels := tags.GetLabels()
return &cacheMetrics{
MemBytes: globalCacheMetrics.MemBytes.With(labels),

View File

@ -6,6 +6,7 @@ import (
"sync"
"testing"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
)
@ -26,7 +27,7 @@ func TestCacheCheckConcurrentReadsAreSafe(t *testing.T) {
}
wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000, tsm1.EngineTags{})
c := tsm1.NewCache(1000000, tsdb.EngineTags{})
ch := make(chan struct{})
for _, s := range series {
@ -71,7 +72,7 @@ func TestCacheRace(t *testing.T) {
}
wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000, tsm1.EngineTags{})
c := tsm1.NewCache(1000000, tsdb.EngineTags{})
ch := make(chan struct{})
for _, s := range series {
@ -136,7 +137,7 @@ func TestCacheRace2Compacters(t *testing.T) {
}
wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000, tsm1.EngineTags{})
c := tsm1.NewCache(1000000, tsdb.EngineTags{})
ch := make(chan struct{})
for _, s := range series {

View File

@ -15,6 +15,7 @@ import (
"testing"
"github.com/golang/snappy"
"github.com/influxdata/influxdb/v2/tsdb"
)
// Convenience method for testing.
@ -23,7 +24,7 @@ func (c *Cache) Write(key []byte, values []Value) error {
}
func TestCache_NewCache(t *testing.T) {
c := NewCache(100, EngineTags{})
c := NewCache(100, tsdb.EngineTags{})
if c == nil {
t.Fatalf("failed to create new cache")
}
@ -46,7 +47,7 @@ func TestCache_CacheWriteMulti(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(30*valuesSize, EngineTags{})
c := NewCache(30*valuesSize, tsdb.EngineTags{})
if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -63,7 +64,7 @@ func TestCache_CacheWriteMulti(t *testing.T) {
// Tests that the cache stats and size are correctly maintained during writes.
func TestCache_WriteMulti_Stats(t *testing.T) {
limit := uint64(1)
c := NewCache(limit, EngineTags{})
c := NewCache(limit, tsdb.EngineTags{})
ms := NewTestStore()
c.store = ms
@ -75,7 +76,7 @@ func TestCache_WriteMulti_Stats(t *testing.T) {
}
// Fail one of the values in the write.
c = NewCache(50, EngineTags{})
c = NewCache(50, tsdb.EngineTags{})
c.init()
c.store = ms
@ -104,7 +105,7 @@ func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(3*valuesSize, EngineTags{})
c := NewCache(3*valuesSize, tsdb.EngineTags{})
if err := c.WriteMulti(map[string][]Value{"foo": values[:1], "bar": values[1:]}); err == nil {
t.Fatalf(" expected field type conflict")
@ -126,7 +127,7 @@ func TestCache_Cache_DeleteRange(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(30*valuesSize, EngineTags{})
c := NewCache(30*valuesSize, tsdb.EngineTags{})
if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -165,7 +166,7 @@ func TestCache_DeleteRange_NoValues(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(3*valuesSize, EngineTags{})
c := NewCache(3*valuesSize, tsdb.EngineTags{})
if err := c.WriteMulti(map[string][]Value{"foo": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -200,7 +201,7 @@ func TestCache_DeleteRange_NotSorted(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(3*valuesSize, EngineTags{})
c := NewCache(3*valuesSize, tsdb.EngineTags{})
if err := c.WriteMulti(map[string][]Value{"foo": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -235,7 +236,7 @@ func TestCache_Cache_Delete(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(30*valuesSize, EngineTags{})
c := NewCache(30*valuesSize, tsdb.EngineTags{})
if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -268,7 +269,7 @@ func TestCache_Cache_Delete(t *testing.T) {
}
func TestCache_Cache_Delete_NonExistent(t *testing.T) {
c := NewCache(1024, EngineTags{})
c := NewCache(1024, tsdb.EngineTags{})
c.Delete([][]byte{[]byte("bar")})
@ -289,7 +290,7 @@ func TestCache_CacheWriteMulti_Duplicates(t *testing.T) {
v5 := NewValue(5, 3.0)
values1 := Values{v3, v4, v5}
c := NewCache(0, EngineTags{})
c := NewCache(0, tsdb.EngineTags{})
if err := c.WriteMulti(map[string][]Value{"foo": values0}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -319,7 +320,7 @@ func TestCache_CacheValues(t *testing.T) {
v3 := NewValue(1, 1.0)
v4 := NewValue(4, 4.0)
c := NewCache(512, EngineTags{})
c := NewCache(512, tsdb.EngineTags{})
if deduped := c.Values([]byte("no such key")); deduped != nil {
t.Fatalf("Values returned for no such key")
}
@ -347,7 +348,7 @@ func TestCache_CacheSnapshot(t *testing.T) {
v6 := NewValue(7, 5.0)
v7 := NewValue(2, 5.0)
c := NewCache(512, EngineTags{})
c := NewCache(512, tsdb.EngineTags{})
if err := c.Write([]byte("foo"), Values{v0, v1, v2, v3}); err != nil {
t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error())
}
@ -424,7 +425,7 @@ func TestCache_CacheSnapshot(t *testing.T) {
// Tests that Snapshot updates statistics correctly.
func TestCache_Snapshot_Stats(t *testing.T) {
limit := uint64(16)
c := NewCache(limit, EngineTags{})
c := NewCache(limit, tsdb.EngineTags{})
values := map[string][]Value{"foo": {NewValue(1, 1.0)}}
if err := c.WriteMulti(values); err != nil {
@ -443,7 +444,7 @@ func TestCache_Snapshot_Stats(t *testing.T) {
}
func TestCache_CacheEmptySnapshot(t *testing.T) {
c := NewCache(512, EngineTags{})
c := NewCache(512, tsdb.EngineTags{})
// Grab snapshot, and ensure it's as expected.
snapshot, err := c.Snapshot()
@ -470,7 +471,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
c := NewCache(uint64(v1.Size()), EngineTags{})
c := NewCache(uint64(v1.Size()), tsdb.EngineTags{})
if err := c.Write([]byte("foo"), Values{v0}); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -516,7 +517,7 @@ func TestCache_Deduplicate_Concurrent(t *testing.T) {
}
wg := sync.WaitGroup{}
c := NewCache(1000000, EngineTags{})
c := NewCache(1000000, tsdb.EngineTags{})
wg.Add(1)
go func() {
@ -568,7 +569,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
}
// Load the cache using the segment.
cache := NewCache(1024, EngineTags{})
cache := NewCache(1024, tsdb.EngineTags{})
loader := NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@ -591,7 +592,7 @@ func TestCacheLoader_LoadSingle(t *testing.T) {
}
// Reload the cache using the segment.
cache = NewCache(1024, EngineTags{})
cache = NewCache(1024, tsdb.EngineTags{})
loader = NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@ -653,7 +654,7 @@ func TestCacheLoader_LoadDouble(t *testing.T) {
}
// Load the cache using the segments.
cache := NewCache(1024, EngineTags{})
cache := NewCache(1024, tsdb.EngineTags{})
loader := NewCacheLoader([]string{f1.Name(), f2.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@ -717,7 +718,7 @@ func TestCacheLoader_LoadDeleted(t *testing.T) {
}
// Load the cache using the segment.
cache := NewCache(1024, EngineTags{})
cache := NewCache(1024, tsdb.EngineTags{})
loader := NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@ -729,7 +730,7 @@ func TestCacheLoader_LoadDeleted(t *testing.T) {
}
// Reload the cache using the segment.
cache = NewCache(1024, EngineTags{})
cache = NewCache(1024, tsdb.EngineTags{})
loader = NewCacheLoader([]string{f.Name()})
if err := loader.Load(cache); err != nil {
t.Fatalf("failed to load cache: %s", err.Error())
@ -748,7 +749,7 @@ func TestCache_Split(t *testing.T) {
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(0, EngineTags{})
c := NewCache(0, tsdb.EngineTags{})
if err := c.Write([]byte("foo"), values); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -835,7 +836,7 @@ func (s *TestStore) count() int { return s.c
var fvSize = uint64(NewValue(1, float64(1)).Size())
func BenchmarkCacheFloatEntries(b *testing.B) {
cache := NewCache(uint64(b.N)*fvSize, EngineTags{})
cache := NewCache(uint64(b.N)*fvSize, tsdb.EngineTags{})
vals := make([][]Value, b.N)
for i := 0; i < b.N; i++ {
vals[i] = []Value{NewValue(1, float64(i))}
@ -856,7 +857,7 @@ type points struct {
func BenchmarkCacheParallelFloatEntries(b *testing.B) {
c := b.N * runtime.GOMAXPROCS(0)
cache := NewCache(uint64(c)*fvSize*10, EngineTags{})
cache := NewCache(uint64(c)*fvSize*10, tsdb.EngineTags{})
vals := make([]points, c)
for i := 0; i < c; i++ {
v := make([]Value, 10)

View File

@ -29,7 +29,7 @@ func TestCompactor_Snapshot(t *testing.T) {
"cpu,host=B#!~#value": {v2, v3},
}
c := tsm1.NewCache(0, tsm1.EngineTags{})
c := tsm1.NewCache(0, tsdb.EngineTags{})
for k, v := range points1 {
if err := c.Write([]byte(k), v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
@ -1386,7 +1386,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
"cpu,host=A#!~#value": {v0},
}
c := tsm1.NewCache(0, tsm1.EngineTags{})
c := tsm1.NewCache(0, tsdb.EngineTags{})
for k, v := range writes {
if err := c.Write([]byte(k), v); err != nil {
@ -1434,7 +1434,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
"cpu,host=A#!~#value": {v0, v1},
}
c := tsm1.NewCache(0, tsm1.EngineTags{})
c := tsm1.NewCache(0, tsdb.EngineTags{})
for k, v := range writes {
if err := c.Write([]byte(k), v); err != nil {
@ -1484,7 +1484,7 @@ func TestCacheKeyIterator_Abort(t *testing.T) {
"cpu,host=A#!~#value": {v0},
}
c := tsm1.NewCache(0, tsm1.EngineTags{})
c := tsm1.NewCache(0, tsdb.EngineTags{})
for k, v := range writes {
if err := c.Write([]byte(k), v); err != nil {

View File

@ -158,12 +158,12 @@ type Engine struct {
// NewEngine returns a new instance of Engine.
func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine {
etags := EngineTags{
path: path,
walPath: walPath,
id: fmt.Sprintf("%d", id),
bucket: filepath.Base(filepath.Dir(filepath.Dir(path))), // discard shard & rp, take db
engineVersion: opt.EngineVersion,
etags := tsdb.EngineTags{
Path: path,
WalPath: walPath,
Id: fmt.Sprintf("%d", id),
Bucket: filepath.Base(filepath.Dir(filepath.Dir(path))), // discard shard & rp, take db
EngineVersion: opt.EngineVersion,
}
var wal *WAL
@ -590,7 +590,7 @@ func (e *Engine) LastModified() time.Time {
return fsTime
}
var globalCompactionMetrics *compactionMetrics = newAllCompactionMetrics(EngineLabelNames())
var globalCompactionMetrics *compactionMetrics = newAllCompactionMetrics(tsdb.EngineLabelNames())
// PrometheusCollectors returns all prometheus metrics for the tsm1 package.
func PrometheusCollectors() []prometheus.Collector {
@ -638,7 +638,8 @@ func newAllCompactionMetrics(labelNames []string) *compactionMetrics {
Subsystem: engineSubsystem,
Name: "duration_seconds",
Help: "Histogram of compactions by level since startup",
Buckets: []float64{0.1, 1, 10, 100, 1000, 10000, 100000},
// 10 minute compactions seem normal, 1h40min is high
Buckets: []float64{60, 600, 6000},
}, labelNamesWithLevel),
Active: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
@ -689,33 +690,7 @@ type compactionMetrics struct {
Failed *prometheus.CounterVec
}
// EngineTags holds tags for prometheus
//
// It should not be used for behaviour other than attaching tags to prometheus metrics
type EngineTags struct {
path, walPath, id, bucket, engineVersion string
}
func (et *EngineTags) GetLabels() prometheus.Labels {
return prometheus.Labels{
"path": et.path,
"walPath": et.walPath,
"id": et.id,
"bucket": et.bucket,
"engine": et.engineVersion,
}
}
func EngineLabelNames() []string {
emptyLabels := (&EngineTags{}).GetLabels()
val := make([]string, 0, len(emptyLabels))
for k := range emptyLabels {
val = append(val, k)
}
return val
}
func newEngineMetrics(tags EngineTags) *compactionMetrics {
func newEngineMetrics(tags tsdb.EngineTags) *compactionMetrics {
engineLabels := tags.GetLabels()
return &compactionMetrics{
Duration: globalCompactionMetrics.Duration.MustCurryWith(engineLabels),

View File

@ -226,7 +226,7 @@ func (f FileStat) ContainsKey(key []byte) bool {
}
// NewFileStore returns a new instance of FileStore based on the given directory.
func NewFileStore(dir string, tags EngineTags) *FileStore {
func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore {
logger := zap.NewNop()
fs := &FileStore{
dir: dir,
@ -308,7 +308,7 @@ func (f *fileStoreMetrics) SetFiles(n int64) {
}
func newAllFileStoreMetrics() *allFileStoreMetrics {
labels := EngineLabelNames()
labels := tsdb.EngineLabelNames()
return &allFileStoreMetrics{
files: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
@ -332,7 +332,7 @@ func FileStoreCollectors() []prometheus.Collector {
}
}
func newFileStoreMetrics(tags EngineTags) *fileStoreMetrics {
func newFileStoreMetrics(tags tsdb.EngineTags) *fileStoreMetrics {
labels := tags.GetLabels()
return &fileStoreMetrics{
files: globalFileStoreMetrics.files.With(labels),

View File

@ -312,7 +312,7 @@ func TestFileStore_Array(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir, tsm1.EngineTags{})
fs := tsm1.NewFileStore(dir, tsdb.EngineTags{})
files, err := newFiles(dir, tc.data...)
if err != nil {

View File

@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"go.uber.org/zap/zaptest"
)
@ -2749,7 +2750,7 @@ func TestFileStore_CreateSnapshot(t *testing.T) {
}
func newTestFileStore(dir string) *tsm1.FileStore {
return tsm1.NewFileStore(dir, tsm1.EngineTags{})
return tsm1.NewFileStore(dir, tsdb.EngineTags{})
}
type mockObserver struct {

View File

@ -21,6 +21,7 @@ import (
"github.com/golang/snappy"
"github.com/influxdata/influxdb/v2/pkg/limiter"
"github.com/influxdata/influxdb/v2/pkg/pool"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -116,7 +117,7 @@ type WAL struct {
}
// NewWAL initializes a new WAL at the given directory.
func NewWAL(path string, maxConcurrentWrites int, maxWriteDelay time.Duration, tags EngineTags) *WAL {
func NewWAL(path string, maxConcurrentWrites int, maxWriteDelay time.Duration, tags tsdb.EngineTags) *WAL {
logger := zap.NewNop()
if maxConcurrentWrites == 0 {
maxConcurrentWrites = defaultWaitingWALWrites
@ -184,7 +185,7 @@ func (f *walMetrics) SetSize(n int64) {
}
func newAllWALMetrics() *allWALMetrics {
labels := EngineLabelNames()
labels := tsdb.EngineLabelNames()
return &allWALMetrics{
size: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
@ -215,7 +216,7 @@ func WALCollectors() []prometheus.Collector {
}
}
func newWALMetrics(tags EngineTags) *walMetrics {
func newWALMetrics(tags tsdb.EngineTags) *walMetrics {
labels := tags.GetLabels()
return &walMetrics{
size: globalWALMetrics.size.With(labels),

View File

@ -14,13 +14,14 @@ import (
"github.com/golang/snappy"
"github.com/influxdata/influxdb/v2/pkg/slices"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/require"
)
func NewWAL(path string, maxConcurrentWrites int, maxWriteDelay time.Duration) *tsm1.WAL {
// EngineTags is only for metrics, not needed for tests
return tsm1.NewWAL(path, maxConcurrentWrites, maxWriteDelay, tsm1.EngineTags{})
return tsm1.NewWAL(path, maxConcurrentWrites, maxWriteDelay, tsdb.EngineTags{})
}
func TestWALWriter_WriteMulti_Single(t *testing.T) {

29
tsdb/enginetags.go Normal file
View File

@ -0,0 +1,29 @@
package tsdb
import "github.com/prometheus/client_golang/prometheus"
// EngineTags holds tags for prometheus
//
// It should not be used for behaviour other than attaching tags to prometheus metrics
type EngineTags struct {
Path, WalPath, Id, Bucket, EngineVersion string
}
func (et *EngineTags) GetLabels() prometheus.Labels {
return prometheus.Labels{
"path": et.Path,
"walPath": et.WalPath,
"id": et.Id,
"bucket": et.Bucket,
"engine": et.EngineVersion,
}
}
func EngineLabelNames() []string {
emptyLabels := (&EngineTags{}).GetLabels()
val := make([]string, 0, len(emptyLabels))
for k := range emptyLabels {
val = append(val, k)
}
return val
}

View File

@ -27,38 +27,21 @@ import (
"github.com/influxdata/influxdb/v2/pkg/slices"
internal "github.com/influxdata/influxdb/v2/tsdb/internal"
"github.com/influxdata/influxql"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
const (
statWriteReq = "writeReq"
statWriteReqOK = "writeReqOk"
statWriteReqErr = "writeReqErr"
statSeriesCreate = "seriesCreate"
statFieldsCreate = "fieldsCreate"
statWritePointsErr = "writePointsErr"
statWritePointsDropped = "writePointsDropped"
statWritePointsOK = "writePointsOk"
statWriteBytes = "writeBytes"
statDiskBytes = "diskBytes"
measurementKey = "_name"
measurementKey = "_name"
DefaultMetricInterval = 10 * time.Second
)
var (
// ErrFieldOverflow is returned when too many fields are created on a measurement.
ErrFieldOverflow = errors.New("field overflow")
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
ErrFieldTypeConflict = errors.New("field type conflict")
// ErrFieldNotFound is returned when a field cannot be found.
ErrFieldNotFound = errors.New("field not found")
// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
// there is no mapping for.
ErrFieldUnmappedID = errors.New("field ID not mapped")
// ErrEngineClosed is returned when a caller attempts indirectly to
// access the shard's underlying engine.
ErrEngineClosed = errors.New("engine is closed")
@ -141,13 +124,13 @@ type Shard struct {
index Index
enabled bool
// expvar-based stats.
stats *ShardStatistics
defaultTags models.StatisticTags
stats *ShardMetrics
baseLogger *zap.Logger
logger *zap.Logger
metricUpdater *ticker
EnableOnOpen bool
// CompactionDisabled specifies the shard should not schedule compactions.
@ -160,29 +143,26 @@ func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt Eng
db, rp := decodeStorePath(path)
logger := zap.NewNop()
engineTags := EngineTags{
Path: path,
WalPath: walPath,
Id: fmt.Sprintf("%d", id),
Bucket: db,
EngineVersion: opt.EngineVersion,
}
s := &Shard{
id: id,
path: path,
walPath: walPath,
sfile: sfile,
options: opt,
stats: &ShardStatistics{},
defaultTags: models.StatisticTags{
"path": path,
"walPath": walPath,
"id": fmt.Sprintf("%d", id),
"database": db,
"retentionPolicy": rp,
"engine": opt.EngineVersion,
},
id: id,
path: path,
walPath: walPath,
sfile: sfile,
options: opt,
stats: newShardMetrics(engineTags),
database: db,
retentionPolicy: rp,
logger: logger,
baseLogger: logger,
EnableOnOpen: true,
logger: logger,
baseLogger: logger,
EnableOnOpen: true,
}
return s
}
@ -235,58 +215,140 @@ func (s *Shard) RetentionPolicy() string {
return s.retentionPolicy
}
// ShardStatistics maintains statistics for a shard.
type ShardStatistics struct {
WriteReq int64
WriteReqOK int64
WriteReqErr int64
FieldsCreated int64
WritePointsErr int64
WritePointsDropped int64
WritePointsOK int64
BytesWritten int64
DiskBytes int64
var globalShardMetrics = newAllShardMetrics()
type twoCounterObserver struct {
count prometheus.Counter
sum prometheus.Counter
}
// Statistics returns statistics for periodic monitoring.
func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
engine, err := s.Engine()
if err != nil {
return nil
func (t twoCounterObserver) Observe(f float64) {
t.sum.Inc()
t.count.Add(f)
}
var _ prometheus.Observer = twoCounterObserver{}
type allShardMetrics struct {
writes *prometheus.CounterVec
writesSum *prometheus.CounterVec
writesErr *prometheus.CounterVec
writesErrSum *prometheus.CounterVec
writesDropped *prometheus.CounterVec
fieldsCreated *prometheus.CounterVec
diskSize *prometheus.GaugeVec
series *prometheus.GaugeVec
}
type ShardMetrics struct {
writes prometheus.Observer
writesErr prometheus.Observer
writesDropped prometheus.Counter
fieldsCreated prometheus.Counter
diskSize prometheus.Gauge
series prometheus.Gauge
}
const storageNamespace = "storage"
const shardSubsystem = "shard"
func newAllShardMetrics() *allShardMetrics {
labels := EngineLabelNames()
return &allShardMetrics{
writes: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: storageNamespace,
Subsystem: shardSubsystem,
Name: "write_count",
Help: "Count of the number of write requests",
}, labels),
writesSum: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: storageNamespace,
Subsystem: shardSubsystem,
Name: "write_sum",
Help: "Counter of the number of points for write requests",
}, labels),
writesErr: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: storageNamespace,
Subsystem: shardSubsystem,
Name: "write_err_count",
Help: "Count of the number of write requests with errors",
}, labels),
writesErrSum: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: storageNamespace,
Subsystem: shardSubsystem,
Name: "write_err_sum",
Help: "Counter of the number of points for write requests with errors",
}, labels),
writesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: storageNamespace,
Subsystem: shardSubsystem,
Name: "write_dropped_sum",
Help: "Counter of the number of points droppped",
}, labels),
fieldsCreated: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: storageNamespace,
Subsystem: shardSubsystem,
Name: "fields_created",
Help: "Counter of the number of fields created",
}, labels),
diskSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
Subsystem: shardSubsystem,
Name: "disk_size",
Help: "Gauge of the disk size for the shard",
}, labels),
series: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
Subsystem: shardSubsystem,
Name: "series",
Help: "Gauge of the number of series in the shard index",
}, labels),
}
}
// Refresh our disk size stat
if _, err := s.DiskSize(); err != nil {
return nil
func ShardCollectors() []prometheus.Collector {
return []prometheus.Collector{
globalShardMetrics.writes,
globalShardMetrics.writesSum,
globalShardMetrics.writesErr,
globalShardMetrics.writesErrSum,
globalShardMetrics.writesDropped,
globalShardMetrics.fieldsCreated,
globalShardMetrics.diskSize,
globalShardMetrics.series,
}
seriesN := engine.SeriesN()
}
tags = s.defaultTags.Merge(tags)
// Set the index type on the tags. N.B this needs to be checked since it's
// only set when the shard is opened.
if indexType := s.IndexType(); indexType != "" {
tags["indexType"] = indexType
}
statistics := []models.Statistic{{
Name: "shard",
Tags: tags,
Values: map[string]interface{}{
statWriteReq: atomic.LoadInt64(&s.stats.WriteReq),
statWriteReqOK: atomic.LoadInt64(&s.stats.WriteReqOK),
statWriteReqErr: atomic.LoadInt64(&s.stats.WriteReqErr),
statSeriesCreate: seriesN,
statFieldsCreate: atomic.LoadInt64(&s.stats.FieldsCreated),
statWritePointsErr: atomic.LoadInt64(&s.stats.WritePointsErr),
statWritePointsDropped: atomic.LoadInt64(&s.stats.WritePointsDropped),
statWritePointsOK: atomic.LoadInt64(&s.stats.WritePointsOK),
statWriteBytes: atomic.LoadInt64(&s.stats.BytesWritten),
statDiskBytes: atomic.LoadInt64(&s.stats.DiskBytes),
func newShardMetrics(tags EngineTags) *ShardMetrics {
labels := tags.GetLabels()
return &ShardMetrics{
writes: twoCounterObserver{
count: globalShardMetrics.writes.With(labels),
sum: globalShardMetrics.writesSum.With(labels),
},
}}
writesErr: twoCounterObserver{
count: globalShardMetrics.writesErr.With(labels),
sum: globalShardMetrics.writesErrSum.With(labels),
},
writesDropped: globalShardMetrics.writesDropped.With(labels),
fieldsCreated: globalShardMetrics.fieldsCreated.With(labels),
diskSize: globalShardMetrics.diskSize.With(labels),
series: globalShardMetrics.series.With(labels),
}
}
return statistics
// ticker runs fn periodically, and stops when Stop() is called
//
// Stop waits for the last function run to finish if already running
type ticker struct {
wg sync.WaitGroup
closing chan struct{}
}
// Stops the ticker and waits for the function to complete
func (t *ticker) Stop() {
close(t.closing)
t.wg.Wait()
}
// Path returns the path set on the shard when it was created.
@ -353,6 +415,39 @@ func (s *Shard) Open(ctx context.Context) error {
}
s._engine = e
// Set up metric collection
metricUpdater := &ticker{
closing: make(chan struct{}),
}
// We want a way to turn off the series and disk size metrics if they are suspected to cause issues
// This corresponds to the top-level MetricsDisabled argument
if !s.options.MetricsDisabled {
metricUpdater.wg.Add(1)
go func() {
tick := time.NewTicker(DefaultMetricInterval)
defer metricUpdater.wg.Done()
defer tick.Stop()
for {
select {
case <-tick.C:
// Note this takes the engine lock, so we have to be careful not
// to close metricUpdater.closing while holding the engine lock
e, err := s.Engine()
if err != nil {
continue
}
s.stats.series.Set(float64(e.SeriesN()))
s.stats.diskSize.Set(float64(e.DiskSize()))
case <-metricUpdater.closing:
return
}
}
}()
}
s.metricUpdater = metricUpdater
return nil
}(); err != nil {
s.close()
@ -369,9 +464,14 @@ func (s *Shard) Open(ctx context.Context) error {
// Close shuts down the shard's store.
func (s *Shard) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.close()
err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.close()
}()
// make sure not to hold a lock while waiting for the metric updater to finish
s.metricUpdater.wg.Wait()
return err
}
// close closes the shard an removes reference to the shard from associated
@ -381,6 +481,10 @@ func (s *Shard) close() error {
return nil
}
if s.metricUpdater != nil {
close(s.metricUpdater.closing)
}
err := s._engine.Close()
if err == nil {
s._engine = nil
@ -488,7 +592,6 @@ func (s *Shard) DiskSize() (int64, error) {
return 0, ErrEngineClosed
}
size := s._engine.DiskSize()
atomic.StoreInt64(&s.stats.DiskBytes, size)
return size, nil
}
@ -499,7 +602,7 @@ type FieldCreate struct {
}
// WritePoints will write the raw data points and any new metadata to the index in the shard.
func (s *Shard) WritePoints(ctx context.Context, points []models.Point) error {
func (s *Shard) WritePoints(ctx context.Context, points []models.Point) (rErr error) {
s.mu.RLock()
defer s.mu.RUnlock()
@ -509,7 +612,12 @@ func (s *Shard) WritePoints(ctx context.Context, points []models.Point) error {
}
var writeError error
atomic.AddInt64(&s.stats.WriteReq, 1)
s.stats.writes.Observe(float64(len(points)))
defer func() {
if rErr != nil {
s.stats.writesErr.Observe(float64(len(points)))
}
}()
points, fieldsToCreate, err := s.validateSeriesAndFields(points)
if err != nil {
@ -520,7 +628,7 @@ func (s *Shard) WritePoints(ctx context.Context, points []models.Point) error {
// to the caller, but continue on writing the remaining points.
writeError = err
}
atomic.AddInt64(&s.stats.FieldsCreated, int64(len(fieldsToCreate)))
s.stats.fieldsCreated.Add(float64(len(fieldsToCreate)))
// add any new fields and keep track of what needs to be saved
if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
@ -529,12 +637,8 @@ func (s *Shard) WritePoints(ctx context.Context, points []models.Point) error {
// Write to the engine.
if err := engine.WritePoints(ctx, points); err != nil {
atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points)))
atomic.AddInt64(&s.stats.WriteReqErr, 1)
return fmt.Errorf("engine: %s", err)
}
atomic.AddInt64(&s.stats.WritePointsOK, int64(len(points)))
atomic.AddInt64(&s.stats.WriteReqOK, 1)
return writeError
}
@ -604,7 +708,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
reason = err.Reason
dropped += err.Dropped
droppedKeys = err.DroppedKeys
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
s.stats.writesDropped.Add(float64(err.Dropped))
default:
return nil, nil, err
}
@ -648,7 +752,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
reason = err.Reason
}
dropped += err.Dropped
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
s.stats.writesDropped.Add(float64(err.Dropped))
default:
return nil, nil, err
}
@ -813,17 +917,6 @@ func (s *Shard) MeasurementExists(name []byte) (bool, error) {
return engine.MeasurementExists(name)
}
// WriteTo writes the shard's data to w.
func (s *Shard) WriteTo(w io.Writer) (int64, error) {
engine, err := s.Engine()
if err != nil {
return 0, err
}
n, err := engine.WriteTo(w)
atomic.AddInt64(&s.stats.BytesWritten, int64(n))
return n, err
}
// CreateIterator returns an iterator for the data in the shard.
func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
engine, err := s.Engine()

View File

@ -25,6 +25,7 @@ import (
"github.com/influxdata/influxdb/v2/pkg/estimator/hll"
"github.com/influxdata/influxdb/v2/pkg/limiter"
"github.com/influxdata/influxql"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
@ -41,12 +42,6 @@ var (
ErrMultipleIndexTypes = errors.New("cannot delete data. DB contains shards using multiple indexes. Please convert all shards to use the same index type to delete data")
)
// Statistics gathered by the store.
const (
statDatabaseSeries = "numSeries" // number of series in a database
statDatabaseMeasurements = "numMeasurements" // number of measurements in a database
)
// SeriesFileDirectory is the name of the directory containing series files for
// a database.
const SeriesFileDirectory = "_series"
@ -126,16 +121,12 @@ func (s *Store) WithLogger(log *zap.Logger) {
}
}
// Statistics returns statistics for period monitoring.
func (s *Store) Statistics(tags map[string]string) []models.Statistic {
s.mu.RLock()
shards := s.shardsSlice()
s.mu.RUnlock()
// Add all the series and measurements cardinality estimations.
// CollectBucketMetrics sets prometheus metrics for each bucket
func (s *Store) CollectBucketMetrics() {
// Collect all the bucket cardinality estimations
databases := s.Databases()
statistics := make([]models.Statistic, 0, len(databases))
for _, database := range databases {
log := s.Logger.With(logger.Database(database))
sc, err := s.SeriesCardinality(context.Background(), database)
if err != nil {
@ -149,21 +140,48 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
continue
}
statistics = append(statistics, models.Statistic{
Name: "database",
Tags: models.StatisticTags{"database": database}.Merge(tags),
Values: map[string]interface{}{
statDatabaseSeries: sc,
statDatabaseMeasurements: mc,
},
})
}
labels := prometheus.Labels{bucketLabel: database}
seriesCardinality := globalBucketMetrics.seriesCardinality.With(labels)
measureCardinality := globalBucketMetrics.measureCardinality.With(labels)
// Gather all statistics for all shards.
for _, shard := range shards {
statistics = append(statistics, shard.Statistics(tags)...)
seriesCardinality.Set(float64(sc))
measureCardinality.Set(float64(mc))
}
}
var globalBucketMetrics = newAllBucketMetrics()
const bucketSubsystem = "bucket"
const bucketLabel = "bucket"
type allBucketMetrics struct {
seriesCardinality *prometheus.GaugeVec
measureCardinality *prometheus.GaugeVec
}
func newAllBucketMetrics() *allBucketMetrics {
labels := []string{bucketLabel}
return &allBucketMetrics{
seriesCardinality: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
Subsystem: bucketSubsystem,
Name: "series_num",
Help: "Gauge of series cardinality per bucket",
}, labels),
measureCardinality: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
Subsystem: bucketSubsystem,
Name: "measurement_num",
Help: "Gauge of measurement cardinality per bucket",
}, labels),
}
}
func BucketCollectors() []prometheus.Collector {
return []prometheus.Collector{
globalBucketMetrics.seriesCardinality,
globalBucketMetrics.measureCardinality,
}
return statistics
}
func (s *Store) IndexBytes() int {
@ -229,6 +247,14 @@ func (s *Store) Open(ctx context.Context) error {
}()
}
if !s.EngineOptions.MetricsDisabled {
s.wg.Add(1)
go func() {
s.wg.Done()
s.collectMetrics()
}()
}
return nil
}
@ -1499,13 +1525,6 @@ func (s *Store) MeasurementNames(ctx context.Context, auth query.Authorizer, dat
return is.MeasurementNamesByExpr(auth, cond)
}
// MeasurementSeriesCounts returns the number of measurements and series in all
// the shards' indices.
func (s *Store) MeasurementSeriesCounts(database string) (measuments int, series int) {
// TODO: implement me
return 0, 0
}
type TagKeys struct {
Measurement string
Keys []string
@ -1944,6 +1963,19 @@ func (s *Store) monitorShards() {
}
}
func (s *Store) collectMetrics() {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <-s.closing:
return
case <-t.C:
s.CollectBucketMetrics()
}
}
}
// KeyValue holds a string key and a string value.
type KeyValue struct {
Key, Value string

View File

@ -8,6 +8,7 @@ import (
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -78,6 +79,32 @@ func (s *Service) WithLogger(log *zap.Logger) {
s.logger = log.With(zap.String("service", "retention"))
}
var globalRetentionMetrics = newRetentionMetrics()
const storageNamespace = "storage"
const retentionSubsystem = "retention"
type retentionMetrics struct {
checkDuration prometheus.Histogram
}
func newRetentionMetrics() *retentionMetrics {
return &retentionMetrics{
checkDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: storageNamespace,
Subsystem: retentionSubsystem,
Name: "check_duration",
Help: "Histogram of duration of retention check (in seconds)",
}),
}
}
func PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
globalRetentionMetrics.checkDuration,
}
}
func (s *Service) run(ctx context.Context) {
ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
defer ticker.Stop()
@ -87,6 +114,7 @@ func (s *Service) run(ctx context.Context) {
return
case <-ticker.C:
startTime := time.Now()
log, logEnd := logger.NewOperation(context.Background(), s.logger, "Retention policy deletion check", "retention_delete_check")
type deletionInfo struct {
@ -164,6 +192,8 @@ func (s *Service) run(ctx context.Context) {
}
logEnd()
elapsed := time.Since(startTime)
globalRetentionMetrics.checkDuration.Observe(elapsed.Seconds())
}
}
}