mirror of https://github.com/milvus-io/milvus.git
fix: delegator filter out all partition's delete msg when loading segment (#31585)
May cause deleted data to remain queryable for a period of time. Related: https://github.com/milvus-io/milvus/issues/31484 https://github.com/milvus-io/milvus/issues/31548 --------- Signed-off-by: aoiasd <zhicheng.yue@zilliz.com> pull/32064/head
parent
089c805e0a
commit
5b693c466d
|
@ -749,7 +749,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
|
|||
for _, tsMsg := range msgPack.Msgs {
|
||||
if tsMsg.Type() == commonpb.MsgType_Delete {
|
||||
dmsg := tsMsg.(*msgstream.DeleteMsg)
|
||||
if dmsg.CollectionID != sd.collectionID || dmsg.GetPartitionID() != candidate.Partition() {
|
||||
if dmsg.CollectionID != sd.collectionID || (dmsg.GetPartitionID() != common.AllPartitionsID && dmsg.GetPartitionID() != candidate.Partition()) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -1114,6 +1114,39 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
|
|||
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
|
||||
}
|
||||
|
||||
func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
s.mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
s.mq.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil)
|
||||
s.mq.EXPECT().Close()
|
||||
ch := make(chan *msgstream.MsgPack, 10)
|
||||
s.mq.EXPECT().Chan().Return(ch)
|
||||
|
||||
oracle := pkoracle.NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
|
||||
oracle.UpdateBloomFilter([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(2)})
|
||||
|
||||
baseMsg := &commonpb.MsgBase{MsgType: commonpb.MsgType_Delete}
|
||||
|
||||
datas := []*msgstream.MsgPack{
|
||||
{EndTs: 10, EndPositions: []*msgpb.MsgPosition{{Timestamp: 10}}, Msgs: []msgstream.TsMsg{
|
||||
&msgstream.DeleteMsg{DeleteRequest: msgpb.DeleteRequest{Base: baseMsg, CollectionID: s.collectionID, PartitionID: 1, PrimaryKeys: storage.ParseInt64s2IDs(1), Timestamps: []uint64{1}}},
|
||||
&msgstream.DeleteMsg{DeleteRequest: msgpb.DeleteRequest{Base: baseMsg, CollectionID: s.collectionID, PartitionID: -1, PrimaryKeys: storage.ParseInt64s2IDs(2), Timestamps: []uint64{5}}},
|
||||
// invalid msg because partition wrong
|
||||
&msgstream.DeleteMsg{DeleteRequest: msgpb.DeleteRequest{Base: baseMsg, CollectionID: s.collectionID, PartitionID: 2, PrimaryKeys: storage.ParseInt64s2IDs(1), Timestamps: []uint64{10}}},
|
||||
}},
|
||||
}
|
||||
|
||||
for _, data := range datas {
|
||||
ch <- data
|
||||
}
|
||||
|
||||
result, err := s.delegator.readDeleteFromMsgstream(ctx, &msgpb.MsgPosition{Timestamp: 0}, 10, oracle)
|
||||
s.NoError(err)
|
||||
s.Equal(2, len(result.Pks))
|
||||
}
|
||||
|
||||
// TestDelegatorDataSuite runs every test case registered on DelegatorDataSuite.
func TestDelegatorDataSuite(t *testing.T) {
	suite.Run(t, new(DelegatorDataSuite))
}
|
||||
|
|
|
@ -410,3 +410,18 @@ func ParsePrimaryKeys2IDs(pks []PrimaryKey) *schemapb.IDs {
|
|||
|
||||
return ret
|
||||
}
|
||||
|
||||
func ParseInt64s2IDs(pks ...int64) *schemapb.IDs {
|
||||
ret := &schemapb.IDs{}
|
||||
if len(pks) == 0 {
|
||||
return ret
|
||||
}
|
||||
|
||||
ret.IdField = &schemapb.IDs_IntId{
|
||||
IntId: &schemapb.LongArray{
|
||||
Data: pks,
|
||||
},
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue