fix(tsi1): wait deleting epoch before dropping shard
parent
57ea78e984
commit
42dba6487a
|
@ -714,19 +714,20 @@ func (s *Store) DeleteShard(shardID uint64) error {
|
|||
return nil
|
||||
}
|
||||
delete(s.shards, shardID)
|
||||
delete(s.epochs, shardID)
|
||||
s.pendingShardDeletes[shardID] = struct{}{}
|
||||
|
||||
db := sh.Database()
|
||||
// Determine if the shard contained any series that are not present in any
|
||||
// other shards in the database.
|
||||
shards := s.filterShards(byDatabase(db))
|
||||
epoch := s.epochs[shardID]
|
||||
s.mu.Unlock()
|
||||
|
||||
// Ensure the pending deletion flag is cleared on exit.
|
||||
defer func() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
delete(s.epochs, shardID)
|
||||
delete(s.pendingShardDeletes, shardID)
|
||||
s.databases[db].removeIndexType(sh.IndexType())
|
||||
}()
|
||||
|
@ -787,6 +788,15 @@ func (s *Store) DeleteShard(shardID uint64) error {
|
|||
|
||||
}
|
||||
|
||||
// enter the epoch tracker
|
||||
guards, gen := epoch.StartWrite()
|
||||
defer epoch.EndWrite(gen)
|
||||
|
||||
// wait for any guards before closing the shard
|
||||
for _, guard := range guards {
|
||||
guard.Wait()
|
||||
}
|
||||
|
||||
// Close the shard.
|
||||
if err := sh.Close(); err != nil {
|
||||
return err
|
||||
|
@ -808,9 +818,8 @@ func (s *Store) DeleteDatabase(name string) error {
|
|||
// no files locally, so nothing to do
|
||||
return nil
|
||||
}
|
||||
shards := s.filterShards(func(sh *Shard) bool {
|
||||
return sh.database == name
|
||||
})
|
||||
shards := s.filterShards(byDatabase(name))
|
||||
epochs := s.epochsForShards(shards)
|
||||
s.mu.RUnlock()
|
||||
|
||||
if err := s.walkShards(shards, func(sh *Shard) error {
|
||||
|
@ -818,6 +827,16 @@ func (s *Store) DeleteDatabase(name string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
epoch := epochs[sh.id]
|
||||
// enter the epoch tracker
|
||||
guards, gen := epoch.StartWrite()
|
||||
defer epoch.EndWrite(gen)
|
||||
|
||||
// wait for any guards before closing the shard
|
||||
for _, guard := range guards {
|
||||
guard.Wait()
|
||||
}
|
||||
|
||||
return sh.Close()
|
||||
}); err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue