fix: Set corresponding DataScope for `loadStreamDelete` (#35312)

Related to #35311

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/35328/head
congqixia 2024-08-06 22:32:23 +08:00 committed by GitHub
parent e937197270
commit 6ff238e88a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 12 additions and 1 deletions

View File

@ -646,6 +646,16 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
position = deltaPositions[0]
}
// after L0 segment feature
// growing segemnts should have load stream delete as well
deleteScope := querypb.DataScope_All
switch candidate.Type() {
case commonpb.SegmentState_Sealed:
deleteScope = querypb.DataScope_Historical
case commonpb.SegmentState_Growing:
deleteScope = querypb.DataScope_Streaming
}
deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
deleteData := &storage.DeleteData{}
deleteData.AppendBatch(deletedPks, deletedTss)
@ -660,7 +670,7 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
SegmentId: info.GetSegmentID(),
PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks),
Timestamps: deleteData.Tss,
Scope: querypb.DataScope_Historical, // only sealed segment need to loadStreamDelete
Scope: deleteScope,
})
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
@ -718,6 +728,7 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
SegmentId: info.GetSegmentID(),
PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks),
Timestamps: deleteData.Tss,
Scope: deleteScope,
})
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))