mirror of https://github.com/milvus-io/milvus.git
parent
72a7af3f4a
commit
f21c0ef2e9
|
@ -3,6 +3,7 @@ package msgstream
|
|||
import (
|
||||
"context"
|
||||
"log"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/apache/pulsar-client-go/pulsar"
|
||||
|
@ -111,9 +112,6 @@ func (ms *PulsarMsgStream) Start() {
|
|||
|
||||
func (ms *PulsarMsgStream) Close() {
|
||||
ms.streamCancel()
|
||||
if ms.wait != nil {
|
||||
ms.wait.Wait()
|
||||
}
|
||||
|
||||
for _, producer := range ms.producers {
|
||||
if producer != nil {
|
||||
|
@ -227,37 +225,59 @@ func (ms *PulsarMsgStream) Consume() *MsgPack {
|
|||
|
||||
func (ms *PulsarMsgStream) bufMsgPackToChannel() {
|
||||
defer ms.wait.Done()
|
||||
|
||||
cases := make([]reflect.SelectCase, len(ms.consumers))
|
||||
for i := 0; i < len(ms.consumers); i++ {
|
||||
ch := (*ms.consumers[i]).Chan()
|
||||
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ms.ctx.Done():
|
||||
return
|
||||
default:
|
||||
tsMsgList := make([]TsMsg, 0)
|
||||
for i := 0; i < len(ms.consumers); i++ {
|
||||
consumerChan := (*ms.consumers[i]).Chan()
|
||||
chanLen := len(consumerChan)
|
||||
for l := 0; l < chanLen; l++ {
|
||||
pulsarMsg, ok := <-consumerChan
|
||||
if !ok {
|
||||
log.Printf("channel closed")
|
||||
return
|
||||
}
|
||||
(*ms.consumers[i]).AckID(pulsarMsg.ID())
|
||||
|
||||
headerMsg := internalPb.MsgHeader{}
|
||||
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
|
||||
if err != nil {
|
||||
log.Printf("Failed to unmarshal message header, error = %v", err)
|
||||
continue
|
||||
for {
|
||||
chosen, value, ok := reflect.Select(cases)
|
||||
if !ok {
|
||||
log.Printf("channel closed")
|
||||
return
|
||||
}
|
||||
|
||||
pulsarMsg, ok := value.Interface().(pulsar.ConsumerMessage)
|
||||
if !ok {
|
||||
log.Printf("type assertion failed, not consumer message type")
|
||||
continue
|
||||
}
|
||||
(*ms.consumers[chosen]).AckID(pulsarMsg.ID())
|
||||
|
||||
headerMsg := internalPb.MsgHeader{}
|
||||
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
|
||||
if err != nil {
|
||||
log.Printf("Failed to unmarshal message header, error = %v", err)
|
||||
continue
|
||||
}
|
||||
tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.MsgType)
|
||||
if err != nil {
|
||||
log.Printf("Failed to unmarshal tsMsg, error = %v", err)
|
||||
continue
|
||||
}
|
||||
tsMsgList = append(tsMsgList, tsMsg)
|
||||
|
||||
noMoreMessage := true
|
||||
for i := 0; i < len(ms.consumers); i++ {
|
||||
if len((*ms.consumers[i]).Chan()) > 0 {
|
||||
noMoreMessage = false
|
||||
}
|
||||
tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.MsgType)
|
||||
if err != nil {
|
||||
log.Printf("Failed to unmarshal tsMsg, error = %v", err)
|
||||
continue
|
||||
}
|
||||
tsMsgList = append(tsMsgList, tsMsg)
|
||||
}
|
||||
|
||||
if noMoreMessage {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if len(tsMsgList) > 0 {
|
||||
msgPack := MsgPack{Msgs: tsMsgList}
|
||||
ms.receiveBuf <- &msgPack
|
||||
|
|
|
@ -172,15 +172,20 @@ func (ss *searchService) doUnsolvedMsgSearch() {
|
|||
ss.unsolvedMsg = append(ss.unsolvedMsg, msg)
|
||||
}
|
||||
|
||||
msgBufferLength := len(ss.msgBuffer)
|
||||
for i := 0; i < msgBufferLength; i++ {
|
||||
for {
|
||||
msg := <-ss.msgBuffer
|
||||
if msg.EndTs() <= serviceTime {
|
||||
searchMsg = append(searchMsg, msg)
|
||||
continue
|
||||
}
|
||||
ss.unsolvedMsg = append(ss.unsolvedMsg, msg)
|
||||
|
||||
msgBufferLength := len(ss.msgBuffer)
|
||||
if msgBufferLength <= 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if len(searchMsg) <= 0 {
|
||||
continue
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue