fix: rocksmq consumer register not concurrent safe (#39387) (#40885)

relate: https://github.com/milvus-io/milvus/issues/39336
pr: https://github.com/milvus-io/milvus/pull/39387

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/40993/head
aoiasd 2025-03-28 16:08:18 +08:00 committed by GitHub
parent 7db59fb1fb
commit d49f0b65d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 137 additions and 66 deletions

View File

@ -159,7 +159,7 @@ func (c *client) consume(consumer *consumer) {
case _, ok := <-newIncomingMsgCh:
if !ok {
// consumer MsgMutex closed, goroutine exit
log.Info("Consumer MsgMutex closed")
log.Info("Consumer MsgMutex closed", zap.String("topic", consumer.topic), zap.String("groupName", consumer.consumerName))
return
}
}

View File

@ -113,12 +113,103 @@ func checkRetention() bool {
var topicMu = sync.Map{}
type consumerList struct {
consumers map[string]*Consumer // GroupName -> *Consumer
mu sync.RWMutex
}
func (l *consumerList) Add(consumer *Consumer) {
l.mu.Lock()
defer l.mu.Unlock()
if _, ok := l.consumers[consumer.GroupName]; ok {
return
}
l.consumers[consumer.GroupName] = consumer
}
func (l *consumerList) Remove(groupName string) *Consumer {
l.mu.Lock()
defer l.mu.Unlock()
delete(l.consumers, groupName)
return nil
}
func (l *consumerList) Get(groupName string) *Consumer {
l.mu.RLock()
defer l.mu.RUnlock()
if consumer, ok := l.consumers[groupName]; ok {
return consumer
}
return nil
}
func (l *consumerList) Notify(groupName string) {
l.mu.RLock()
defer l.mu.RUnlock()
if consumer, ok := l.consumers[groupName]; ok {
select {
case consumer.MsgMutex <- struct{}{}:
default:
}
}
}
func (l *consumerList) NotifyAll() {
l.mu.RLock()
defer l.mu.RUnlock()
for _, v := range l.consumers {
select {
case v.MsgMutex <- struct{}{}:
continue
default:
continue
}
}
}
func (l *consumerList) Len() int {
l.mu.RLock()
defer l.mu.RUnlock()
return len(l.consumers)
}
func (l *consumerList) Range(fn func(*Consumer) bool) bool {
l.mu.RLock()
defer l.mu.RUnlock()
for _, consumer := range l.consumers {
if !fn(consumer) {
return false
}
}
return true
}
// fetch consumer list
// unsafe, only use after close mq
func (l *consumerList) Collect() map[string]*Consumer {
return l.consumers
}
func newConsumerList() *consumerList {
return &consumerList{
consumers: make(map[string]*Consumer, 0),
mu: sync.RWMutex{},
}
}
type rocksmq struct {
store *gorocksdb.DB
cfh []*gorocksdb.ColumnFamilyHandle
kv kv.BaseKV
storeMu *sync.Mutex
consumers sync.Map
consumers sync.Map // map topic -> consumer list
consumersID sync.Map
retentionInfo *retentionInfo
@ -323,7 +414,7 @@ func (rmq *rocksmq) Close() {
rmq.consumers.Range(func(k, v interface{}) bool {
// TODO what happened if the server crashed? who handled the destroy consumer group? should we just handled it when rocksmq created?
// or we should not even make consumer info persistent?
for _, consumer := range v.([]*Consumer) {
for _, consumer := range v.(*consumerList).Collect() {
err := rmq.destroyConsumerGroupInternal(consumer.Topic, consumer.GroupName)
if err != nil {
log.Ctx(rmq.ctx).Warn("Failed to destroy consumer group in rocksmq!", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName), zap.Error(err))
@ -344,21 +435,23 @@ func (rmq *rocksmq) Info() bool {
rtn := true
rmq.consumers.Range(func(key, vals interface{}) bool {
topic, _ := key.(string)
consumerList, _ := vals.([]*Consumer)
consumerList, _ := vals.(*consumerList)
minConsumerPosition := UniqueID(-1)
minConsumerGroupName := ""
for _, consumer := range consumerList {
consumerPosition, ok := rmq.getCurrentID(consumer.Topic, consumer.GroupName)
consumerList.Range(func(c *Consumer) bool {
consumerPosition, ok := rmq.getCurrentID(c.Topic, c.GroupName)
if !ok {
log.Error("some group not regist", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName))
continue
log.Error("some group not regist", zap.String("topic", c.Topic), zap.String("groupName", c.GroupName))
return true
}
if minConsumerPosition == UniqueID(-1) || consumerPosition < minConsumerPosition {
minConsumerPosition = consumerPosition
minConsumerGroupName = consumer.GroupName
minConsumerGroupName = c.GroupName
}
}
return true
})
pageTsSizeKey := constructKey(PageTsTitle, topic)
pages, _, err := rmq.kv.LoadWithPrefix(context.TODO(), pageTsSizeKey)
@ -378,7 +471,7 @@ func (rmq *rocksmq) Info() bool {
log.Info("Rocksmq Info",
zap.String("topic", topic),
zap.Int("consumer num", len(consumerList)),
zap.Int("consumer num", consumerList.Len()),
zap.String("min position group names", minConsumerGroupName),
zap.Int64("min positions", minConsumerPosition),
zap.Int("page sum", len(pages)),
@ -424,6 +517,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
topicMu.Store(topicName, new(sync.Mutex))
}
rmq.consumers.LoadOrStore(topicName, newConsumerList())
// msgSizeKey -> msgSize
// topicIDKey -> topic creating time
kvs := make(map[string]string)
@ -516,12 +610,9 @@ func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Cons
key := constructCurrentID(topicName, groupName)
_, ok := rmq.consumersID.Load(key)
if ok {
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
if v.GroupName == groupName {
return true, v, nil
}
}
if val, ok := rmq.consumers.Load(topicName); ok {
c := val.(*consumerList).Get(groupName)
return c != nil, c, nil
}
}
return false, nil, nil
@ -551,21 +642,14 @@ func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) error {
return errors.New(RmqNotServingErrMsg)
}
start := time.Now()
if vals, ok := rmq.consumers.Load(consumer.Topic); ok {
for _, v := range vals.([]*Consumer) {
if v.GroupName == consumer.GroupName {
return nil
}
}
consumers := vals.([]*Consumer)
consumers = append(consumers, consumer)
rmq.consumers.Store(consumer.Topic, consumers)
} else {
consumers := make([]*Consumer, 1)
consumers[0] = consumer
rmq.consumers.Store(consumer.Topic, consumers)
val, ok := rmq.consumers.LoadOrStore(consumer.Topic, newConsumerList())
if !ok {
log.Warn("create consumer for topic not exist", zap.String("topic", consumer.Topic), zap.String("group", consumer.GroupName))
}
log.Ctx(rmq.ctx).Debug("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.Int64("elapsed", time.Since(start).Milliseconds()))
val.(*consumerList).Add(consumer)
log.Ctx(rmq.ctx).Debug("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.String("group", consumer.GroupName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil
}
@ -606,14 +690,10 @@ func (rmq *rocksmq) destroyConsumerGroupInternal(topicName, groupName string) er
rmq.consumersID.Delete(key)
rmq.topicName2LatestMsgID.Delete(topicName)
if vals, ok := rmq.consumers.Load(topicName); ok {
consumers := vals.([]*Consumer)
for index, v := range consumers {
if v.GroupName == groupName {
close(v.MsgMutex)
consumers = append(consumers[:index], consumers[index+1:]...)
rmq.consumers.Store(topicName, consumers)
break
}
consumers := vals.(*consumerList)
if c := consumers.Get(groupName); c != nil {
close(c.MsgMutex)
consumers.Remove(groupName)
}
}
log.Ctx(rmq.ctx).Debug("Rocksmq destroy consumer group successfully ", zap.String("topic", topicName),
@ -673,15 +753,8 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
return []UniqueID{}, err
}
writeTime := time.Since(start).Milliseconds()
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
select {
case v.MsgMutex <- struct{}{}:
continue
default:
continue
}
}
if val, ok := rmq.consumers.Load(topicName); ok {
val.(*consumerList).NotifyAll()
}
// Update message page info
@ -1052,17 +1125,8 @@ func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) {
// Notify sends a mutex in MsgMutex channel to tell consumers to consume
func (rmq *rocksmq) Notify(topicName, groupName string) {
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
if v.GroupName == groupName {
select {
case v.MsgMutex <- struct{}{}:
continue
default:
continue
}
}
}
if val, ok := rmq.consumers.Load(topicName); ok {
val.(*consumerList).Notify(groupName)
}
}
@ -1104,24 +1168,30 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueI
// 2. Update acked ts and acked size for pageIDs
if vals, ok := rmq.consumers.Load(topicName); ok {
consumers, ok := vals.([]*Consumer)
if !ok || len(consumers) == 0 {
consumers, ok := vals.(*consumerList)
if !ok || consumers.Len() == 0 {
log.Error("update ack with no consumer", zap.String("topic", topicName))
return nil
}
// find min id of all consumer
var minBeginID UniqueID = lastID
for _, consumer := range consumers {
if consumer.GroupName != groupName {
beginID, ok := rmq.getCurrentID(consumer.Topic, consumer.GroupName)
var err error
consumers.Range(func(c *Consumer) bool {
if c.GroupName != groupName {
beginID, ok := rmq.getCurrentID(c.Topic, c.GroupName)
if !ok {
return fmt.Errorf("currentID of topicName=%s, groupName=%s not exist", consumer.Topic, consumer.GroupName)
err = fmt.Errorf("currentID of topicName=%s, groupName=%s not exist", c.Topic, c.GroupName)
return false
}
if beginID < minBeginID {
minBeginID = beginID
}
}
return true
})
if err != nil {
return err
}
nowTs := strconv.FormatInt(time.Now().Unix(), 10)
@ -1134,7 +1204,8 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueI
ackedTsKvs[pageAckedTsKey] = nowTs
}
}
err := rmq.kv.MultiSave(context.TODO(), ackedTsKvs)
err = rmq.kv.MultiSave(context.TODO(), ackedTsKvs)
if err != nil {
return err
}