mirror of https://github.com/milvus-io/milvus.git
Add topic lock for DestroyTopic (#11786)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>pull/11902/head
parent
12f50cb22c
commit
da9228ab37
|
@ -273,48 +273,38 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
|
|||
// DestroyTopic removes messages for topic in rocksdb
|
||||
func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
||||
start := time.Now()
|
||||
ll, ok := topicMu.Load(topicName)
|
||||
if !ok {
|
||||
return fmt.Errorf("topic name = %s not exist", topicName)
|
||||
}
|
||||
lock, ok := ll.(*sync.Mutex)
|
||||
if !ok {
|
||||
return fmt.Errorf("get mutex failed, topic name = %s", topicName)
|
||||
}
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
beginKey := topicName + "/begin_id"
|
||||
endKey := topicName + "/end_id"
|
||||
|
||||
err := rmq.kv.Remove(beginKey)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: failed to remove key <" + beginKey + ">.")
|
||||
return err
|
||||
}
|
||||
|
||||
err = rmq.kv.Remove(endKey)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: failed to remove key <" + endKey + ">.")
|
||||
return err
|
||||
}
|
||||
var removedKeys []string
|
||||
|
||||
rmq.consumers.Delete(topicName)
|
||||
|
||||
ackedSizeKey := AckedSizeTitle + topicName
|
||||
err = rmq.kv.Remove(ackedSizeKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
topicBeginIDKey := TopicBeginIDTitle + topicName
|
||||
err = rmq.kv.Remove(topicBeginIDKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// just for clean up old topics, for new topics this is not required
|
||||
lastRetTsKey := LastRetTsTitle + topicName
|
||||
err = rmq.kv.Remove(lastRetTsKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msgSizeKey := MessageSizeTitle + topicName
|
||||
err = rmq.kv.Remove(msgSizeKey)
|
||||
|
||||
removedKeys = append(removedKeys, beginKey, endKey, ackedSizeKey, topicBeginIDKey, lastRetTsKey, msgSizeKey)
|
||||
// Batch remove, atomic operation
|
||||
err := rmq.kv.MultiRemove(removedKeys)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
topicMu.Delete(topicName)
|
||||
// clean up retention info
|
||||
topicMu.Delete(topicName)
|
||||
rmq.retentionInfo.topics.Delete(topicName)
|
||||
log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
|
|
|
@ -206,6 +206,7 @@ func TestRocksmq_Dummy(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
channelName1 := "channel_dummy"
|
||||
topicMu.Store(channelName1, new(sync.Mutex))
|
||||
err = rmq.DestroyTopic(channelName1)
|
||||
assert.NoError(t, err)
|
||||
|
||||
|
|
Loading…
Reference in New Issue