mirror of https://github.com/milvus-io/milvus.git
enhance: Rewrite gen stopping segment plan based on assign segment (29473) (#29480)
pr: #29473 `AssignSegment` method defines how to assign segment to nodes, but score_based_balance implement another assign logic in `genStoppingSegmentPlan` This PR rewrite gen stopping segment plan based on assign segment. Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/29386/head
parent
d0bcbf3953
commit
ad37b98cda
|
@ -191,7 +191,7 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
|
|||
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
|
||||
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
|
||||
if len(channelPlans) == 0 {
|
||||
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, nodesSegments, stoppingNodesSegments)...)
|
||||
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
|
||||
}
|
||||
} else {
|
||||
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
|
||||
|
@ -210,46 +210,21 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
|
|||
return segmentPlans, channelPlans
|
||||
}
|
||||
|
||||
func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, nodesSegments map[int64][]*meta.Segment, stoppingNodesSegments map[int64][]*meta.Segment) []SegmentAssignPlan {
|
||||
func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan {
|
||||
segmentPlans := make([]SegmentAssignPlan, 0)
|
||||
// generate candidates
|
||||
nodeItems := b.convertToNodeItems(replica.GetCollectionID(), lo.Keys(nodesSegments))
|
||||
queue := newPriorityQueue()
|
||||
for _, item := range nodeItems {
|
||||
queue.push(item)
|
||||
}
|
||||
|
||||
// collect segment segments to assign
|
||||
var segments []*meta.Segment
|
||||
nodeIndex := make(map[int64]int64)
|
||||
for nodeID, stoppingSegments := range stoppingNodesSegments {
|
||||
for _, segment := range stoppingSegments {
|
||||
segments = append(segments, segment)
|
||||
nodeIndex[segment.GetID()] = nodeID
|
||||
for _, nodeID := range offlineNodes {
|
||||
dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID)
|
||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil
|
||||
})
|
||||
plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes)
|
||||
for i := range plans {
|
||||
plans[i].From = nodeID
|
||||
plans[i].ReplicaID = replica.ID
|
||||
}
|
||||
segmentPlans = append(segmentPlans, plans...)
|
||||
}
|
||||
|
||||
sort.Slice(segments, func(i, j int) bool {
|
||||
return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
|
||||
})
|
||||
|
||||
for _, s := range segments {
|
||||
// pick the node with the least row count and allocate to it.
|
||||
ni := queue.pop().(*nodeItem)
|
||||
plan := SegmentAssignPlan{
|
||||
ReplicaID: replica.GetID(),
|
||||
From: nodeIndex[s.GetID()],
|
||||
To: ni.nodeID,
|
||||
Segment: s,
|
||||
}
|
||||
segmentPlans = append(segmentPlans, plan)
|
||||
// change node's priority and push back, should count for both collection factor and local factor
|
||||
p := ni.getPriority()
|
||||
ni.setPriority(p + int(s.GetNumOfRows()) + int(float64(s.GetNumOfRows())*
|
||||
params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()))
|
||||
queue.push(ni)
|
||||
}
|
||||
|
||||
return segmentPlans
|
||||
}
|
||||
|
||||
|
|
|
@ -604,6 +604,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
|
|||
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
|
||||
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
|
||||
balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
|
||||
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
|
||||
|
||||
// 2. set up target for distribution for multi collections
|
||||
for node, s := range c.distributions {
|
||||
|
|
Loading…
Reference in New Issue