From ad37b98cdab681a846085edab58e72af699c8349 Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 27 Dec 2023 15:50:47 +0800 Subject: [PATCH] enhance: Rewrite gen stopping segment plan based on assign segment (29473) (#29480) pr: #29473 `AssignSegment` method defines how to assign segment to nodes, but score_based_balance implement another assign logic in `genStoppingSegmentPlan` This PR rewrite gen stopping segment plan based on assign segment. Signed-off-by: Wei Liu --- .../balance/score_based_balancer.go | 51 +++++-------------- .../balance/score_based_balancer_test.go | 1 + 2 files changed, 14 insertions(+), 38 deletions(-) diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index c41427d3df..30eec67045 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -191,7 +191,7 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...) if len(channelPlans) == 0 { - segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, nodesSegments, stoppingNodesSegments)...) + segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...) } } else { if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() { @@ -210,46 +210,21 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss return segmentPlans, channelPlans } -func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, nodesSegments map[int64][]*meta.Segment, stoppingNodesSegments map[int64][]*meta.Segment) []SegmentAssignPlan { +func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan { segmentPlans := make([]SegmentAssignPlan, 0) - // generate candidates - nodeItems := b.convertToNodeItems(replica.GetCollectionID(), lo.Keys(nodesSegments)) - queue := newPriorityQueue() - for _, item := range nodeItems { - queue.push(item) - } - - // collect segment segments to assign - var segments []*meta.Segment - nodeIndex := make(map[int64]int64) - for nodeID, stoppingSegments := range stoppingNodesSegments { - for _, segment := range stoppingSegments { - segments = append(segments, segment) - nodeIndex[segment.GetID()] = nodeID + for _, nodeID := range offlineNodes { + dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID) + segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool { + return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil && + b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil + }) + plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes) + for i := range plans { + plans[i].From = nodeID + plans[i].ReplicaID = replica.ID } + segmentPlans = append(segmentPlans, plans...) } - - sort.Slice(segments, func(i, j int) bool { - return segments[i].GetNumOfRows() > segments[j].GetNumOfRows() - }) - - for _, s := range segments { - // pick the node with the least row count and allocate to it. - ni := queue.pop().(*nodeItem) - plan := SegmentAssignPlan{ - ReplicaID: replica.GetID(), - From: nodeIndex[s.GetID()], - To: ni.nodeID, - Segment: s, - } - segmentPlans = append(segmentPlans, plan) - // change node's priority and push back, should count for both collection factor and local factor - p := ni.getPriority() - ni.setPriority(p + int(s.GetNumOfRows()) + int(float64(s.GetNumOfRows())* - params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())) - queue.push(ni) - } - return segmentPlans } diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index 38efd181cf..9839d89338 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -604,6 +604,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes)) balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID) balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID) + balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID) // 2. set up target for distribution for multi collections for node, s := range c.distributions {