From 30a99b66c1343bffe9e7b33d4fcc46087c3843e6 Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 9 Sep 2024 10:21:06 +0800 Subject: [PATCH] fix: Fix logic dead lock when delegator has high memory usage (#36065) issue: #36064 When the delegator has high memory usage, loading an L0 segment will fail, and the balance segment task will be blocked by the load segment task. The delegator then can't free memory by moving out some segments, which causes a logical deadlock. This PR removes the limit on balance and permits segment tasks and balance tasks to execute in parallel, which won't cause side effects because: 1. one segment can only have one task in qc's scheduler, and a load/release task will replace the balance task if necessary. 2. balance speed has been limited, so it won't block the load segment task. 3. if a collection has a load task and a balance task at the same time, the load task will be scheduled first due to its high priority. --------- Signed-off-by: Wei Liu --- internal/querycoordv2/balance/balance.go | 15 +++++++++------ .../balance/score_based_balancer.go | 18 ++++++++++++++---- .../querycoordv2/checkers/balance_checker.go | 5 ----- .../checkers/balance_checker_test.go | 4 +--- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index 117c895efb..97eb504d3c 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -30,15 +30,18 @@ import ( ) type SegmentAssignPlan struct { - Segment *meta.Segment - Replica *meta.Replica - From int64 // -1 if empty - To int64 + Segment *meta.Segment + Replica *meta.Replica + From int64 // -1 if empty + To int64 + FromScore int64 + ToScore int64 + SegmentScore int64 } func (segPlan *SegmentAssignPlan) ToString() string { - return fmt.Sprintf("SegmentPlan:[collectionID: %d, replicaID: %d, segmentID: %d, from: %d, to: %d]\n", - segPlan.Segment.CollectionID, segPlan.Replica.GetID(), segPlan.Segment.ID, segPlan.From, segPlan.To) + return 
fmt.Sprintf("SegmentPlan:[collectionID: %d, replicaID: %d, segmentID: %d, from: %d, to: %d, fromScore: %d, toScore: %d, segmentScore: %d]\n", + segPlan.Segment.CollectionID, segPlan.Replica.GetID(), segPlan.Segment.ID, segPlan.From, segPlan.To, segPlan.FromScore, segPlan.ToScore, segPlan.SegmentScore) } type ChannelAssignPlan struct { diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 270ed7f4ca..032dbd0126 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -99,14 +99,24 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta. return } + from := int64(-1) + fromScore := int64(0) + if sourceNode != nil { + from = sourceNode.nodeID + fromScore = int64(sourceNode.getPriority()) + } + plan := SegmentAssignPlan{ - From: -1, - To: targetNode.nodeID, - Segment: s, + From: from, + To: targetNode.nodeID, + Segment: s, + FromScore: fromScore, + ToScore: int64(targetNode.getPriority()), + SegmentScore: int64(priorityChange), } plans = append(plans, plan) - // update the targetNode's score + // update the sourceNode and targetNode's score if sourceNode != nil { sourceNode.setPriority(sourceNode.getPriority() - priorityChange) } diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 73100480b0..7acdb898e6 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -119,11 +119,6 @@ func (b *BalanceChecker) replicasToBalance() []int64 { return nil } - // scheduler is handling segment task, skip - if b.scheduler.GetSegmentTaskNum() != 0 { - return nil - } - // iterator one normal collection in one round normalReplicasToBalance := make([]int64, 0) hasUnbalancedCollection := false diff --git a/internal/querycoordv2/checkers/balance_checker_test.go 
b/internal/querycoordv2/checkers/balance_checker_test.go index 8f9333d347..744d9a2fc7 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -220,9 +220,7 @@ func (suite *BalanceCheckerTestSuite) TestBusyScheduler() { return 1 }) replicasToBalance := suite.checker.replicasToBalance() - suite.Empty(replicasToBalance) - segPlans, _ := suite.checker.balanceReplicas(replicasToBalance) - suite.Empty(segPlans) + suite.Len(replicasToBalance, 1) } func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {