Fix singlenode search timeout caused by selectcase in rmq_msgstream

Signed-off-by: yukun <kun.yu@zilliz.com>
pull/4973/head^2
yukun 2021-03-04 15:53:17 +08:00 committed by yefu.chen
parent d4c7225916
commit 36bae90635
1 changed files with 26 additions and 43 deletions

View File

@ -65,17 +65,13 @@ func newRmqMsgStream(ctx context.Context, receiveBufSize int64, rmqBufSize int64
consumerChannels: consumerChannels,
consumerReflects: consumerReflects,
consumerLock: &sync.Mutex{},
wait: &sync.WaitGroup{},
}
return stream, nil
}
func (ms *RmqMsgStream) Start() {
ms.wait = &sync.WaitGroup{}
if ms.consumers != nil {
ms.wait.Add(1)
go ms.bufMsgPackToChannel()
}
}
func (ms *RmqMsgStream) Close() {
@ -123,6 +119,8 @@ func (ms *RmqMsgStream) AsConsumer(channels []string, groupName string) {
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(consumer.MsgNum),
})
ms.wait.Add(1)
go ms.receiveMsg(*consumer)
}
}
}
@ -235,56 +233,41 @@ func (ms *RmqMsgStream) Consume() (*msgstream.MsgPack, context.Context) {
}
}
func (ms *RmqMsgStream) bufMsgPackToChannel() {
/**
receiveMsg func is used to solve search timeout problem
which is caused by selectcase
*/
func (ms *RmqMsgStream) receiveMsg(consumer rocksmq.Consumer) {
defer ms.wait.Done()
for {
select {
case <-ms.ctx.Done():
log.Println("done")
return
default:
case msgNum, ok := <-consumer.MsgNum:
if !ok {
return
}
rmqMsg, err := rocksmq.Rmq.Consume(consumer.GroupName, consumer.ChannelName, msgNum)
if err != nil {
log.Printf("Failed to consume message in rocksmq, error = %v", err)
continue
}
tsMsgList := make([]msgstream.TsMsg, 0)
for {
chosen, value, ok := reflect.Select(ms.consumerReflects)
if !ok {
log.Printf("channel closed")
return
}
msgNum := value.Interface().(int)
rmqMsg, err := rocksmq.Rmq.Consume(ms.consumers[chosen].GroupName, ms.consumers[chosen].ChannelName, msgNum)
for j := 0; j < len(rmqMsg); j++ {
headerMsg := commonpb.MsgHeader{}
err := proto.Unmarshal(rmqMsg[j].Payload, &headerMsg)
if err != nil {
log.Printf("Failed to consume message in rocksmq, error = %v", err)
log.Printf("Failed to unmarshal message header, error = %v", err)
continue
}
for j := 0; j < len(rmqMsg); j++ {
headerMsg := commonpb.MsgHeader{}
err := proto.Unmarshal(rmqMsg[j].Payload, &headerMsg)
if err != nil {
log.Printf("Failed to unmarshal message header, error = %v", err)
continue
}
tsMsg, err := ms.unmarshal.Unmarshal(rmqMsg[j].Payload, headerMsg.Base.MsgType)
if err != nil {
log.Printf("Failed to unmarshal tsMsg, error = %v", err)
continue
}
tsMsgList = append(tsMsgList, tsMsg)
}
noMoreMessage := true
for k := 0; k < len(ms.consumers); k++ {
if len(ms.consumers[k].MsgNum) > 0 {
noMoreMessage = false
}
}
if noMoreMessage {
break
tsMsg, err := ms.unmarshal.Unmarshal(rmqMsg[j].Payload, headerMsg.Base.MsgType)
if err != nil {
log.Printf("Failed to unmarshal tsMsg, error = %v", err)
continue
}
tsMsgList = append(tsMsgList, tsMsg)
}
if len(tsMsgList) > 0 {
msgPack := util.MsgPack{Msgs: tsMsgList}
ms.receiveBuf <- &msgPack