Fix rocksmq seek error (#5432)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

In milvus-standalone, when seek is called, rocksmq won't put a
mutex in MsgMutex channel, which cause that the consume
process hang. So the seek function in rocksmq needs to 
notify the consumer to consume.
pull/5424/head
yukun 2021-05-26 17:31:09 +08:00 committed by GitHub
parent e021f5e670
commit a3fb1356de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 1 deletions

View File

@ -94,5 +94,10 @@ func (c *consumer) Chan() <-chan ConsumerMessage {
}
func (c *consumer) Seek(id UniqueID) error { //nolint:govet
return c.client.server.Seek(c.topic, c.consumerName, id)
err := c.client.server.Seek(c.topic, c.consumerName, id)
if err != nil {
return err
}
c.client.server.Notify(c.topic, c.consumerName)
return nil
}

View File

@ -38,4 +38,6 @@ type RocksMQ interface {
Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error)
Seek(topicName string, groupName string, msgID UniqueID) error
ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer)
Notify(topicName, groupName string)
}

View File

@ -400,3 +400,18 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
return nil
}
func (rmq *rocksmq) Notify(topicName, groupName string) {
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
if v.GroupName == groupName {
select {
case v.MsgMutex <- struct{}{}:
continue
default:
continue
}
}
}
}
}