mirror of https://github.com/milvus-io/milvus.git
Replace consumers map by sync.map
Signed-off-by: yukun <kun.yu@zilliz.com>pull/4973/head^2
parent
7f56f0405e
commit
604f96ed2b
|
@ -599,7 +599,6 @@ func (ms *TtMsgStream) findTimeTick(consumer Consumer,
|
|||
}
|
||||
|
||||
// set msg info to tsMsg
|
||||
log.Debug(msg.Topic() + "--------" + string(msg.ID().Serialize()))
|
||||
tsMsg.SetPosition(&msgstream.MsgPosition{
|
||||
ChannelName: filepath.Base(msg.Topic()),
|
||||
MsgID: msg.ID().Serialize(),
|
||||
|
|
|
@ -99,7 +99,6 @@ func consume(ctx context.Context, consumer *consumer) {
|
|||
log.Debug("client finished")
|
||||
return
|
||||
case _, ok := <-consumer.MsgMutex():
|
||||
log.Debug("Before consume")
|
||||
if !ok {
|
||||
// consumer MsgMutex closed, goroutine exit
|
||||
log.Debug("consumer MsgMutex closed")
|
||||
|
|
|
@ -60,7 +60,7 @@ type rocksmq struct {
|
|||
idAllocator allocator.GIDAllocator
|
||||
channelMu map[string]*sync.Mutex
|
||||
|
||||
consumers map[string][]*Consumer
|
||||
consumers sync.Map
|
||||
}
|
||||
|
||||
func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, error) {
|
||||
|
@ -84,7 +84,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
|
|||
idAllocator: idAllocator,
|
||||
}
|
||||
rmq.channelMu = make(map[string]*sync.Mutex)
|
||||
rmq.consumers = make(map[string][]*Consumer)
|
||||
rmq.consumers = sync.Map{}
|
||||
return rmq, nil
|
||||
}
|
||||
|
||||
|
@ -135,7 +135,7 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
delete(rmq.consumers, topicName)
|
||||
rmq.consumers.Delete(topicName)
|
||||
log.Debug("DestroyTopic: " + topicName)
|
||||
|
||||
return nil
|
||||
|
@ -144,9 +144,11 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
|||
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer) {
|
||||
key := groupName + "/" + topicName + "/current_id"
|
||||
if rmq.checkKeyExist(key) {
|
||||
for _, con := range rmq.consumers[topicName] {
|
||||
if con.GroupName == groupName {
|
||||
return true, con
|
||||
if vals, ok := rmq.consumers.Load(topicName); ok {
|
||||
for _, v := range vals.([]*Consumer) {
|
||||
if v.GroupName == groupName {
|
||||
return true, v
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -169,12 +171,20 @@ func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
|
|||
}
|
||||
|
||||
func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) {
|
||||
for _, con := range rmq.consumers[consumer.Topic] {
|
||||
if con.GroupName == consumer.GroupName {
|
||||
return
|
||||
if vals, ok := rmq.consumers.Load(consumer.Topic); ok {
|
||||
for _, v := range vals.([]*Consumer) {
|
||||
if v.GroupName == consumer.GroupName {
|
||||
return
|
||||
}
|
||||
}
|
||||
consumers := vals.([]*Consumer)
|
||||
consumers = append(consumers, consumer)
|
||||
rmq.consumers.Store(consumer.Topic, consumers)
|
||||
} else {
|
||||
consumers := make([]*Consumer, 1)
|
||||
consumers[0] = consumer
|
||||
rmq.consumers.Store(consumer.Topic, consumers)
|
||||
}
|
||||
rmq.consumers[consumer.Topic] = append(rmq.consumers[consumer.Topic], consumer)
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
|
||||
|
@ -185,11 +195,15 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
|
|||
log.Debug("RocksMQ: remove " + key + " failed.")
|
||||
return err
|
||||
}
|
||||
for index, con := range rmq.consumers[topicName] {
|
||||
if con.GroupName == groupName {
|
||||
close(con.MsgMutex)
|
||||
rmq.consumers[topicName] = append(rmq.consumers[topicName][:index],
|
||||
rmq.consumers[topicName][index+1:]...)
|
||||
if vals, ok := rmq.consumers.Load(topicName); ok {
|
||||
consumers := vals.([]*Consumer)
|
||||
for index, v := range consumers {
|
||||
if v.GroupName == groupName {
|
||||
close(v.MsgMutex)
|
||||
consumers = append(consumers[:index], consumers[index+1:]...)
|
||||
rmq.consumers.Store(topicName, consumers)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -257,13 +271,14 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
|
|||
return err
|
||||
}
|
||||
|
||||
for _, consumer := range rmq.consumers[topicName] {
|
||||
// FIXME: process the problem if msgmutex is full
|
||||
select {
|
||||
case consumer.MsgMutex <- struct{}{}:
|
||||
continue
|
||||
default:
|
||||
continue
|
||||
if vals, ok := rmq.consumers.Load(topicName); ok {
|
||||
for _, v := range vals.([]*Consumer) {
|
||||
select {
|
||||
case v.MsgMutex <- struct{}{}:
|
||||
continue
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -329,7 +344,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
|||
|
||||
// When already consume to last mes, an empty slice will be returned
|
||||
if len(consumerMessage) == 0 {
|
||||
log.Debug("RocksMQ: consumerMessage is empty")
|
||||
//log.Debug("RocksMQ: consumerMessage is empty")
|
||||
return consumerMessage, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue