mirror of https://github.com/milvus-io/milvus.git
fix ttmsgstream (#5689)
* fix msgstream Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix asconsumer Signed-off-by: yefu.chen <yefu.chen@zilliz.com>pull/5779/head
parent
c172af78bd
commit
351d87055a
|
@ -352,6 +352,13 @@ func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) {
|
|||
log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
pos := tsMsg.Position()
|
||||
tsMsg.SetPosition(&MsgPosition{
|
||||
ChannelName: pos.ChannelName,
|
||||
MsgID: pos.MsgID,
|
||||
MsgGroup: consumer.Subscription(),
|
||||
Timestamp: tsMsg.BeginTs(),
|
||||
})
|
||||
|
||||
sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
|
||||
if ok {
|
||||
|
@ -697,21 +704,12 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
|
|||
var mp *MsgPosition
|
||||
var err error
|
||||
fn := func() error {
|
||||
if _, ok := ms.consumers[mp.ChannelName]; ok {
|
||||
return fmt.Errorf("the channel should not been subscribed")
|
||||
var ok bool
|
||||
consumer, ok = ms.consumers[mp.ChannelName]
|
||||
if !ok {
|
||||
return fmt.Errorf("please subcribe the channel, channel name =%s", mp.ChannelName)
|
||||
}
|
||||
|
||||
receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
|
||||
consumer, err = ms.client.Subscribe(mqclient.ConsumerOptions{
|
||||
Topic: mp.ChannelName,
|
||||
SubscriptionName: mp.MsgGroup,
|
||||
SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
|
||||
Type: mqclient.KeyShared,
|
||||
MessageChannel: receiveChannel,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if consumer == nil {
|
||||
return fmt.Errorf("consumer is nil")
|
||||
}
|
||||
|
@ -736,7 +734,6 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
|
|||
if len(mp.MsgID) == 0 {
|
||||
return fmt.Errorf("when msgID's length equal to 0, please use AsConsumer interface")
|
||||
}
|
||||
|
||||
if err = Retry(20, time.Millisecond*200, fn); err != nil {
|
||||
return fmt.Errorf("Failed to seek, error %s", err.Error())
|
||||
}
|
||||
|
|
|
@ -254,6 +254,11 @@ func getPulsarTtOutputStreamAndSeek(pulsarAddress string, positions []*MsgPositi
|
|||
factory := ProtoUDFactory{}
|
||||
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
|
||||
consumerName := []string{}
|
||||
for _, c := range positions {
|
||||
consumerName = append(consumerName, c.ChannelName)
|
||||
}
|
||||
outputStream.AsConsumer(consumerName, positions[0].MsgGroup)
|
||||
outputStream.Seek(positions)
|
||||
outputStream.Start()
|
||||
return outputStream
|
||||
|
|
Loading…
Reference in New Issue