mirror of https://github.com/milvus-io/milvus.git
* Fix #6082 Unstable issue for standalone mode in CI Signed-off-by: yhmo <yihua.mo@zilliz.com> * Fix #6082 Unstable issue for standalone mode in CI Signed-off-by: yhmo <yihua.mo@zilliz.com>pull/6094/head
parent
5d7eeddeb9
commit
a807dca132
|
@ -25,9 +25,24 @@ type MemConsumer struct {
|
|||
MsgChan chan *MsgPack
|
||||
}
|
||||
|
||||
type ChannelMsg struct {
|
||||
ChannelName string
|
||||
Msg *MsgPack
|
||||
}
|
||||
|
||||
type MemMQ struct {
|
||||
consumers map[string][]*MemConsumer
|
||||
consumerMu sync.Mutex
|
||||
|
||||
// The msgBuffer is to handle this case: producer produce message before consumer is created
|
||||
// How it works:
|
||||
// when producer try produce message in MemMQ, if there is no consumser for this channel,
|
||||
// the message will be put into msgBuffer. Once a consumer of this channel is created,
|
||||
// the first consumer will receive this message.
|
||||
// Note:
|
||||
// To simplify the logic, it only send message to the first consumer.
|
||||
// Since the MemMQ is only used for Standalone mode.
|
||||
msgBuffer []*ChannelMsg
|
||||
}
|
||||
|
||||
func (mmq *MemMQ) CreateChannel(channelName string) error {
|
||||
|
@ -82,6 +97,17 @@ func (mmq *MemMQ) CreateConsumerGroup(groupName string, channelName string) (*Me
|
|||
MsgChan: make(chan *MsgPack, 1024),
|
||||
}
|
||||
|
||||
// consume messages of previous produce
|
||||
tempMsgBuf := make([]*ChannelMsg, 0)
|
||||
for _, msg := range mmq.msgBuffer {
|
||||
if msg.ChannelName == channelName {
|
||||
consumer.MsgChan <- msg.Msg
|
||||
} else {
|
||||
tempMsgBuf = append(tempMsgBuf, msg)
|
||||
}
|
||||
}
|
||||
mmq.msgBuffer = tempMsgBuf
|
||||
|
||||
mmq.consumers[channelName] = append(mmq.consumers[channelName], &consumer)
|
||||
return &consumer, nil
|
||||
}
|
||||
|
@ -122,8 +148,18 @@ func (mmq *MemMQ) Produce(channelName string, msgPack *MsgPack) error {
|
|||
return errors.New("Channel " + channelName + " doesn't exist")
|
||||
}
|
||||
|
||||
for _, consumer := range consumers {
|
||||
consumer.MsgChan <- msgPack
|
||||
if len(consumers) > 0 {
|
||||
// consumer already exist, send msg
|
||||
for _, consumer := range consumers {
|
||||
consumer.MsgChan <- msgPack
|
||||
}
|
||||
} else {
|
||||
// consumer not exist, put the msg to buffer, it will be consumed by the first consumer later
|
||||
msg := &ChannelMsg{
|
||||
ChannelName: channelName,
|
||||
Msg: msgPack,
|
||||
}
|
||||
mmq.msgBuffer = append(mmq.msgBuffer, msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -177,6 +213,7 @@ func InitMmq() error {
|
|||
consumerMu: sync.Mutex{},
|
||||
}
|
||||
Mmq.consumers = make(map[string][]*MemConsumer)
|
||||
Mmq.msgBuffer = make([]*ChannelMsg, 0)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -138,6 +138,7 @@ func TestStream_GlobalMmq_Func(t *testing.T) {
|
|||
assert.Equal(t, len(Mmq.consumers), 0, "global mmq channel error")
|
||||
}
|
||||
|
||||
// produce msg after consumer created
|
||||
func TestStream_MemMsgStream_Produce(t *testing.T) {
|
||||
channels := []string{"red", "blue", "black", "green"}
|
||||
produceStream := createProducer(channels)
|
||||
|
@ -164,6 +165,33 @@ func TestStream_MemMsgStream_Produce(t *testing.T) {
|
|||
produceStream.Close()
|
||||
}
|
||||
|
||||
// produce msg begore consumer created
|
||||
func TestStream_MemMsgStream_Consume(t *testing.T) {
|
||||
channels := []string{"red", "blue", "black", "green"}
|
||||
produceStream := createProducer(channels)
|
||||
defer produceStream.Close()
|
||||
|
||||
msgPack := MsgPack{}
|
||||
var hashValue uint32 = 3
|
||||
msgPack.Msgs = append(msgPack.Msgs, mGetTsMsg(commonpb.MsgType_Search, 1, hashValue))
|
||||
err := produceStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Fatalf("new msgstream error = %v", err)
|
||||
}
|
||||
|
||||
consumerStreams := createCondumers(channels)
|
||||
for _, cs := range consumerStreams {
|
||||
defer cs.Close()
|
||||
}
|
||||
|
||||
msg := consumerStreams[hashValue].Consume()
|
||||
if msg == nil {
|
||||
log.Fatalf("msgstream consume error")
|
||||
}
|
||||
|
||||
produceStream.Close()
|
||||
}
|
||||
|
||||
func TestStream_MemMsgStream_BroadCast(t *testing.T) {
|
||||
channels := []string{"red", "blue", "black", "green"}
|
||||
produceStream := createProducer(channels)
|
||||
|
|
|
@ -98,11 +98,9 @@ func (f *RmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
|
|||
}
|
||||
|
||||
func (f *RmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) {
|
||||
rmqClient, err := mqclient.NewRmqClient(rocksmq.ClientOptions{Server: rocksmqserver.Rmq})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewMqMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, rmqClient, f.dispatcherFactory.NewUnmarshalDispatcher())
|
||||
InitMmq()
|
||||
|
||||
return NewMemMsgStream(ctx, f.ReceiveBufSize)
|
||||
}
|
||||
|
||||
func NewRmsFactory() Factory {
|
||||
|
|
Loading…
Reference in New Issue