From a647b84f3e384aff578e0e9f7b46b6e295f81ee2 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 20 Mar 2024 19:01:05 +0800 Subject: [PATCH] enhance: Add AllPartitionsID const to replace InvalidPartitionID (#31438) "-1" as `InvalidPartitionID` was previously used as the all-partitions placeholder in delete cases. It's confusing and hard to maintain when a const var has more than one meaning. This PR adds `AllPartitionsID` to replace these usages in delete scenarios. --------- Signed-off-by: Congqi Xia --- internal/datacoord/handler.go | 2 +- internal/datacoord/server_test.go | 2 +- internal/datanode/metacache/actions.go | 2 +- internal/datanode/writebuffer/bf_write_buffer.go | 2 +- internal/datanode/writebuffer/l0_write_buffer.go | 2 +- internal/proxy/task_delete.go | 2 +- internal/proxy/task_upsert.go | 2 +- internal/querycoordv2/meta/target_manager.go | 2 +- internal/querynodev2/delegator/delegator_data.go | 4 ++-- internal/querynodev2/delegator/delegator_data_test.go | 4 ++-- internal/querynodev2/pkoracle/candidate.go | 2 +- pkg/common/common.go | 3 +++ 12 files changed, 16 insertions(+), 13 deletions(-) diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 134b5f7112..e897e9fb2b 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -131,7 +131,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID }) partitionSet := typeutil.NewUniqueSet(validPartitions...) 
for _, s := range segments { - if (partitionSet.Len() > 0 && !partitionSet.Contain(s.PartitionID) && s.GetPartitionID() != common.InvalidPartitionID) || + if (partitionSet.Len() > 0 && !partitionSet.Contain(s.PartitionID) && s.GetPartitionID() != common.AllPartitionsID) || (s.GetStartPosition() == nil && s.GetDmlPosition() == nil) { continue } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index f97fe54d90..4943ae9886 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1530,7 +1530,7 @@ func TestGetQueryVChanPositions(t *testing.T) { s4 := &datapb.SegmentInfo{ ID: 4, CollectionID: 0, - PartitionID: common.InvalidPartitionID, + PartitionID: common.AllPartitionsID, InsertChannel: "ch1", State: commonpb.SegmentState_Flushed, StartPosition: &msgpb.MsgPosition{ diff --git a/internal/datanode/metacache/actions.go b/internal/datanode/metacache/actions.go index b83a88636e..20d18f4acd 100644 --- a/internal/datanode/metacache/actions.go +++ b/internal/datanode/metacache/actions.go @@ -63,7 +63,7 @@ func (f SegmentFilterFunc) SegmentIDs() ([]int64, bool) { func WithPartitionID(partitionID int64) SegmentFilter { return SegmentFilterFunc(func(info *SegmentInfo) bool { - return partitionID == common.InvalidPartitionID || info.partitionID == partitionID + return partitionID == common.AllPartitionsID || info.partitionID == partitionID }) } diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go index e909f2087d..ace93fa533 100644 --- a/internal/datanode/writebuffer/bf_write_buffer.go +++ b/internal/datanode/writebuffer/bf_write_buffer.go @@ -53,7 +53,7 @@ func (wb *bfWriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgs } for _, inData := range groups { - if delMsg.GetPartitionID() == common.InvalidPartitionID || delMsg.GetPartitionID() == inData.partitionID { + if delMsg.GetPartitionID() == common.AllPartitionsID || 
delMsg.GetPartitionID() == inData.partitionID { var deletePks []storage.PrimaryKey var deleteTss []typeutil.Timestamp for idx, pk := range pks { diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go index c98f1b6440..e80f3840fd 100644 --- a/internal/datanode/writebuffer/l0_write_buffer.go +++ b/internal/datanode/writebuffer/l0_write_buffer.go @@ -71,7 +71,7 @@ func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgs } for _, inData := range groups { - if delMsg.GetPartitionID() == common.InvalidPartitionID || delMsg.GetPartitionID() == inData.partitionID { + if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID { var deletePks []storage.PrimaryKey var deleteTss []typeutil.Timestamp for idx, pk := range pks { diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index 8a65985564..f7e4c8210e 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -267,7 +267,7 @@ func (dr *deleteRunner) Init(ctx context.Context) error { dr.partitionKeyMode = dr.schema.IsPartitionKeyCollection() // get partitionIDs of delete - dr.partitionID = common.InvalidPartitionID + dr.partitionID = common.AllPartitionsID if len(dr.req.PartitionName) > 0 { if dr.partitionKeyMode { return errors.New("not support manually specifying the partition names if partition key mode is used") diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index b89c09d954..f44fe455c3 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -245,7 +245,7 @@ func (it *upsertTask) deletePreExecute(ctx context.Context) error { // multi entities with same pk and diff partition keys may be hashed to multi physical partitions // if deleteMsg.partitionID = common.InvalidPartition, // all segments with this pk under the collection will have the delete record - it.upsertMsg.DeleteMsg.PartitionID = 
common.InvalidPartitionID + it.upsertMsg.DeleteMsg.PartitionID = common.AllPartitionsID } else { // partition name could be defaultPartitionName or name specified by sdk partName := it.upsertMsg.DeleteMsg.PartitionName diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index a957943552..ea9ed9f17e 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -231,7 +231,7 @@ func (mgr *TargetManager) PullNextTargetV2(broker Broker, collectionID int64, ch partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...) for _, segmentInfo := range segmentInfos { - if partitionSet.Contain(segmentInfo.GetPartitionID()) || segmentInfo.GetPartitionID() == common.InvalidPartitionID { + if partitionSet.Contain(segmentInfo.GetPartitionID()) || segmentInfo.GetPartitionID() == common.AllPartitionsID { segments[segmentInfo.GetID()] = segmentInfo } } diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index f983533a26..6baed214b5 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -481,7 +481,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg func (sd *shardDelegator) GetLevel0Deletions(partitionID int64) ([]storage.PrimaryKey, []storage.Timestamp) { sd.level0Mut.RLock() deleteData, ok1 := sd.level0Deletions[partitionID] - allPartitionsDeleteData, ok2 := sd.level0Deletions[common.InvalidPartitionID] + allPartitionsDeleteData, ok2 := sd.level0Deletions[common.AllPartitionsID] sd.level0Mut.RUnlock() // we may need to merge the specified partition deletions and the all partitions deletions, // so release the mutex as early as possible. 
@@ -647,7 +647,7 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context, deleteRecords := sd.deleteBuffer.ListAfter(position.GetTimestamp()) for _, entry := range deleteRecords { for _, record := range entry.Data { - if record.PartitionID != common.InvalidPartitionID && candidate.Partition() != record.PartitionID { + if record.PartitionID != common.AllPartitionsID && candidate.Partition() != record.PartitionID { continue } for i, pk := range record.DeleteData.Pks { diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 897a4e4de2..e852a0f54f 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -1002,7 +1002,7 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() { pks, _ = delegator.GetLevel0Deletions(partitionID + 1) s.Empty(pks) - delegator.level0Deletions[common.InvalidPartitionID] = allPartitionDeleteData + delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData pks, _ = delegator.GetLevel0Deletions(partitionID) s.Len(pks, 2) s.True(pks[0].EQ(partitionDeleteData.Pks[0])) @@ -1023,7 +1023,7 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() { pks, _ = delegator.GetLevel0Deletions(partitionID + 1) s.Empty(pks) - delegator.level0Deletions[common.InvalidPartitionID] = allPartitionDeleteData + delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData pks, _ = delegator.GetLevel0Deletions(partitionID) s.Len(pks, 2) s.True(pks[0].EQ(allPartitionDeleteData.Pks[0])) diff --git a/internal/querynodev2/pkoracle/candidate.go b/internal/querynodev2/pkoracle/candidate.go index 30317cc859..037963966f 100644 --- a/internal/querynodev2/pkoracle/candidate.go +++ b/internal/querynodev2/pkoracle/candidate.go @@ -67,6 +67,6 @@ func WithSegmentIDs(segmentIDs ...int64) CandidateFilter { // WithPartitionID returns CandidateFilter with provided partitionID. 
func WithPartitionID(partitionID int64) CandidateFilter { return func(candidate candidateWithWorker) bool { - return candidate.Partition() == partitionID || partitionID == common.InvalidPartitionID + return candidate.Partition() == partitionID || partitionID == common.AllPartitionsID } } diff --git a/pkg/common/common.go b/pkg/common/common.go index 24e26ff774..6c54347ccd 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -59,6 +59,9 @@ const ( // InvalidPartitionID indicates that the partition is not specified. It will be set when the partitionName is empty InvalidPartitionID = int64(-1) + // AllPartitionsID indicates data applies to all partitions. + AllPartitionsID = int64(-1) + // InvalidFieldID indicates that the field does not exist . It will be set when the field is not found. InvalidFieldID = int64(-1)