Convert msg pchannel to vchannel before check IsCloseMsg (#22182) (#22197)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/22239/head
congqixia 2023-02-17 10:12:35 +08:00 committed by GitHub
parent dd63c71bd0
commit 97682a413f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 17 additions and 16 deletions

View File

@ -117,6 +117,23 @@ func (ibNode *insertBufferNode) IsValidInMsg(in []Msg) bool {
func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*flowGraphMsg)
// replace pchannel with vchannel
startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions))
for idx := range fgMsg.startPositions {
pos := proto.Clone(fgMsg.startPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
startPositions = append(startPositions, pos)
}
fgMsg.startPositions = startPositions
endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions))
for idx := range fgMsg.endPositions {
pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
endPositions = append(endPositions, pos)
}
fgMsg.endPositions = endPositions
if fgMsg.IsCloseMsg() {
if len(fgMsg.endPositions) != 0 {
// try to sync all segments
@ -152,22 +169,6 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
}
}()
// replace pchannel with vchannel
startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions))
for idx := range fgMsg.startPositions {
pos := proto.Clone(fgMsg.startPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
startPositions = append(startPositions, pos)
}
fgMsg.startPositions = startPositions
endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions))
for idx := range fgMsg.endPositions {
pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
endPositions = append(endPositions, pos)
}
fgMsg.endPositions = endPositions
if startPositions[0].Timestamp < ibNode.lastTimestamp {
// message stream should guarantee that this should not happen
err := fmt.Errorf("insert buffer node consumed old messages, channel = %s, timestamp = %d, lastTimestamp = %d",