diff --git a/tsdb/store.go b/tsdb/store.go index 1efad22d0c..ff50bc33ce 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -54,6 +54,10 @@ type Store struct { // shared per-database indexes, only if using "inmem". indexes map[string]interface{} + // Maintains a set of shards that are in the process of deletion. + // This prevents new shards from being created while old ones are being deleted. + pendingShardDeletes map[uint64]struct{} + EngineOptions EngineOptions baseLogger *zap.Logger @@ -69,13 +73,14 @@ type Store struct { func NewStore(path string) *Store { logger := zap.NewNop() return &Store{ - databases: make(map[string]struct{}), - path: path, - sfiles: make(map[string]*SeriesFile), - indexes: make(map[string]interface{}), - EngineOptions: NewEngineOptions(), - Logger: logger, - baseLogger: logger, + databases: make(map[string]struct{}), + path: path, + sfiles: make(map[string]*SeriesFile), + indexes: make(map[string]interface{}), + pendingShardDeletes: make(map[uint64]struct{}), + EngineOptions: NewEngineOptions(), + Logger: logger, + baseLogger: logger, } } @@ -465,6 +470,12 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en return nil } + // Shard may be undergoing a pending deletion. While the shard can be + // recreated, it must wait for the pending delete to finish. + if _, ok := s.pendingShardDeletes[shardID]; ok { + return fmt.Errorf("shard %d is pending deletion and cannot be created again until finished", shardID) + } + // Create the db and retention policy directories if they don't exist. if err := os.MkdirAll(filepath.Join(s.path, database, retentionPolicy), 0700); err != nil { return err @@ -537,11 +548,29 @@ func (s *Store) DeleteShard(shardID uint64) error { } // Remove the shard from Store so it's not returned to callers requesting - // shards. + // shards. Also mark that this shard is currently being deleted in a separate + // map so that we do not have to retain the global store lock while deleting + // files. s.mu.Lock() + if _, ok := s.pendingShardDeletes[shardID]; ok { + // We are already being deleted? This is possible if delete shard + // was called twice in sequence before the shard could be removed from + // the mapping. + // This is not an error because deleting a shard twice is not an error. + s.mu.Unlock() + return nil + } delete(s.shards, shardID) + s.pendingShardDeletes[shardID] = struct{}{} s.mu.Unlock() + // Ensure the pending deletion flag is cleared on exit. + defer func() { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.pendingShardDeletes, shardID) + }() + // Get the shard's local bitset of series IDs. index, err := sh.Index() if err != nil {