mirror of https://github.com/milvus-io/milvus.git
Fix rocksmq CreateConsumerGroup
Signed-off-by: yukun <kun.yu@zilliz.com>pull/4973/head^2
parent
ab200e5598
commit
207f6ae773
|
@ -77,11 +77,6 @@ func (rms *RmqMsgStream) Start() {
|
|||
func (rms *RmqMsgStream) Close() {
|
||||
rms.streamCancel()
|
||||
|
||||
for _, producer := range rms.producers {
|
||||
if producer != "" {
|
||||
_ = rocksmq.Rmq.DestroyChannel(producer)
|
||||
}
|
||||
}
|
||||
for _, consumer := range rms.consumers {
|
||||
_ = rocksmq.Rmq.DestroyConsumerGroup(consumer.GroupName, consumer.ChannelName)
|
||||
close(consumer.MsgMutex)
|
||||
|
|
|
@ -174,7 +174,13 @@ func (rmq *RocksMQ) CreateConsumerGroup(groupName string, channelName string) (*
|
|||
key := groupName + "/" + channelName + "/current_id"
|
||||
if rmq.checkKeyExist(key) {
|
||||
log.Debug("RocksMQ: " + key + " existed.")
|
||||
return nil, fmt.Errorf("ConsumerGroup %s already exists", groupName)
|
||||
for _, consumer := range rmq.notify[channelName] {
|
||||
if consumer.GroupName == groupName {
|
||||
return consumer, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
err := rmq.kv.Save(key, DefaultMessageID)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue