Merge pull request #9465 from influxdata/js-write-to-shard-during-delete
Mark a shard as in process of being deleted (pull/9471/head)

commit 2bc9e10757
@@ -54,6 +54,10 @@ type Store struct {
 	// shared per-database indexes, only if using "inmem".
 	indexes map[string]interface{}
 
+	// Maintains a set of shards that are in the process of deletion.
+	// This prevents new shards from being created while old ones are being deleted.
+	pendingShardDeletes map[uint64]struct{}
+
 	EngineOptions EngineOptions
 
 	baseLogger *zap.Logger
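The new field uses the standard Go idiom for a set: a map keyed by shard ID with zero-size struct{} values. A minimal standalone illustration of the idiom (variable names here are illustrative, not taken from the change):

package main

import "fmt"

func main() {
	// struct{} occupies no memory, so the map acts as a pure membership set.
	pending := make(map[uint64]struct{})

	pending[42] = struct{}{} // mark shard 42 as pending deletion

	if _, ok := pending[42]; ok {
		fmt.Println("shard 42 is pending deletion")
	}

	delete(pending, 42) // clear the mark once the delete has finished
}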
@@ -69,13 +73,14 @@ type Store struct {
 func NewStore(path string) *Store {
 	logger := zap.NewNop()
 	return &Store{
 		databases: make(map[string]struct{}),
 		path:      path,
 		sfiles:    make(map[string]*SeriesFile),
 		indexes:   make(map[string]interface{}),
-		EngineOptions: NewEngineOptions(),
-		Logger:        logger,
-		baseLogger:    logger,
+		pendingShardDeletes: make(map[uint64]struct{}),
+		EngineOptions:       NewEngineOptions(),
+		Logger:              logger,
+		baseLogger:          logger,
 	}
 }
 
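The constructor change is needed for correctness, not just symmetry: a nil Go map can be read from but not written to, so marking a shard as pending in DeleteShard would panic if NewStore did not initialize the map. A tiny reminder of that behaviour (not part of the change itself):

package main

func main() {
	var pending map[uint64]struct{} // nil map: lookups are safe
	_, ok := pending[1]             // ok == false, no panic
	_ = ok
	pending[1] = struct{}{} // panic: assignment to entry in nil map
}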
@@ -465,6 +470,12 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
 		return nil
 	}
 
+	// Shard may be undergoing a pending deletion. While the shard can be
+	// recreated, it must wait for the pending delete to finish.
+	if _, ok := s.pendingShardDeletes[shardID]; ok {
+		return fmt.Errorf("shard %d is pending deletion and cannot be created again until finished", shardID)
+	}
+
 	// Create the db and retention policy directories if they don't exist.
 	if err := os.MkdirAll(filepath.Join(s.path, database, retentionPolicy), 0700); err != nil {
 		return err
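From a caller's point of view, recreating a shard whose files are still being removed now fails fast with a descriptive error instead of racing the deletion. A hypothetical caller-side sketch, assuming the full signature is CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error (the hunk header above is truncated); the helper name and retry policy are illustrative only:

package shardutil

import (
	"time"

	"github.com/influxdata/influxdb/tsdb"
)

// recreateShard retries shard creation while a previous deletion of the same
// shard ID is still in flight: with this change, CreateShard keeps returning
// the "shard %d is pending deletion ..." error until DeleteShard clears the flag.
func recreateShard(s *tsdb.Store, db, rp string, id uint64) error {
	var err error
	for i := 0; i < 50; i++ {
		if err = s.CreateShard(db, rp, id, true); err == nil {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return err
}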
@@ -537,11 +548,29 @@ func (s *Store) DeleteShard(shardID uint64) error {
 	}
 
 	// Remove the shard from Store so it's not returned to callers requesting
-	// shards.
+	// shards. Also mark that this shard is currently being deleted in a separate
+	// map so that we do not have to retain the global store lock while deleting
+	// files.
 	s.mu.Lock()
+	if _, ok := s.pendingShardDeletes[shardID]; ok {
+		// We are already being deleted? This is possible if delete shard
+		// was called twice in sequence before the shard could be removed from
+		// the mapping.
+		// This is not an error because deleting a shard twice is not an error.
+		s.mu.Unlock()
+		return nil
+	}
 	delete(s.shards, shardID)
+	s.pendingShardDeletes[shardID] = struct{}{}
 	s.mu.Unlock()
 
+	// Ensure the pending deletion flag is cleared on exit.
+	defer func() {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		delete(s.pendingShardDeletes, shardID)
+	}()
+
 	// Get the shard's local bitset of series IDs.
 	index, err := sh.Index()
 	if err != nil {
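Taken together, the change is an instance of a general pattern: mark the resource as pending deletion while holding the lock, release the lock for the slow file removal, clear the mark in a defer, and have creation refuse to reuse the ID in the meantime. A self-contained sketch of that pattern outside the InfluxDB codebase (all names here are illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

// store is a toy stand-in for the shard store: it tracks live shards and the
// ones whose on-disk data is still being removed.
type store struct {
	mu             sync.Mutex
	shards         map[uint64]string   // id -> path
	pendingDeletes map[uint64]struct{} // ids whose files are still being removed
}

func newStore() *store {
	return &store{
		shards:         make(map[uint64]string),
		pendingDeletes: make(map[uint64]struct{}),
	}
}

func (s *store) Create(id uint64, path string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.pendingDeletes[id]; ok {
		return fmt.Errorf("shard %d is pending deletion and cannot be created again until finished", id)
	}
	s.shards[id] = path
	return nil
}

func (s *store) Delete(id uint64) error {
	s.mu.Lock()
	if _, ok := s.pendingDeletes[id]; ok {
		s.mu.Unlock()
		return nil // already being deleted; deleting twice is not an error
	}
	delete(s.shards, id)
	s.pendingDeletes[id] = struct{}{}
	s.mu.Unlock()

	// Clear the pending flag once the slow work below has finished.
	defer func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		delete(s.pendingDeletes, id)
	}()

	// The slow file removal happens without the store lock held.
	time.Sleep(50 * time.Millisecond)
	return nil
}

func main() {
	s := newStore()
	_ = s.Create(1, "/data/db0/autogen/1")

	go s.Delete(1)
	time.Sleep(10 * time.Millisecond)

	// While the delete is in flight, recreating the same shard ID fails.
	fmt.Println(s.Create(1, "/data/db0/autogen/1"))
}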