mirror of https://github.com/milvus-io/milvus.git
parent
5cd935729c
commit
902e7172cb
|
@ -147,14 +147,21 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
}
|
||||
// 1. filter segment by bloom filter
|
||||
for _, delMsg := range iMsg.deleteMessages {
|
||||
if iNode.streamingReplica != nil {
|
||||
if iNode.streamingReplica.getSegmentNum() != 0 {
|
||||
processDeleteMessages(iNode.streamingReplica, delMsg, delData)
|
||||
}
|
||||
if iNode.historicalReplica != nil {
|
||||
if iNode.historicalReplica.getSegmentNum() != 0 {
|
||||
processDeleteMessages(iNode.historicalReplica, delMsg, delData)
|
||||
}
|
||||
}
|
||||
|
||||
// 2. do predelete
|
||||
for segmentID, pks := range delData.deleteIDs {
|
||||
segment := iNode.getSegmentInReplica(segmentID)
|
||||
offset := segment.segmentPreDelete(len(pks))
|
||||
delData.deleteOffset[segmentID] = offset
|
||||
}
|
||||
|
||||
// 2. do delete
|
||||
for segmentID := range delData.deleteIDs {
|
||||
wg.Add(1)
|
||||
|
@ -205,15 +212,9 @@ func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, d
|
|||
continue
|
||||
}
|
||||
if len(pks) > 0 {
|
||||
offset := segment.segmentPreDelete(len(pks))
|
||||
if err != nil {
|
||||
log.Warn(err.Error())
|
||||
continue
|
||||
}
|
||||
delData.deleteIDs[segmentID] = append(delData.deleteIDs[segmentID], pks...)
|
||||
// TODO(yukun) get offset of pks
|
||||
delData.deleteTimestamps[segmentID] = append(delData.deleteTimestamps[segmentID], msg.Timestamps[:len(pks)]...)
|
||||
delData.deleteOffset[segmentID] = offset
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue