Convert msg pchannel to vchannel before check IsCloseMsg (#22182)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/21113/head
congqixia 2023-02-15 17:26:34 +08:00 committed by GitHub
parent f2575e5fa8
commit 9346f1752a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 17 additions and 16 deletions

View File

@ -117,6 +117,23 @@ func (ibNode *insertBufferNode) IsValidInMsg(in []Msg) bool {
func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*flowGraphMsg)
// replace pchannel with vchannel
startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions))
for idx := range fgMsg.startPositions {
pos := proto.Clone(fgMsg.startPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
startPositions = append(startPositions, pos)
}
fgMsg.startPositions = startPositions
endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions))
for idx := range fgMsg.endPositions {
pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
endPositions = append(endPositions, pos)
}
fgMsg.endPositions = endPositions
if fgMsg.IsCloseMsg() {
if len(fgMsg.endPositions) != 0 {
// try to sync all segments
@ -152,22 +169,6 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
}
}()
// replace pchannel with vchannel
startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions))
for idx := range fgMsg.startPositions {
pos := proto.Clone(fgMsg.startPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
startPositions = append(startPositions, pos)
}
fgMsg.startPositions = startPositions
endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions))
for idx := range fgMsg.endPositions {
pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
endPositions = append(endPositions, pos)
}
fgMsg.endPositions = endPositions
if startPositions[0].Timestamp < ibNode.lastTimestamp {
// message stream should guarantee that this should not happen
err := fmt.Errorf("insert buffer node consumed old messages, channel = %s, timestamp = %d, lastTimestamp = %d",