fix: remember shards that fail Open(), avoid repeated attempts (#23437)

If a shard cannot be opened, store its ID and last error.
Prevent future attempts to open during this invocation of
influxDB. This information is not persisted.

closes https://github.com/influxdata/influxdb/issues/23428
closes https://github.com/influxdata/influxdb/issues/23426
pull/23462/head
davidby-influx 2022-06-13 10:32:47 -07:00 committed by GitHub
parent d3db48e93d
commit 54ac7e54ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 95 additions and 7 deletions

View File

@ -407,7 +407,7 @@ func (w *PointsWriter) writeToShard(writeCtx tsdb.WriteContext, shard *meta.Shar
// store has not actually created this shard, tell it to create it and // store has not actually created this shard, tell it to create it and
// retry the write // retry the write
if err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true); err != nil { if err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true); err != nil {
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err)) w.Logger.Warn("Write failed creating shard", zap.Uint64("shard", shard.ID), zap.Error(err))
atomic.AddInt64(&w.stats.WriteErr, 1) atomic.AddInt64(&w.stats.WriteErr, 1)
return err return err
} }

View File

@ -81,6 +81,28 @@ type StoreStatistics struct {
SeriesCreated int64 SeriesCreated int64
} }
type shardErrorMap struct {
mu sync.Mutex
shardErrors map[uint64]error
}
func (se *shardErrorMap) setShardOpenError(shardID uint64, err error) {
se.mu.Lock()
defer se.mu.Unlock()
if err == nil {
delete(se.shardErrors, shardID)
} else {
se.shardErrors[shardID] = &ErrPreviousShardFail{error: fmt.Errorf("opening shard previously failed with: %w", err)}
}
}
func (se *shardErrorMap) shardError(shardID uint64) (error, bool) {
se.mu.Lock()
defer se.mu.Unlock()
oldErr, hasErr := se.shardErrors[shardID]
return oldErr, hasErr
}
// Store manages shards and indexes for databases. // Store manages shards and indexes for databases.
type Store struct { type Store struct {
mu sync.RWMutex mu sync.RWMutex
@ -97,6 +119,9 @@ type Store struct {
// This prevents new shards from being created while old ones are being deleted. // This prevents new shards from being created while old ones are being deleted.
pendingShardDeletes map[uint64]struct{} pendingShardDeletes map[uint64]struct{}
// Maintains a set of shards that failed to open
badShards shardErrorMap
// Epoch tracker helps serialize writes and deletes that may conflict. It // Epoch tracker helps serialize writes and deletes that may conflict. It
// is stored by shard. // is stored by shard.
epochs map[uint64]*epochTracker epochs map[uint64]*epochTracker
@ -125,6 +150,7 @@ func NewStore(path string) *Store {
sfiles: make(map[string]*SeriesFile), sfiles: make(map[string]*SeriesFile),
indexes: make(map[string]interface{}), indexes: make(map[string]interface{}),
pendingShardDeletes: make(map[uint64]struct{}), pendingShardDeletes: make(map[uint64]struct{}),
badShards: shardErrorMap{shardErrors: make(map[uint64]error)},
epochs: make(map[uint64]*epochTracker), epochs: make(map[uint64]*epochTracker),
EngineOptions: NewEngineOptions(), EngineOptions: NewEngineOptions(),
Logger: logger, Logger: logger,
@ -449,9 +475,9 @@ func (s *Store) loadShards() error {
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
shard.WithLogger(s.baseLogger) shard.WithLogger(s.baseLogger)
err = shard.Open() err = s.OpenShard(shard, false)
if err != nil { if err != nil {
log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err)) log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)} resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)}
return return
} }
@ -607,6 +633,42 @@ func (s *Store) Shard(id uint64) *Shard {
return sh return sh
} }
type ErrPreviousShardFail struct {
error
}
func (e ErrPreviousShardFail) Unwrap() error {
return e.error
}
func (e ErrPreviousShardFail) Is(err error) bool {
_, sOk := err.(ErrPreviousShardFail)
_, pOk := err.(*ErrPreviousShardFail)
return sOk || pOk
}
func (e ErrPreviousShardFail) Error() string {
return e.error.Error()
}
func (s *Store) OpenShard(sh *Shard, force bool) error {
if sh == nil {
return errors.New("cannot open nil shard")
}
oldErr, bad := s.badShards.shardError(sh.ID())
if force || !bad {
err := sh.Open()
s.badShards.setShardOpenError(sh.ID(), err)
return err
} else {
return oldErr
}
}
func (s *Store) SetShardOpenErrorForTest(shardID uint64, err error) {
s.badShards.setShardOpenError(shardID, err)
}
// Shards returns a list of shards by id. // Shards returns a list of shards by id.
func (s *Store) Shards(ids []uint64) []*Shard { func (s *Store) Shards(ids []uint64) []*Shard {
s.mu.RLock() s.mu.RLock()
@ -700,7 +762,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
shard.WithLogger(s.baseLogger) shard.WithLogger(s.baseLogger)
shard.EnableOnOpen = enabled shard.EnableOnOpen = enabled
if err := shard.Open(); err != nil { if err := s.OpenShard(shard, false); err != nil {
return err return err
} }

View File

@ -27,6 +27,7 @@ import (
"github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem" "github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxql" "github.com/influxdata/influxql"
"github.com/stretchr/testify/require"
) )
// Ensure the store can delete a retention policy and all shards under // Ensure the store can delete a retention policy and all shards under
@ -143,6 +144,31 @@ func TestStore_CreateShard(t *testing.T) {
} }
} }
func TestStore_BadShard(t *testing.T) {
const errStr = "a shard open error"
indexes := tsdb.RegisteredIndexes()
for _, idx := range indexes {
func() {
s := MustOpenStore(idx)
defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx)
sh := tsdb.NewTempShard(idx)
err := s.OpenShard(sh.Shard, false)
require.NoError(t, err, "opening temp shard")
defer require.NoError(t, sh.Close(), "closing temporary shard")
s.SetShardOpenErrorForTest(sh.ID(), errors.New(errStr))
err2 := s.OpenShard(sh.Shard, false)
require.Error(t, err2, "no error opening bad shard")
require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2)
require.EqualError(t, err2, "opening shard previously failed with: "+errStr)
// This should succeed with the force (and because opening an open shard automatically succeeds)
require.NoError(t, s.OpenShard(sh.Shard, true), "forced re-opening previously failing shard")
}()
}
}
func TestStore_CreateMixedShards(t *testing.T) { func TestStore_CreateMixedShards(t *testing.T) {
t.Parallel() t.Parallel()
@ -2048,7 +2074,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
return return
} }
time.Sleep(500 * time.Microsecond) time.Sleep(500 * time.Microsecond)
if err := sh.Open(); err != nil { if err := s.OpenShard(sh, false); err != nil {
errC <- err errC <- err
return return
} }
@ -2133,7 +2159,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
return return
} }
time.Sleep(500 * time.Microsecond) time.Sleep(500 * time.Microsecond)
if err := sh.Open(); err != nil { if err := s.OpenShard(sh, false); err != nil {
errC <- err errC <- err
return return
} }
@ -2224,7 +2250,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
return return
} }
time.Sleep(500 * time.Microsecond) time.Sleep(500 * time.Microsecond)
if err := sh.Open(); err != nil { if err := s.OpenShard(sh, false); err != nil {
errC <- err errC <- err
return return
} }