Add DestroyTopic and DestroyConsumerGroup for CloseRocksMQ (#7730)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/7734/head
yukun 2021-09-10 20:54:01 +08:00 committed by GitHub
parent b78550374b
commit 92295b77a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 17 additions and 0 deletions

View File

@ -73,6 +73,23 @@ func CloseRocksMQ() {
if Rmq != nil {
Rmq.stopRetention()
if Rmq.store != nil {
Rmq.consumers.Range(func(k, v interface{}) bool {
var topic string
for _, consumer := range v.([]*Consumer) {
err := Rmq.DestroyConsumerGroup(consumer.Topic, consumer.GroupName)
if err != nil {
log.Warn("Rocksmq DestroyConsumerGroup failed!", zap.Any("topic", consumer.Topic), zap.Any("groupName", consumer.GroupName))
}
topic = consumer.Topic
}
if topic != "" {
err := Rmq.DestroyTopic(topic)
if err != nil {
log.Warn("Rocksmq DestroyTopic failed!", zap.Any("topic", topic))
}
}
return true
})
Rmq.store.Close()
}
}