feat(logging): Add startup logging for shard counts (#25378) (#25507)

* feat(logging): Add startup logging for shard counts (#25378)
This PR adds startup logging that reports how many shards have been
opened versus how many remain, and includes the percentage completed
in each progress message.

closes influxdata/feature-requests#476

(cherry picked from commit 3c87f52)

closes https://github.com/influxdata/influxdb/issues/25506

(cherry picked from commit 2ffb108a27)
db/cherry-pick-25507
WeblWabl 2024-11-01 09:20:35 -05:00 committed by Devan
parent f3d5325d78
commit 153a1fe944
4 changed files with 369 additions and 118 deletions

View File

@@ -0,0 +1,32 @@
package run

import (
	"fmt"
	"sync/atomic"

	"go.uber.org/zap"
)

type StartupProgressLogger struct {
	shardsCompleted atomic.Uint64
	shardsTotal     atomic.Uint64
	logger          *zap.Logger
}

func NewStartupProgressLogger(logger *zap.Logger) *StartupProgressLogger {
	return &StartupProgressLogger{
		logger: logger,
	}
}

func (s *StartupProgressLogger) AddShard() {
	s.shardsTotal.Add(1)
}

func (s *StartupProgressLogger) CompletedShard() {
	shardsCompleted := s.shardsCompleted.Add(1)
	totalShards := s.shardsTotal.Load()

	percentShards := float64(shardsCompleted) / float64(totalShards) * 100
	s.logger.Info(fmt.Sprintf("Finished loading shard, current progress %.1f%% shards (%d / %d).", percentShards, shardsCompleted, totalShards))
}
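For illustration only, here is a minimal sketch of how the progress logger behaves once wired up. The import path is taken from the engine change below; the log lines shown in the comments assume zap's example logger and the Sprintf format above.

package main

import (
	"go.uber.org/zap"

	"github.com/influxdata/influxdb/v2/cmd/influxd/run"
)

func main() {
	sp := run.NewStartupProgressLogger(zap.NewExample())

	// Register every shard before reporting completions so the percentage is accurate.
	sp.AddShard() // total = 1
	sp.AddShard() // total = 2

	sp.CompletedShard() // Finished loading shard, current progress 50.0% shards (1 / 2).
	sp.CompletedShard() // Finished loading shard, current progress 100.0% shards (2 / 2).
}

CompletedShard reads shardsTotal at call time, which is why the store registers every shard up front before any shard finishes opening.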

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/run"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/kit/platform"
errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors"
@@ -167,6 +168,9 @@ func (e *Engine) WithLogger(log *zap.Logger) {
if e.precreatorService != nil {
e.precreatorService.WithLogger(log)
}
sl := run.NewStartupProgressLogger(e.logger)
e.tsdbStore.WithStartupMetrics(sl)
}
// PrometheusCollectors returns all the prometheus collectors associated with

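The store only requires a receiver with AddShard and CompletedShard methods (the anonymous interface added to Store below), so the zap-based StartupProgressLogger is just one possible consumer of these callbacks. As a sketch under that assumption, a hypothetical counter that is not part of this PR could satisfy the same interface, for example to back a readiness probe:

package progress

import "sync/atomic"

// startupCounter satisfies the same AddShard/CompletedShard interface that
// Store.WithStartupMetrics accepts; it is illustrative only.
type startupCounter struct {
	added     atomic.Uint64
	completed atomic.Uint64
}

func (c *startupCounter) AddShard()       { c.added.Add(1) }
func (c *startupCounter) CompletedShard() { c.completed.Add(1) }

// Remaining reports how many registered shards have not finished opening yet.
func (c *startupCounter) Remaining() uint64 {
	return c.added.Load() - c.completed.Load()
}

A value such as &startupCounter{} could then be passed to Store.WithStartupMetrics in place of the logger.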
View File

@@ -50,6 +50,12 @@ const SeriesFileDirectory = "_series"
// databaseState keeps track of the state of a database.
type databaseState struct{ indexTypes map[string]int }
// shardResponse holds the result of opening each shard in a goroutine
type shardResponse struct {
s *Shard
err error
}
// addIndexType records that the database has a shard with the given index type.
func (d *databaseState) addIndexType(indexType string) {
if d.indexTypes == nil {
@@ -118,6 +124,11 @@ type Store struct {
baseLogger *zap.Logger
Logger *zap.Logger
startupProgressMetrics interface {
AddShard()
CompletedShard()
}
closing chan struct{}
wg sync.WaitGroup
opened bool
@@ -148,6 +159,13 @@ func (s *Store) WithLogger(log *zap.Logger) {
}
}
func (s *Store) WithStartupMetrics(sp interface {
AddShard()
CompletedShard()
}) {
s.startupProgressMetrics = sp
}
// CollectBucketMetrics sets prometheus metrics for each bucket
func (s *Store) CollectBucketMetrics() {
// Collect all the bucket cardinality estimations
@@ -286,12 +304,6 @@ func (s *Store) Open(ctx context.Context) error {
}
func (s *Store) loadShards(ctx context.Context) error {
// res holds the result from opening each shard in a goroutine
type res struct {
s *Shard
err error
}
// Limit the number of concurrent TSM files to be opened to the number of cores.
s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))
@@ -339,9 +351,8 @@ func (s *Store) loadShards(ctx context.Context) error {
log, logEnd := logger.NewOperation(context.TODO(), s.Logger, "Open store", "tsdb_open")
defer logEnd()
shardLoaderWg := new(sync.WaitGroup)
t := limiter.NewFixed(runtime.GOMAXPROCS(0))
resC := make(chan *res)
var n int
// Determine how many shards we need to open by checking the store path.
dbDirs, err := os.ReadDir(s.path)
@@ -349,119 +360,150 @@ func (s *Store) loadShards(ctx context.Context) error {
return err
}
for _, db := range dbDirs {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
continue
}
if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
continue
}
// Load series file.
sfile, err := s.openSeriesFile(db.Name())
if err != nil {
return err
}
// Load each retention policy within the database directory.
rpDirs, err := os.ReadDir(dbPath)
if err != nil {
return err
}
for _, rp := range rpDirs {
rpPath := filepath.Join(s.path, db.Name(), rp.Name())
if !rp.IsDir() {
log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
walkShardsAndProcess := func(fn func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error) error {
for _, db := range dbDirs {
rpDirs, err := s.getRetentionPolicyDirs(db, log)
if err != nil {
return err
} else if rpDirs == nil {
continue
}
// The .series directory is not a retention policy.
if rp.Name() == SeriesFileDirectory {
continue
}
if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) {
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter"))
continue
}
shardDirs, err := os.ReadDir(rpPath)
// Load series file.
sfile, err := s.openSeriesFile(db.Name())
if err != nil {
return err
}
for _, sh := range shardDirs {
// Series file should not be in a retention policy but skip just in case.
if sh.Name() == SeriesFileDirectory {
log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
for _, rp := range rpDirs {
shardDirs, err := s.getShards(rp, db, log)
if err != nil {
return err
} else if shardDirs == nil {
continue
}
n++
go func(db, rp, sh string) {
path := filepath.Join(s.path, db, rp, sh)
walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
if err := t.Take(ctx); err != nil {
log.Error("failed to open shard at path", zap.String("path", path), zap.Error(err))
resC <- &res{err: fmt.Errorf("failed to open shard at path %q: %w", path, err)}
return
}
defer t.Release()
start := time.Now()
// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh, 10, 64)
if err != nil {
log.Error("invalid shard ID found at path", zap.String("path", path))
resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard", sh)}
return
for _, sh := range shardDirs {
// Series file should not be in a retention policy but skip just in case.
if sh.Name() == SeriesFileDirectory {
log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
continue
}
if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
log.Warn("skipping shard", zap.String("path", path), logger.Shard(shardID))
resC <- &res{}
return
if err := fn(sfile, sh, db, rp); err != nil {
return err
}
// Copy options and assign shared index.
opt := s.EngineOptions
// Provide an implementation of the ShardIDSets
opt.SeriesIDSets = shardSet{store: s, db: db}
// Open engine.
shard := NewShard(shardID, path, walPath, sfile, opt)
// Disable compactions, writes and queries until all shards are loaded
shard.EnableOnOpen = false
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
shard.WithLogger(s.baseLogger)
err = s.OpenShard(ctx, shard, false)
if err != nil {
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
return
}
resC <- &res{s: shard}
log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
}(db.Name(), rp.Name(), sh.Name())
}
}
}
return nil
}
// We use `rawShardCount` as the buffer size for the shard response channel created below.
// If startupProgressMetrics is nil, the count stays 0 and the channel is unbuffered.
rawShardCount := 0
if s.startupProgressMetrics != nil {
err := walkShardsAndProcess(func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
rawShardCount++
s.startupProgressMetrics.AddShard()
return nil
})
if err != nil {
return err
}
}
// Gather results of opening shards concurrently, keeping track of how
// many databases we are managing.
for i := 0; i < n; i++ {
res := <-resC
shardResC := make(chan *shardResponse, rawShardCount)
err = walkShardsAndProcess(func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
shardLoaderWg.Add(1)
go func(db, rp, sh string) {
defer shardLoaderWg.Done()
path := filepath.Join(s.path, db, rp, sh)
walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
if err := t.Take(ctx); err != nil {
log.Error("failed to open shard at path", zap.String("path", path), zap.Error(err))
shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard at path %q: %w", path, err)}
return
}
defer t.Release()
start := time.Now()
// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh, 10, 64)
if err != nil {
log.Info("invalid shard ID found at path", zap.String("path", path))
shardResC <- &shardResponse{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}
if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
shardResC <- &shardResponse{}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}
// Copy options and assign shared index.
opt := s.EngineOptions
// Provide an implementation of the ShardIDSets
opt.SeriesIDSets = shardSet{store: s, db: db}
// Open engine.
shard := NewShard(shardID, path, walPath, sfile, opt)
// Disable compactions, writes and queries until all shards are loaded
shard.EnableOnOpen = false
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
shard.WithLogger(s.baseLogger)
err = s.OpenShard(ctx, shard, false)
if err != nil {
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}
shardResC <- &shardResponse{s: shard}
log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
}(db.Name(), rp.Name(), sh.Name())
return nil
})
if err != nil {
return err
}
if err := s.enableShards(shardLoaderWg, shardResC); err != nil {
return err
}
return nil
}
func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error {
go func() {
wg.Wait()
close(resC)
}()
for res := range resC {
if res.s == nil || res.err != nil {
continue
}
@@ -472,7 +514,6 @@ func (s *Store) loadShards(ctx context.Context) error {
}
s.databases[res.s.database].addIndexType(res.s.IndexType())
}
close(resC)
// Check if any databases are running multiple index types.
for db, state := range s.databases {
@@ -892,9 +933,7 @@ func (s *Store) DeleteDatabase(name string) error {
// no files locally, so nothing to do
return nil
}
shards := s.filterShards(func(sh *Shard) bool {
return sh.database == name
})
shards := s.filterShards(byDatabase(name))
s.mu.RUnlock()
if err := s.walkShards(shards, func(sh *Shard) error {
@@ -956,9 +995,7 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
// unknown database, nothing to do
return nil
}
shards := s.filterShards(func(sh *Shard) bool {
return sh.database == database && sh.retentionPolicy == name
})
shards := s.filterShards(ComposeShardFilter(byDatabase(database), byRetentionPolicy(name)))
s.mu.RUnlock()
// Close and delete all shards under the retention policy on the
@@ -1052,6 +1089,20 @@ func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard {
return shards
}
type ShardPredicate = func(sh *Shard) bool
func ComposeShardFilter(fns ...ShardPredicate) ShardPredicate {
return func(sh *Shard) bool {
for _, fn := range fns {
if !fn(sh) {
return false
}
}
return true
}
}
// byDatabase provides a predicate for filterShards that matches on the name of
// the database passed in.
func byDatabase(name string) func(sh *Shard) bool {
@@ -1060,16 +1111,20 @@ func byDatabase(name string) func(sh *Shard) bool {
}
}
// byRetentionPolicy provides a predicate for filterShards that matches on the name of
// the retention policy passed in.
func byRetentionPolicy(name string) ShardPredicate {
return func(sh *Shard) bool {
return sh.retentionPolicy == name
}
}
// walkShards applies a function to each shard in parallel. fn must be safe for
// concurrent use. If any of the functions return an error, the first error is
// returned.
func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
// struct to hold the result of opening each reader in a goroutine
type res struct {
err error
}
resC := make(chan res)
resC := make(chan shardResponse, len(shards))
var n int
for _, sh := range shards {
@@ -1077,11 +1132,11 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
go func(sh *Shard) {
if err := fn(sh); err != nil {
resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)}
resC <- shardResponse{err: fmt.Errorf("shard %d: %s", sh.id, err)}
return
}
resC <- res{}
resC <- shardResponse{}
}(sh)
}
@@ -2192,3 +2247,49 @@ func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error {
}
return nil
}
func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
return nil, nil
}
if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
return nil, nil
}
// Load each retention policy within the database directory.
rpDirs, err := os.ReadDir(dbPath)
if err != nil {
return nil, err
}
return rpDirs, nil
}
func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name())
if !rpDir.IsDir() {
log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory"))
return nil, nil
}
// The .series directory is not a retention policy.
if rpDir.Name() == SeriesFileDirectory {
return nil, nil
}
if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) {
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter"))
return nil, nil
}
shardDirs, err := os.ReadDir(rpPath)
if err != nil {
return nil, err
}
return shardDirs, nil
}

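As a quick sketch of how the new predicate helpers above compose (the shard-ID cutoff is made up for illustration; the database and retention policy names are arbitrary):

// Select shards for db0/autogen, mirroring the DeleteRetentionPolicy change above.
shards := s.filterShards(ComposeShardFilter(byDatabase("db0"), byRetentionPolicy("autogen")))

// Ad-hoc predicates compose the same way.
newEnough := func(sh *Shard) bool { return sh.id >= 100 } // hypothetical cutoff
recent := s.filterShards(ComposeShardFilter(byDatabase("db0"), newEnough))

Because ComposeShardFilter returns false on the first predicate that fails, cheaper checks can be listed first.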
View File

@@ -145,6 +145,96 @@ func TestStore_CreateShard(t *testing.T) {
}
}
// Ensure the store reports startup progress while loading shards.
func TestStore_StartupShardProgress(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", 1, true))
sh := s.Shard(1)
require.NotNil(t, sh)
// Create another shard and verify that it exists.
require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", 2, true))
sh = s.Shard(2)
require.NotNil(t, sh)
msl := &mockStartupLogger{}
// Reopen shard and recheck.
require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
// Equality check to make sure shards are always added prior to
// completion being called. This test opens 2 shards, and both should be
// tracked as added before being marked complete.
require.Equal(t, msl.shardTracker, []string{
"shard-add",
"shard-add",
"shard-complete",
"shard-complete",
})
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
}
}
// Introduces a test to ensure that shard loading still accounts for bad shards. We still want these to show up
// during the initial shard loading even though they are in an error state.
func TestStore_BadShardLoading(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", 1, true))
sh := s.Shard(1)
require.NotNil(t, sh)
// Create another shard and verify that it exists.
require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", 2, true))
sh = s.Shard(2)
require.NotNil(t, sh)
// Create another shard and verify that it exists.
require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", 3, true))
sh = s.Shard(3)
require.NotNil(t, sh)
s.SetShardOpenErrorForTest(sh.ID(), errors.New("a shard opening error occurred"))
err2 := s.OpenShard(context.Background(), s.Shard(sh.ID()), false)
require.Error(t, err2, "no error opening bad shard")
msl := &mockStartupLogger{}
// Reopen shard and recheck.
require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
// Equality check to make sure shards are always added prior to
// completion being called. This test opens 3 shards in total; 1 shard
// fails to open, but we still want to track that the open was attempted.
require.Equal(t, msl.shardTracker, []string{
"shard-add",
"shard-add",
"shard-add",
"shard-complete",
"shard-complete",
"shard-complete",
})
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
}
}
func TestStore_BadShard(t *testing.T) {
const errStr = "a shard open error"
indexes := tsdb.RegisteredIndexes()
@@ -2623,6 +2713,13 @@ func WithWALFlushOnShutdown(flush bool) StoreOption {
}
}
func WithStartupMetrics(sm *mockStartupLogger) StoreOption {
return func(s *Store) error {
s.WithStartupMetrics(sm)
return nil
}
}
// NewStore returns a new instance of Store with a temporary path.
func NewStore(tb testing.TB, index string, opts ...StoreOption) *Store {
tb.Helper()
@@ -2767,3 +2864,20 @@ func dirExists(path string) bool {
}
return !os.IsNotExist(err)
}
type mockStartupLogger struct {
shardTracker []string
mu sync.Mutex
}
func (m *mockStartupLogger) AddShard() {
m.mu.Lock()
m.shardTracker = append(m.shardTracker, "shard-add")
m.mu.Unlock()
}
func (m *mockStartupLogger) CompletedShard() {
m.mu.Lock()
m.shardTracker = append(m.shardTracker, "shard-complete")
m.mu.Unlock()
}
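Stepping back from the diff, the reworked loadShards follows a common fan-out/fan-in shape: count the work first (so AddShard can be called for every shard and the result channel can be buffered), open shards concurrently, then close the result channel once the WaitGroup drains, which is what enableShards does. A stripped-down, self-contained sketch of that shape, with no InfluxDB types and the concurrency limiter omitted:

package main

import (
	"fmt"
	"sync"
)

type result struct {
	name string
	err  error
}

func main() {
	work := []string{"shard-1", "shard-2", "shard-3"}

	// First pass: count the work, analogous to calling AddShard per shard dir.
	total := len(work)

	resC := make(chan result, total) // buffered, like shardResC
	var wg sync.WaitGroup

	for _, w := range work {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			// ... open the shard here ...
			resC <- result{name: name} // report completion, like CompletedShard
		}(w)
	}

	// Close the channel once all workers finish, as enableShards does.
	go func() {
		wg.Wait()
		close(resC)
	}()

	done := 0
	for res := range resC {
		if res.err != nil {
			continue
		}
		done++
		fmt.Printf("loaded %s (%d / %d)\n", res.name, done, total)
	}
}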