Use last end position in pursuit mode (#26144)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/26155/head
congqixia 2023-08-05 16:55:06 +08:00 committed by GitHub
parent d2649b63db
commit 241117dd6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 7 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
@ -594,8 +595,10 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
return
default:
timeTickBuf := make([]TsMsg, 0)
startMsgPosition := make([]*msgpb.MsgPosition, 0)
endMsgPositions := make([]*msgpb.MsgPosition, 0)
// startMsgPosition := make([]*msgpb.MsgPosition, 0)
// endMsgPositions := make([]*msgpb.MsgPosition, 0)
startPositions := make(map[string]*msgpb.MsgPosition)
endPositions := make(map[string]*msgpb.MsgPosition)
var endTs uint64
var size uint64
@ -623,6 +626,11 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
if len(msgs) == 0 {
continue
}
startPos := typeutil.Clone(ms.chanMsgPos[consumer])
channelName := startPos.ChannelName
if _, ok := startPositions[channelName]; !ok {
startPositions[channelName] = startPos
}
tempBuffer := make([]TsMsg, 0)
var timeTickMsg TsMsg
for _, v := range msgs {
@ -639,7 +647,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
}
ms.chanMsgBuf[consumer] = tempBuffer
startMsgPosition = append(startMsgPosition, proto.Clone(ms.chanMsgPos[consumer]).(*msgpb.MsgPosition))
// startMsgPosition = append(startMsgPosition, proto.Clone(ms.chanMsgPos[consumer]).(*msgpb.MsgPosition))
var newPos *msgpb.MsgPosition
if len(tempBuffer) > 0 {
// if tempBuffer is not empty, use tempBuffer[0] to seek
@ -649,7 +657,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
Timestamp: currTs,
MsgGroup: consumer.Subscription(),
}
endMsgPositions = append(endMsgPositions, newPos)
endPositions[channelName] = newPos
} else if timeTickMsg != nil {
// if tempBuffer is empty, use timeTickMsg to seek
newPos = &msgpb.MsgPosition{
@ -658,7 +666,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
Timestamp: currTs,
MsgGroup: consumer.Subscription(),
}
endMsgPositions = append(endMsgPositions, newPos)
endPositions[channelName] = newPos
}
ms.chanMsgPos[consumer] = newPos
}
@ -683,8 +691,8 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
BeginTs: ms.lastTimeStamp,
EndTs: endTs,
Msgs: uniqueMsgs,
StartPositions: startMsgPosition,
EndPositions: endMsgPositions,
StartPositions: lo.MapToSlice(startPositions, func(_ string, pos *msgpb.MsgPosition) *msgpb.MsgPosition { return pos }),
EndPositions: lo.MapToSlice(endPositions, func(_ string, pos *msgpb.MsgPosition) *msgpb.MsgPosition { return pos }),
}
select {