mirror of https://github.com/milvus-io/milvus.git
fix: [10kcp] channel imbalance during the stopping balance process (#38972)
issue: https://github.com/milvus-io/milvus/issues/38970, https://github.com/milvus-io/milvus/issues/37630 Because the stopping-balance channel step still uses the row_count_based policy, channels may become unbalanced in the multi-collection case. This PR implements a score-based stopping-balance channel policy. pr: https://github.com/milvus-io/milvus/pull/38971 Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Co-authored-by: Wei Liu <wei.liu@zilliz.com> pull/38974/head
parent
9b2b2a2689
commit
f5f4fed889
|
@ -45,10 +45,13 @@ func (segPlan *SegmentAssignPlan) String() string {
|
|||
}
|
||||
|
||||
type ChannelAssignPlan struct {
|
||||
Channel *meta.DmChannel
|
||||
Replica *meta.Replica
|
||||
From int64
|
||||
To int64
|
||||
Channel *meta.DmChannel
|
||||
Replica *meta.Replica
|
||||
From int64
|
||||
To int64
|
||||
FromScore int64
|
||||
ToScore int64
|
||||
ChannelScore int64
|
||||
}
|
||||
|
||||
func (chanPlan *ChannelAssignPlan) String() string {
|
||||
|
|
|
@ -391,7 +391,7 @@ func newNodeItem(currentScore int, nodeID int64) nodeItem {
|
|||
|
||||
func (b *nodeItem) getPriority() int {
|
||||
// if node lacks more score between assignedScore and currentScore, then higher priority
|
||||
return int(b.currentScore - b.assignedScore)
|
||||
return int(math.Ceil(b.currentScore - b.assignedScore))
|
||||
}
|
||||
|
||||
func (b *nodeItem) setPriority(priority int) {
|
||||
|
|
|
@ -192,19 +192,19 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
|
|||
}
|
||||
|
||||
from := int64(-1)
|
||||
// fromScore := int64(0)
|
||||
fromScore := int64(0)
|
||||
if sourceNode != nil {
|
||||
from = sourceNode.nodeID
|
||||
// fromScore = int64(sourceNode.getPriority())
|
||||
fromScore = int64(sourceNode.getPriority())
|
||||
}
|
||||
|
||||
plan := ChannelAssignPlan{
|
||||
From: from,
|
||||
To: targetNode.nodeID,
|
||||
Channel: ch,
|
||||
// FromScore: fromScore,
|
||||
// ToScore: int64(targetNode.getPriority()),
|
||||
// SegmentScore: int64(scoreChanges),
|
||||
From: from,
|
||||
To: targetNode.nodeID,
|
||||
Channel: ch,
|
||||
FromScore: fromScore,
|
||||
ToScore: int64(targetNode.getPriority()),
|
||||
ChannelScore: int64(scoreChanges),
|
||||
}
|
||||
br.AddRecord(StrRecordf("add segment plan %s", plan))
|
||||
plans = append(plans, plan)
|
||||
|
@ -486,6 +486,20 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) (segmentPlans
|
|||
return segmentPlans, channelPlans
|
||||
}
|
||||
|
||||
func (b *ScoreBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, rwNodes []int64, roNodes []int64) []ChannelAssignPlan {
|
||||
channelPlans := make([]ChannelAssignPlan, 0)
|
||||
for _, nodeID := range roNodes {
|
||||
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))
|
||||
plans := b.AssignChannel(replica.GetCollectionID(), dmChannels, rwNodes, false)
|
||||
for i := range plans {
|
||||
plans[i].From = nodeID
|
||||
plans[i].Replica = replica
|
||||
}
|
||||
channelPlans = append(channelPlans, plans...)
|
||||
}
|
||||
return channelPlans
|
||||
}
|
||||
|
||||
func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan {
|
||||
segmentPlans := make([]SegmentAssignPlan, 0)
|
||||
for _, nodeID := range offlineNodes {
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
|
||||
|
|
Loading…
Reference in New Issue