mirror of https://github.com/milvus-io/milvus.git
Fix LoadBalance loading the same segment multiple times (#16734)
Signed-off-by: yah01 <yang.cen@zilliz.com> pull/16747/head
parent
2f49db9d3c
commit
93416d5524
|
@ -35,6 +35,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
const timeoutForRPC = 10 * time.Second
|
||||
|
@ -1924,16 +1925,16 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
|||
for _, nodeID := range lbt.SourceNodeIDs {
|
||||
segmentID2Info := make(map[UniqueID]*querypb.SegmentInfo)
|
||||
dmChannel2WatchInfo := make(map[string]*querypb.DmChannelWatchInfo)
|
||||
recoveredCollectionIDs := make(map[UniqueID]struct{})
|
||||
recoveredCollectionIDs := make(typeutil.UniqueSet)
|
||||
segmentInfos := lbt.meta.getSegmentInfosByNode(nodeID)
|
||||
for _, segmentInfo := range segmentInfos {
|
||||
segmentID2Info[segmentInfo.SegmentID] = segmentInfo
|
||||
recoveredCollectionIDs[segmentInfo.CollectionID] = struct{}{}
|
||||
recoveredCollectionIDs.Insert(segmentInfo.CollectionID)
|
||||
}
|
||||
dmChannelWatchInfos := lbt.meta.getDmChannelInfosByNodeID(nodeID)
|
||||
for _, watchInfo := range dmChannelWatchInfos {
|
||||
dmChannel2WatchInfo[watchInfo.DmChannel] = watchInfo
|
||||
recoveredCollectionIDs[watchInfo.CollectionID] = struct{}{}
|
||||
recoveredCollectionIDs.Insert(watchInfo.CollectionID)
|
||||
}
|
||||
|
||||
for collectionID := range recoveredCollectionIDs {
|
||||
|
@ -1977,27 +1978,27 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
|||
|
||||
for _, segmentBingLog := range binlogs {
|
||||
segmentID := segmentBingLog.SegmentID
|
||||
if info, ok := segmentID2Info[segmentID]; ok {
|
||||
if _, ok := segmentID2Info[segmentID]; ok {
|
||||
segmentLoadInfo := lbt.broker.generateSegmentLoadInfo(ctx, collectionID, partitionID, segmentBingLog, true, schema)
|
||||
|
||||
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
|
||||
msgBase.MsgType = commonpb.MsgType_LoadSegments
|
||||
for _, replica := range info.ReplicaIds {
|
||||
loadSegmentReq := &querypb.LoadSegmentsRequest{
|
||||
Base: msgBase,
|
||||
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
|
||||
Schema: schema,
|
||||
|
||||
loadSegmentReq := &querypb.LoadSegmentsRequest{
|
||||
Base: msgBase,
|
||||
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
|
||||
Schema: schema,
|
||||
CollectionID: collectionID,
|
||||
|
||||
LoadMeta: &querypb.LoadMetaInfo{
|
||||
LoadType: collectionInfo.LoadType,
|
||||
CollectionID: collectionID,
|
||||
|
||||
LoadMeta: &querypb.LoadMetaInfo{
|
||||
LoadType: collectionInfo.LoadType,
|
||||
CollectionID: collectionID,
|
||||
PartitionIDs: toRecoverPartitionIDs,
|
||||
},
|
||||
ReplicaID: replica,
|
||||
}
|
||||
|
||||
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
|
||||
PartitionIDs: toRecoverPartitionIDs,
|
||||
},
|
||||
ReplicaID: replica.ReplicaID,
|
||||
}
|
||||
|
||||
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -659,6 +659,8 @@ func (scheduler *TaskScheduler) scheduleLoop() {
|
|||
// if triggerCondition == NodeDown, loadSegment and watchDmchannel requests will keep being rescheduled until they succeed
|
||||
// the node info has been deleted after assigning child tasks to triggerTask
|
||||
// so it is necessary to update the meta of segment and dmchannel, or some data may be lost in meta
|
||||
|
||||
// triggerTask may be LoadCollection, LoadPartitions, LoadBalance
|
||||
if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success || triggerTask.getTriggerCondition() == querypb.TriggerCondition_NodeDown {
|
||||
err = updateSegmentInfoFromTask(scheduler.ctx, triggerTask, scheduler.meta)
|
||||
if err != nil {
|
||||
|
@ -919,8 +921,8 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
|
|||
collectionID := loadInfo.CollectionID
|
||||
segmentID := loadInfo.SegmentID
|
||||
|
||||
segment, ok := segments[segmentID]
|
||||
if !ok {
|
||||
segment, saved := segments[segmentID]
|
||||
if !saved {
|
||||
segment = &querypb.SegmentInfo{
|
||||
SegmentID: segmentID,
|
||||
CollectionID: loadInfo.CollectionID,
|
||||
|
@ -941,7 +943,10 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
|
|||
if _, ok := segmentInfosToSave[collectionID]; !ok {
|
||||
segmentInfosToSave[collectionID] = make([]*querypb.SegmentInfo, 0)
|
||||
}
|
||||
segmentInfosToSave[collectionID] = append(segmentInfosToSave[collectionID], segment)
|
||||
|
||||
if !saved {
|
||||
segmentInfosToSave[collectionID] = append(segmentInfosToSave[collectionID], segment)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue