chore: loadShards changes to more cleanly support 2.x feature (#25528)
* chore: loadShards changes to more cleanly support 2.x feature (#25513)
* chore: move shardID parsing and shard filtering into walkShardsAndProcess
* chore: make it impossible to miss sending shardResponse or marking shard as complete
* chore: always count number of shards (preparation for 2.x related feature)
* chore: explicitly load series files and create indices serially

  Explicitly load series files and create indices serially. Also avoid passing them to work functions that don't need them.

* chore: rework loadShards for changes necessary to cancel loading process
* chore: comment improvements
* fix: fix race conditions in TestStore_StartupShardProgress and TestStore_BadShardLoading
* chore: avoid logging nil error
* chore: refactor shard loading and shard walking

  Refactor loadShards and CreateShard to use a common shardLoader class that makes thread-safety easier. Refactor walkShardsAndProcess into findShards.

* chore: improve comment
* chore: rename OpenShard to ReopenShard and implement with shardLoader

  Rename Store.OpenShard to Store.ReopenShard and implement using a shardLoader object. Changes to tests as necessary.

* chore: avoid resetting shard options and locking on Reopen

  Avoid resetting shard options when reopening a shard. Proper mutex locker in Shard.ReopenShard.

* chore: fix formatting issue
* chore: warn on mixed index types in Store.CreateShard
* chore: change from info to warn when invalid shard IDs found in path
* chore: use coarser locking in Store.ReopenShard
* chore: fix typo in comment
* chore: code simplification

(cherry picked from commit 0bc167bbd7)

* chore: fix logging issues in Store.loadShards

  Fix reporting shards not opening correctly when they actually did. Fix race condition with logging in loadShards.

(cherry picked from commit 65683bf166)

* chore: remove unnecessary fmt.Sprintf calls

  Remove unnecessary fmt.Sprintf calls for static code checks in main-2.x.

(cherry picked from commit 8497fbf0af)

* chore: remove unnecessary blank identifier
* chore: remove unnecessary blank identifier
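Editorial note: the refactor described above funnels loadShards, CreateShard, and ReopenShard through the same three steps: build a shardLoader while the store mutex is held, run Load concurrently, and hand every result (success or failure) to registerShard. The following is a minimal, self-contained sketch of that flow using hypothetical stand-in types; it is not the tsdb implementation shown in the diff below.

package main

import (
    "fmt"
    "sync"
)

// Hypothetical stand-ins for tsdb's shardLoader / shardResponse.
type shardResponse struct {
    id  uint64
    err error
}

type shardLoader struct{ id uint64 }

// Load can run concurrently because each loader only touches its own state.
func (l *shardLoader) Load() shardResponse {
    return shardResponse{id: l.id}
}

type store struct {
    mu     sync.Mutex
    shards map[uint64]bool
}

// registerShard is called serially by the goroutine that holds s.mu.
func (s *store) registerShard(res shardResponse) {
    if res.err != nil {
        return // a real store would record the error for later reopen attempts
    }
    s.shards[res.id] = true
}

func (s *store) loadShards(ids []uint64) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // 1. Create loaders serially while holding the lock.
    loaders := make([]*shardLoader, 0, len(ids))
    for _, id := range ids {
        loaders = append(loaders, &shardLoader{id: id})
    }

    // 2. Load in parallel; results funnel into a buffered channel.
    resC := make(chan shardResponse, len(loaders))
    for _, l := range loaders {
        go func(l *shardLoader) { resC <- l.Load() }(l)
    }

    // 3. Register every result serially, success or failure.
    for range loaders {
        s.registerShard(<-resC)
    }
}

func main() {
    s := &store{shards: map[uint64]bool{}}
    s.loadShards([]uint64{1, 2, 3})
    fmt.Println(len(s.shards), "shards registered")
}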
parent 2ffb108a27
commit 5c7479eb14

tsdb/store.go
@@ -88,7 +88,12 @@ func (se *shardErrorMap) setShardOpenError(shardID uint64, err error) {
if err == nil {
delete(se.shardErrors, shardID)
} else {
se.shardErrors[shardID] = &ErrPreviousShardFail{error: fmt.Errorf("opening shard previously failed with: %w", err)}
// Ignore incoming error if it is from a previous open failure. We don't want to keep
// re-wrapping the same error. For safety, make sure we have an ErrPreviousShardFail in
// case we hadn't recorded it.
if !errors.Is(err, ErrPreviousShardFail{}) || !errors.Is(se.shardErrors[shardID], ErrPreviousShardFail{}) {
se.shardErrors[shardID] = &ErrPreviousShardFail{error: fmt.Errorf("opening shard previously failed with: %w", err)}
}
}
}
@@ -303,6 +308,142 @@ func (s *Store) Open(ctx context.Context) error {
return nil
}

// generateTrailingPath returns the last part of a shard path or WAL path
// based on the shardID, db, and rp.
func (s *Store) generateTrailingPath(shardID uint64, db, rp string) string {
return filepath.Join(db, rp, strconv.FormatUint(shardID, 10))
}

// generatePath returns the path to a shard based on its db, rp, and shardID.
func (s *Store) generatePath(shardID uint64, db, rp string) string {
return filepath.Join(s.path, s.generateTrailingPath(shardID, db, rp))
}

// generateWALPath returns the WAL path to a shard based on its db, rp, and shardID.
func (s *Store) generateWALPath(shardID uint64, db, rp string) string {
return filepath.Join(s.EngineOptions.Config.WALDir, s.generateTrailingPath(shardID, db, rp))
}

// shardLoader is an independent object that can load shards from disk in a thread-safe manner.
// It should be created with Store.newShardLoader. The result of shardLoader.Load should then
// be registered with Store.registerShard.
type shardLoader struct {
// NOTE: shardLoader should not directly reference the Store that creates it or any of its fields.

shardID uint64
db string
rp string
sfile *SeriesFile
engineOpts EngineOptions
enabled bool
logger *zap.Logger

// Shard we are working with. Could be created by the loader or given by client code.
shard *Shard

// Should be loaded even if loading failed previously?
force bool

// Path to shard on disk
path string

// Path to WAL on disk
walPath string

// loadErr indicates if Load should fail immediately with an error.
loadErr error
}

// Load loads a shard from disk in a thread-safe manner. After a call to Load,
// the result must be registered with Store.registerShard, whether or not an error
// occurred. The returned shard is guaranteed to not be nil and have the correct shard ID,
// although it will not be properly loaded if there was an error.
func (l *shardLoader) Load(ctx context.Context) *shardResponse {
// Open engine.
if l.shard == nil {
l.shard = NewShard(l.shardID, l.path, l.walPath, l.sfile, l.engineOpts)

// Set options based on caller preferences.
l.shard.EnableOnOpen = l.enabled
l.shard.CompactionDisabled = l.engineOpts.CompactionDisabled
l.shard.WithLogger(l.logger)
}

err := func() error {
// Stop and return error if previous open failed.
if l.loadErr != nil {
return l.loadErr
}

// Open the shard.
return l.shard.Open(ctx)
}()

return &shardResponse{s: l.shard, err: err}
}

type shardLoaderOption func(*shardLoader)

// withForceLoad allows forcing shard opens even if a previous load failed with an error.
func withForceLoad(force bool) shardLoaderOption {
return func(l *shardLoader) {
l.force = force
}
}

// withExistingShard uses an existing Shard already registered with Store instead
// of creating a new one.
func withExistingShard(shard *Shard) shardLoaderOption {
return func(l *shardLoader) {
l.shard = shard
}
}

// newShardLoader generates a shardLoader that can be used to load a shard in a
// thread-safe manner. The result of the shardLoader.Load() must then be
// populated into s using Store.registerShard.
// s.mu must be held before calling newShardLoader. newShardLoader is not thread-safe.
// Note that any errors detected during newShardLoader will not be returned to caller until
// Load is called. This is to simplify error handling for client code.
func (s *Store) newShardLoader(shardID uint64, db, rp string, enabled bool, opts ...shardLoaderOption) *shardLoader {
l := &shardLoader{
shardID: shardID,
db: db,
rp: rp,
engineOpts: s.EngineOptions,
enabled: enabled,
logger: s.baseLogger,

path: s.generatePath(shardID, db, rp),
walPath: s.generateWALPath(shardID, db, rp),
}

for _, o := range opts {
o(l)
}

// Check for error from last load attempt.
lastErr, _ := s.badShards.shardError(shardID)
if lastErr != nil && !l.force {
l.loadErr = fmt.Errorf("not attempting to open shard %d; %w", shardID, lastErr)
return l
}

// Provide an implementation of the ShardIDSets
l.engineOpts.SeriesIDSets = shardSet{store: s, db: db}

// Retrieve cached series file or load it if not cached in s.
sfile, err := s.openSeriesFile(db)
if err != nil {
l.loadErr = fmt.Errorf("error loading series file for database %q: %w", db, err)
return l
}
l.sfile = sfile

return l
}

// loadShards loads all shards on disk. s.mu must be held before calling loadShards.
func (s *Store) loadShards(ctx context.Context) error {
// Limit the number of concurrent TSM files to be opened to the number of cores.
s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))
@@ -351,180 +492,58 @@ func (s *Store) loadShards(ctx context.Context) error {
log, logEnd := logger.NewOperation(context.TODO(), s.Logger, "Open store", "tsdb_open")
defer logEnd()

shardLoaderWg := new(sync.WaitGroup)
t := limiter.NewFixed(runtime.GOMAXPROCS(0))

// Determine how many shards we need to open by checking the store path.
dbDirs, err := os.ReadDir(s.path)
// Get list of shards and their db / rp.
shards, err := s.findShards(log)
if err != nil {
return err
return fmt.Errorf("error while finding shards to load: %w", err)
}

walkShardsAndProcess := func(fn func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error) error {
for _, db := range dbDirs {
rpDirs, err := s.getRetentionPolicyDirs(db, log)
if err != nil {
return err
} else if rpDirs == nil {
continue
}

// Load series file.
sfile, err := s.openSeriesFile(db.Name())
if err != nil {
return err
}

for _, rp := range rpDirs {
shardDirs, err := s.getShards(rp, db, log)
if err != nil {
return err
} else if shardDirs == nil {
continue
}

for _, sh := range shardDirs {
// Series file should not be in a retention policy but skip just in case.
if sh.Name() == SeriesFileDirectory {
log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
continue
}

if err := fn(sfile, sh, db, rp); err != nil {
return err
}
}
}
}

return nil
}

// We use `rawShardCount` as a buffer size for channel creation below.
// If there is no startupProgressMetrics count then this will be 0 creating a
// zero buffer channel.
rawShardCount := 0
// Setup progress metrics.
if s.startupProgressMetrics != nil {
err := walkShardsAndProcess(func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
rawShardCount++
for range shards {
s.startupProgressMetrics.AddShard()
return nil
})
if err != nil {
return err
}
}

shardResC := make(chan *shardResponse, rawShardCount)
err = walkShardsAndProcess(func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
shardLoaderWg.Add(1)
// Do the actual work of loading shards.
shardResC := make(chan *shardResponse, len(shards))
pendingShardCount := 0
for _, sh := range shards {
pendingShardCount++

go func(db, rp, sh string) {
defer shardLoaderWg.Done()
path := filepath.Join(s.path, db, rp, sh)
walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
// loaders must be created serially for thread-safety, then they can be used in a parallel manner.
loader := s.newShardLoader(sh.id, sh.db, sh.rp, false)

if err := t.Take(ctx); err != nil {
log.Error("failed to open shard at path", zap.String("path", path), zap.Error(err))
shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard at path %q: %w", path, err)}
return
}
// Now perform the actual loading in parallel in separate goroutines.
go func(log *zap.Logger) {
t.Take(ctx)
defer t.Release()

start := time.Now()

// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh, 10, 64)
if err != nil {
log.Info("invalid shard ID found at path", zap.String("path", path))
shardResC <- &shardResponse{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
res := loader.Load(ctx)
if res.err == nil {
log.Info("Opened shard", zap.String("index_version", res.s.IndexType()), zap.Duration("duration", time.Since(start)))
} else {
log.Error("Failed to open shard", zap.Error(res.err))
}

if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
shardResC <- &shardResponse{}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}

// Copy options and assign shared index.
opt := s.EngineOptions

// Provide an implementation of the ShardIDSets
opt.SeriesIDSets = shardSet{store: s, db: db}

// Open engine.
shard := NewShard(shardID, path, walPath, sfile, opt)

// Disable compactions, writes and queries until all shards are loaded
shard.EnableOnOpen = false
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
shard.WithLogger(s.baseLogger)

err = s.OpenShard(ctx, shard, false)
if err != nil {
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}

shardResC <- &shardResponse{s: shard}
log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
shardResC <- res
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
}(db.Name(), rp.Name(), sh.Name())

return nil
})
if err != nil {
return err
}(log.With(logger.Shard(sh.id), zap.String("path", loader.path)))
}

if err := s.enableShards(shardLoaderWg, shardResC); err != nil {
return err
// Register shards serially as the parallel goroutines finish opening them.
for finishedShardCount := 0; finishedShardCount < pendingShardCount; finishedShardCount++ {
res := <-shardResC
s.registerShard(res)
}

return nil
}

func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error {
go func() {
wg.Wait()
close(resC)
}()

for res := range resC {
if res.s == nil || res.err != nil {
continue
}
s.shards[res.s.id] = res.s
s.epochs[res.s.id] = newEpochTracker()
if _, ok := s.databases[res.s.database]; !ok {
s.databases[res.s.database] = new(databaseState)
}
s.databases[res.s.database].addIndexType(res.s.IndexType())
}

// Check if any databases are running multiple index types.
for db, state := range s.databases {
if state.hasMultipleIndexTypes() {
var fields []zapcore.Field
for idx, cnt := range state.indexTypes {
fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt))
}
s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...)
}
}
// Check and log if any databases are running multiple index types.
s.warnMixedIndexTypes()

// Enable all shards
for _, sh := range s.shards {
@@ -539,6 +558,59 @@ func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error
return nil
}

// registerShard registers a shardResponse from a shardLoader.Load operation with s.
// registerShard should always be called with the result of shardLoader.Load, even if
// the shard loading failed. This makes sure errors opening shards are properly tracked.
// s.mu should be held before calling registerShard. registerShard is not thread-safe and
// should not be used in a parallel manner.
func (s *Store) registerShard(res *shardResponse) {
if res.s == nil {
s.Logger.Error("registerShard called with nil")
return
}
if res.err != nil {
s.badShards.setShardOpenError(res.s.ID(), res.err)
return
}

// Avoid registering an already registered shard.
if s.shards[res.s.id] != res.s {
s.shards[res.s.id] = res.s
s.epochs[res.s.id] = newEpochTracker()
if _, ok := s.databases[res.s.database]; !ok {
s.databases[res.s.database] = new(databaseState)
}
s.databases[res.s.database].addIndexType(res.s.IndexType())
}
}

// warnMixedIndexTypes checks the databases listed in dbList for mixed
// index types and logs warnings if any are found. If no dbList is given, then
// all databases in s are checked.
func (s *Store) warnMixedIndexTypes(dbList ...string) {
var dbStates map[string]*databaseState
if len(dbList) == 0 {
dbStates = s.databases
} else {
dbStates = make(map[string]*databaseState)
for _, db := range dbList {
if state, ok := s.databases[db]; ok {
dbStates[db] = state
}
}

}
for db, state := range dbStates {
if state.hasMultipleIndexTypes() {
var fields []zapcore.Field
for idx, cnt := range state.indexTypes {
fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt))
}
s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...)
}
}
}

// Close closes the store and all associated shards. After calling Close accessing
// shards through the Store will result in ErrStoreClosed being returned.
func (s *Store) Close() error {
@@ -646,18 +718,20 @@ func (e ErrPreviousShardFail) Error() string {
return e.error.Error()
}

func (s *Store) OpenShard(ctx context.Context, sh *Shard, force bool) error {
func (s *Store) ReopenShard(ctx context.Context, shardID uint64, force bool) error {
sh := s.Shard(shardID)
if sh == nil {
return errors.New("cannot open nil shard")
}
oldErr, bad := s.badShards.shardError(sh.ID())
if force || !bad {
err := sh.Open(ctx)
s.badShards.setShardOpenError(sh.ID(), err)
return err
} else {
return fmt.Errorf("not attempting to open shard %d; %w", sh.ID(), oldErr)
return ErrShardNotFound
}

s.mu.Lock()
defer s.mu.Unlock()

loader := s.newShardLoader(shardID, "", "", true, withExistingShard(sh), withForceLoad(force))
res := loader.Load(ctx)
s.registerShard(res)

return res.err
}

func (s *Store) SetShardOpenErrorForTest(shardID uint64, err error) {
@@ -735,40 +809,11 @@ func (s *Store) CreateShard(ctx context.Context, database, retentionPolicy strin
return err
}

// Retrieve database series file.
sfile, err := s.openSeriesFile(database)
if err != nil {
return err
}

// Copy index options and pass in shared index.
opt := s.EngineOptions
opt.SeriesIDSets = shardSet{store: s, db: database}

path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(shardID, path, walPath, sfile, opt)
shard.WithLogger(s.baseLogger)
shard.EnableOnOpen = enabled

if err := s.OpenShard(ctx, shard, false); err != nil {
return err
}

s.shards[shardID] = shard
s.epochs[shardID] = newEpochTracker()
if _, ok := s.databases[database]; !ok {
s.databases[database] = new(databaseState)
}
s.databases[database].addIndexType(shard.IndexType())
if state := s.databases[database]; state.hasMultipleIndexTypes() {
var fields []zapcore.Field
for idx, cnt := range state.indexTypes {
fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt))
}
s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(database))...)
}

return nil
loader := s.newShardLoader(shardID, database, retentionPolicy, enabled)
res := loader.Load(ctx)
s.registerShard(res)
s.warnMixedIndexTypes(database)
return res.err
}

// CreateShardSnapShot will create a hard link to the underlying shard and return a path.
@@ -2248,6 +2293,68 @@ func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error {
return nil
}

type shardInfo struct {
id uint64
db string
rp string
}

// findShards returns a list of all shards and their db / rp that are found
// in s.path.
func (s *Store) findShards(log *zap.Logger) ([]shardInfo, error) {
var shards []shardInfo

// Determine how many shards we need to open by checking the store path.
dbDirs, err := os.ReadDir(s.path)
if err != nil {
return nil, err
}

for _, db := range dbDirs {
rpDirs, err := s.getRetentionPolicyDirs(db, log)
if err != nil {
return nil, err
} else if rpDirs == nil {
continue
}

for _, rp := range rpDirs {
shardDirs, err := s.getShards(rp, db, log)
if err != nil {
return nil, err
} else if shardDirs == nil {
continue
}

for _, sh := range shardDirs {
fullPath := filepath.Join(s.path, db.Name(), rp.Name())

// Series file should not be in a retention policy but skip just in case.
if sh.Name() == SeriesFileDirectory {
log.Warn("Skipping series file in retention policy dir", zap.String("path", fullPath))
continue
}

// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh.Name(), 10, 64)
if err != nil {
log.Warn("invalid shard ID found at path", zap.String("path", fullPath))
continue
}

if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db.Name(), rp.Name(), shardID) {
log.Info("skipping shard", zap.String("path", fullPath), logger.Shard(shardID))
continue
}

shards = append(shards, shardInfo{id: shardID, db: db.Name(), rp: rp.Name()})
}
}
}

return shards, nil
}

func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
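Editorial note on the shardLoaderOption helpers (withForceLoad, withExistingShard) added above: they follow Go's functional-options pattern, which lets ReopenShard reuse an already registered Shard and force a load without widening newShardLoader's signature. A minimal, self-contained sketch of the pattern under hypothetical names (loader, withForce, withExisting are illustrative, not the tsdb types):

package main

import "fmt"

// loader mirrors the shape of shardLoader's configuration with hypothetical fields.
type loader struct {
    force    bool
    existing string
}

// loaderOption mutates a loader during construction.
type loaderOption func(*loader)

func withForce(force bool) loaderOption {
    return func(l *loader) { l.force = force }
}

func withExisting(name string) loaderOption {
    return func(l *loader) { l.existing = name }
}

// newLoader applies each option in order, so callers only pass what they need.
func newLoader(opts ...loaderOption) *loader {
    l := &loader{}
    for _, o := range opts {
        o(l)
    }
    return l
}

func main() {
    l := newLoader(withForce(true), withExisting("shard-42"))
    fmt.Printf("%+v\n", *l)
}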
tsdb/store_test.go
@@ -19,14 +19,13 @@ import (
"testing"
"time"

"github.com/influxdata/influxdb/v2/predicate"

"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/internal"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/deep"
"github.com/influxdata/influxdb/v2/pkg/slices"
"github.com/influxdata/influxdb/v2/predicate"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxql"
"github.com/stretchr/testify/require"
@@ -171,7 +170,7 @@ func TestStore_StartupShardProgress(t *testing.T) {
// Equality check to make sure shards are always added prior to
// completion being called. This test opens 3 total shards - 1 shard
// fails, but we still want to track that it was attempted to be opened.
require.Equal(t, msl.shardTracker, []string{
require.Equal(t, msl.Tracked(), []string{
"shard-add",
"shard-add",
"shard-complete",
@@ -209,7 +208,7 @@ func TestStore_BadShardLoading(t *testing.T) {
require.NotNil(t, sh)

s.SetShardOpenErrorForTest(sh.ID(), errors.New("a shard opening error occurred"))
err2 := s.OpenShard(context.Background(), s.Shard(sh.ID()), false)
err2 := s.ReopenShard(context.Background(), sh.ID(), false)
require.Error(t, err2, "no error opening bad shard")

msl := &mockStartupLogger{}
@@ -220,7 +219,7 @@
// Equality check to make sure shards are always added prior to
// completion being called. This test opens 3 total shards - 1 shard
// fails, but we still want to track that it was attempted to be opened.
require.Equal(t, msl.shardTracker, []string{
require.Equal(t, msl.Tracked(), []string{
"shard-add",
"shard-add",
"shard-add",
@@ -241,24 +240,30 @@ func TestStore_BadShard(t *testing.T) {
for _, idx := range indexes {
func() {
s := MustOpenStore(t, idx)
defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx)
defer func() {
require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx)
}()

sh := tsdb.NewTempShard(t, idx)
shId := sh.ID()
err := s.OpenShard(context.Background(), sh.Shard, false)
var shId uint64 = 1
require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", shId, true))
err := s.ReopenShard(context.Background(), shId, false)
require.NoError(t, err, "opening temp shard")
require.NoError(t, sh.Close(), "closing temporary shard")

expErr := errors.New(errStr)
s.SetShardOpenErrorForTest(sh.ID(), expErr)
err2 := s.OpenShard(context.Background(), sh.Shard, false)
s.SetShardOpenErrorForTest(shId, expErr)
err2 := s.ReopenShard(context.Background(), shId, false)
require.Error(t, err2, "no error opening bad shard")
require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2)
require.EqualError(t, err2, fmt.Errorf("not attempting to open shard %d; opening shard previously failed with: %w", shId, expErr).Error())

// make sure we didn't modify the shard open error when we tried to reopen it
err2 = s.ReopenShard(context.Background(), shId, false)
require.Error(t, err2, "no error opening bad shard")
require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2)
require.EqualError(t, err2, fmt.Errorf("not attempting to open shard %d; opening shard previously failed with: %w", shId, expErr).Error())

// This should succeed with the force (and because opening an open shard automatically succeeds)
require.NoError(t, s.OpenShard(context.Background(), sh.Shard, true), "forced re-opening previously failing shard")
require.NoError(t, sh.Close())
require.NoError(t, s.ReopenShard(context.Background(), shId, true), "forced re-opening previously failing shard")
}()
}
}
@@ -2195,7 +2200,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
return
}
time.Sleep(500 * time.Microsecond)
if err := s.OpenShard(context.Background(), sh, false); err != nil {
if err := s.ReopenShard(context.Background(), sh.ID(), false); err != nil {
errC <- err
return
}
@@ -2280,7 +2285,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
return
}
time.Sleep(500 * time.Microsecond)
if err := s.OpenShard(context.Background(), sh, false); err != nil {
if err := s.ReopenShard(context.Background(), sh.ID(), false); err != nil {
errC <- err
return
}
@@ -2371,7 +2376,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
return
}
time.Sleep(500 * time.Microsecond)
if err := s.OpenShard(context.Background(), sh, false); err != nil {
if err := s.ReopenShard(context.Background(), sh.ID(), false); err != nil {
errC <- err
return
}
@@ -2866,18 +2871,29 @@ func dirExists(path string) bool {
}

type mockStartupLogger struct {
shardTracker []string
mu sync.Mutex
// mu protects all following members.
mu sync.Mutex

_shardTracker []string
}

func (m *mockStartupLogger) AddShard() {
m.mu.Lock()
m.shardTracker = append(m.shardTracker, "shard-add")
m._shardTracker = append(m._shardTracker, "shard-add")
m.mu.Unlock()
}

func (m *mockStartupLogger) CompletedShard() {
m.mu.Lock()
m.shardTracker = append(m.shardTracker, "shard-complete")
m._shardTracker = append(m._shardTracker, "shard-complete")
m.mu.Unlock()
}

func (m *mockStartupLogger) Tracked() []string {
m.mu.Lock()
defer m.mu.Unlock()

tracked := make([]string, len(m._shardTracker))
copy(tracked, m._shardTracker)
return tracked
}
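Editorial note: the mockStartupLogger changes above are the test-side half of the race fix named in the commit message. Tests now read progress through Tracked(), which copies the slice while holding the mutex, instead of letting assertions range over a slice that loader goroutines may still be appending to. A minimal sketch of that accessor pattern, using a hypothetical tracker type rather than the test helper itself:

package main

import (
    "fmt"
    "sync"
)

// tracker is a hypothetical stand-in for mockStartupLogger.
type tracker struct {
    mu     sync.Mutex
    events []string
}

func (t *tracker) Add(event string) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.events = append(t.events, event)
}

// Snapshot returns a copy so callers never alias the slice that concurrent
// writers are appending to; reading t.events directly would be a data race.
func (t *tracker) Snapshot() []string {
    t.mu.Lock()
    defer t.mu.Unlock()
    out := make([]string, len(t.events))
    copy(out, t.events)
    return out
}

func main() {
    tr := &tracker{}
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            tr.Add("shard-add")
        }()
    }
    wg.Wait()
    fmt.Println(tr.Snapshot())
}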