diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 28159f6b5a..b3c262a184 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -90,7 +90,6 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { continue } } - iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg)) } } @@ -110,10 +109,10 @@ func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) ddn.mu.Lock() if si, ok := ddn.seg2SegInfo[msg.GetSegmentID()]; ok { - if msg.EndTs() > si.GetDmlPosition().GetTimestamp() { - delete(ddn.seg2SegInfo, msg.GetSegmentID()) + if msg.EndTs() <= si.GetDmlPosition().GetTimestamp() { return true } + delete(ddn.seg2SegInfo, msg.GetSegmentID()) } ddn.mu.Unlock()