From db33ffa518809ea9103b097f91f8500fa73864af Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 15 Nov 2022 14:37:07 +0800 Subject: [PATCH] Pass endPosition with vchannel in datanode flowgraph (#20589) Signed-off-by: bigsheeper Signed-off-by: bigsheeper --- internal/datanode/flow_graph_insert_buffer_node.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index cf83cedafb..0f1fef4de9 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -126,12 +126,14 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { pos.ChannelName = ibNode.channelName startPositions = append(startPositions, pos) } + fgMsg.startPositions = startPositions endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions)) for idx := range fgMsg.endPositions { pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition) pos.ChannelName = ibNode.channelName endPositions = append(endPositions, pos) } + fgMsg.endPositions = endPositions if startPositions[0].Timestamp < ibNode.lastTimestamp { // message stream should guarantee that this should not happen