diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 3ae865b851..2ef75cbc81 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -147,14 +147,21 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } // 1. filter segment by bloom filter for _, delMsg := range iMsg.deleteMessages { - if iNode.streamingReplica != nil { + if iNode.streamingReplica.getSegmentNum() != 0 { processDeleteMessages(iNode.streamingReplica, delMsg, delData) } - if iNode.historicalReplica != nil { + if iNode.historicalReplica.getSegmentNum() != 0 { processDeleteMessages(iNode.historicalReplica, delMsg, delData) } } + // 2. do predelete + for segmentID, pks := range delData.deleteIDs { + segment := iNode.getSegmentInReplica(segmentID) + offset := segment.segmentPreDelete(len(pks)) + delData.deleteOffset[segmentID] = offset + } + // 2. do delete for segmentID := range delData.deleteIDs { wg.Add(1) @@ -205,15 +212,9 @@ func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, d continue } if len(pks) > 0 { - offset := segment.segmentPreDelete(len(pks)) - if err != nil { - log.Warn(err.Error()) - continue - } delData.deleteIDs[segmentID] = append(delData.deleteIDs[segmentID], pks...) // TODO(yukun) get offset of pks delData.deleteTimestamps[segmentID] = append(delData.deleteTimestamps[segmentID], msg.Timestamps[:len(pks)]...) - delData.deleteOffset[segmentID] = offset } } }