mirror of https://github.com/milvus-io/milvus.git
enhance: Rewrite gen stopping segment plan based on assign segment (#29473)
`AssignSegment` method defines how to assign segment to nodes, but score_based_balance implement another assign logic in `genStoppingSegmentPlan` This PR rewrite gen stopping segment plan based on assign segment. Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/29485/head
parent
2ffde52f8a
commit
6cbf9c489d
|
@ -194,7 +194,7 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
|
|||
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
|
||||
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
|
||||
if len(channelPlans) == 0 {
|
||||
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, nodesSegments, stoppingNodesSegments)...)
|
||||
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
|
||||
}
|
||||
} else {
|
||||
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
|
||||
|
@ -213,46 +213,22 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
|
|||
return segmentPlans, channelPlans
|
||||
}
|
||||
|
||||
func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, nodesSegments map[int64][]*meta.Segment, stoppingNodesSegments map[int64][]*meta.Segment) []SegmentAssignPlan {
|
||||
func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan {
|
||||
segmentPlans := make([]SegmentAssignPlan, 0)
|
||||
// generate candidates
|
||||
nodeItems := b.convertToNodeItems(replica.GetCollectionID(), lo.Keys(nodesSegments))
|
||||
queue := newPriorityQueue()
|
||||
for _, item := range nodeItems {
|
||||
queue.push(item)
|
||||
}
|
||||
|
||||
// collect segment segments to assign
|
||||
var segments []*meta.Segment
|
||||
nodeIndex := make(map[int64]int64)
|
||||
for nodeID, stoppingSegments := range stoppingNodesSegments {
|
||||
for _, segment := range stoppingSegments {
|
||||
segments = append(segments, segment)
|
||||
nodeIndex[segment.GetID()] = nodeID
|
||||
for _, nodeID := range offlineNodes {
|
||||
dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID)
|
||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||
})
|
||||
plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes)
|
||||
for i := range plans {
|
||||
plans[i].From = nodeID
|
||||
plans[i].ReplicaID = replica.ID
|
||||
}
|
||||
segmentPlans = append(segmentPlans, plans...)
|
||||
}
|
||||
|
||||
sort.Slice(segments, func(i, j int) bool {
|
||||
return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
|
||||
})
|
||||
|
||||
for _, s := range segments {
|
||||
// pick the node with the least row count and allocate to it.
|
||||
ni := queue.pop().(*nodeItem)
|
||||
plan := SegmentAssignPlan{
|
||||
ReplicaID: replica.GetID(),
|
||||
From: nodeIndex[s.GetID()],
|
||||
To: ni.nodeID,
|
||||
Segment: s,
|
||||
}
|
||||
segmentPlans = append(segmentPlans, plan)
|
||||
// change node's priority and push back, should count for both collection factor and local factor
|
||||
p := ni.getPriority()
|
||||
ni.setPriority(p + int(s.GetNumOfRows()) + int(float64(s.GetNumOfRows())*
|
||||
params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()))
|
||||
queue.push(ni)
|
||||
}
|
||||
|
||||
return segmentPlans
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue