enhance: Enable score based balance channel policy (#38143)

issue: #38142
The current balance channel policy only considers the current collection's
distribution, so if every collection has one channel and all of those channels
are loaded on the same QueryNode, channel balance won't be triggered after the
number of QueryNodes increases.

This PR enables a score-based balance channel policy to achieve:
1. distribute all channels evenly across multiple QueryNodes
2. distribute each collection's channels evenly across multiple
QueryNodes (a sketch of the idea follows below).
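
As a rough, self-contained illustration of the failure mode described above (collection and node IDs are made up, and the types are simplified stand-ins rather than the Milvus ones): ten collections with one channel each, all loaded on the same QueryNode. Viewed one collection at a time there is nothing to move, while the global channel count makes the skew obvious.

package main

import "fmt"

// channel is a simplified stand-in for a DmChannel: just its collection and the node it lives on.
type channel struct {
	collectionID int64
	nodeID       int64
}

func main() {
	// 10 collections, 1 channel each, all loaded on QueryNode 1; QueryNode 2 just joined.
	channels := make([]channel, 0, 10)
	for c := int64(1); c <= 10; c++ {
		channels = append(channels, channel{collectionID: c, nodeID: 1})
	}

	perCollection := map[int64]int{} // what a per-collection policy looks at, one collection at a time
	perNode := map[int64]int{2: 0}   // what a global, score-based policy also accounts for
	for _, ch := range channels {
		perCollection[ch.collectionID]++
		perNode[ch.nodeID]++
	}
	// Every collection owns exactly 1 channel, so per-collection balance sees nothing to do,
	// yet node 1 holds 10 channels and node 2 holds 0.
	fmt.Println(perCollection[1], perNode[1], perNode[2]) // 1 10 0
}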

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2024-12-11 17:20:43 +08:00 committed by GitHub
parent 0d7a89a4f8
commit e279ccf109
15 changed files with 545 additions and 40 deletions


@ -347,6 +347,9 @@ queryCoord:
rowCountFactor: 0.4 # the row count weight used when balancing segments among queryNodes
segmentCountFactor: 0.4 # the segment count weight used when balancing segments among queryNodes
globalSegmentCountFactor: 0.1 # the global segment count weight used when balancing segments among queryNodes
# the channel count weight used when balancing channels among queryNodes,
# A higher value reduces the likelihood of assigning channels from the same collection to the same QueryNode. Set to 1 to disable this feature.
collectionChannelCountFactor: 10
segmentCountMaxSteps: 50 # segment count based plan generator max steps
rowCountMaxSteps: 50 # row count based plan generator max steps
randomMaxSteps: 10 # random based plan generator max steps
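
For a rough sense of how this weight enters the per-node score, here is a simplified sketch of the idea behind calculateScoreByChannel from the diff below (the helper name and the numbers are illustrative): channels of other collections count once each, while channels of the collection being balanced count max(1, factor) times each.

package main

import (
	"fmt"
	"math"
)

// channelScore sketches the per-node channel score: channels of other
// collections count 1 each, channels of the collection being balanced count
// max(1, factor) each, so a large factor exposes per-collection skew.
func channelScore(otherChannels, collectionChannels int, factor float64) float64 {
	return float64(otherChannels) + float64(collectionChannels)*math.Max(1.0, factor)
}

func main() {
	// Two nodes hold 5 channels each, so plain channel counts look balanced,
	// but node A hosts both channels of the collection being balanced.
	fmt.Println(channelScore(3, 2, 10)) // node A: 3 + 2*10 = 23
	fmt.Println(channelScore(5, 0, 10)) // node B: 5 + 0*10 = 5
	// With the factor set to 1 the per-collection weighting is disabled.
	fmt.Println(channelScore(3, 2, 1)) // node A: 5
	fmt.Println(channelScore(5, 0, 1)) // node B: 5
}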


@ -59,7 +59,7 @@ func (chanPlan *ChannelAssignPlan) String() string {
type Balance interface {
AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan
AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
BalanceReplica(ctx context.Context, replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan)
}
@ -104,7 +104,7 @@ func (b *RoundRobinBalancer) AssignSegment(ctx context.Context, collectionID int
return ret
}
func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
versionRangeFilter := semver.MustParseRange(">2.3.x")


@ -177,7 +177,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignChannel(ctx, c.assignments, c.nodeIDs, false)
plans := suite.roundRobinBalancer.AssignChannel(ctx, 1, c.assignments, c.nodeIDs, false)
suite.ElementsMatch(c.expectPlans, plans)
})
}


@ -147,7 +147,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingChannelPlan(ctx context.Context,
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range offlineNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID), meta.WithChannelName2Channel(channelName))
plans := b.AssignChannel(ctx, dmChannels, onlineNodes, false)
plans := b.AssignChannel(ctx, replica.GetCollectionID(), dmChannels, onlineNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
@ -176,7 +176,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(ctx context.Context,
func (b *ChannelLevelScoreBalancer) genSegmentPlan(ctx context.Context, br *balanceReport, replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeItemsMap := b.convertToNodeItems(br, replica.GetCollectionID(), onlineNodes)
nodeItemsMap := b.convertToNodeItemsBySegment(br, replica.GetCollectionID(), onlineNodes)
if len(nodeItemsMap) == 0 {
return nil
}
@ -262,7 +262,7 @@ func (b *ChannelLevelScoreBalancer) genChannelPlan(ctx context.Context, replica
return nil
}
channelPlans := b.AssignChannel(ctx, channelsToMove, nodeWithLessChannel, false)
channelPlans := b.AssignChannel(ctx, replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica


@ -22,17 +22,17 @@ func (_m *MockBalancer) EXPECT() *MockBalancer_Expecter {
return &MockBalancer_Expecter{mock: &_m.Mock}
}
// AssignChannel provides a mock function with given fields: ctx, channels, nodes, manualBalance
func (_m *MockBalancer) AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
ret := _m.Called(ctx, channels, nodes, manualBalance)
// AssignChannel provides a mock function with given fields: ctx, collectionID, channels, nodes, manualBalance
func (_m *MockBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
ret := _m.Called(ctx, collectionID, channels, nodes, manualBalance)
if len(ret) == 0 {
panic("no return value specified for AssignChannel")
}
var r0 []ChannelAssignPlan
if rf, ok := ret.Get(0).(func(context.Context, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan); ok {
r0 = rf(ctx, channels, nodes, manualBalance)
if rf, ok := ret.Get(0).(func(context.Context, int64, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan); ok {
r0 = rf(ctx, collectionID, channels, nodes, manualBalance)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]ChannelAssignPlan)
@ -49,16 +49,17 @@ type MockBalancer_AssignChannel_Call struct {
// AssignChannel is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - channels []*meta.DmChannel
// - nodes []int64
// - manualBalance bool
func (_e *MockBalancer_Expecter) AssignChannel(ctx interface{}, channels interface{}, nodes interface{}, manualBalance interface{}) *MockBalancer_AssignChannel_Call {
return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", ctx, channels, nodes, manualBalance)}
func (_e *MockBalancer_Expecter) AssignChannel(ctx interface{}, collectionID interface{}, channels interface{}, nodes interface{}, manualBalance interface{}) *MockBalancer_AssignChannel_Call {
return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", ctx, collectionID, channels, nodes, manualBalance)}
}
func (_c *MockBalancer_AssignChannel_Call) Run(run func(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool)) *MockBalancer_AssignChannel_Call {
func (_c *MockBalancer_AssignChannel_Call) Run(run func(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool)) *MockBalancer_AssignChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]*meta.DmChannel), args[2].([]int64), args[3].(bool))
run(args[0].(context.Context), args[1].(int64), args[2].([]*meta.DmChannel), args[3].([]int64), args[4].(bool))
})
return _c
}
@ -68,7 +69,7 @@ func (_c *MockBalancer_AssignChannel_Call) Return(_a0 []ChannelAssignPlan) *Mock
return _c
}
func (_c *MockBalancer_AssignChannel_Call) RunAndReturn(run func(context.Context, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan) *MockBalancer_AssignChannel_Call {
func (_c *MockBalancer_AssignChannel_Call) RunAndReturn(run func(context.Context, int64, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan) *MockBalancer_AssignChannel_Call {
_c.Call.Return(run)
return _c
}


@ -87,7 +87,7 @@ func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID
// AssignChannel, when the row count based balancer assigns channels, it assigns each channel to the node with the least global channel count,
// trying to give every query node an even channel count
func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
versionRangeFilter := semver.MustParseRange(">2.3.x")
@ -311,7 +311,7 @@ func (b *RowCountBasedBalancer) genStoppingChannelPlan(ctx context.Context, repl
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range roNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))
plans := b.AssignChannel(ctx, dmChannels, rwNodes, false)
plans := b.AssignChannel(ctx, replica.GetCollectionID(), dmChannels, rwNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
@ -349,7 +349,7 @@ func (b *RowCountBasedBalancer) genChannelPlan(ctx context.Context, br *balanceR
return nil
}
channelPlans := b.AssignChannel(ctx, channelsToMove, nodeWithLessChannel, false)
channelPlans := b.AssignChannel(ctx, replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica


@ -70,7 +70,7 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
}
// calculate each node's score
nodeItemsMap := b.convertToNodeItems(br, collectionID, nodes)
nodeItemsMap := b.convertToNodeItemsBySegment(br, collectionID, nodes)
if len(nodeItemsMap) == 0 {
return nil
}
@ -93,6 +93,9 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
})
balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
if manualBalance {
balanceBatchSize = math.MaxInt64
}
plans := make([]SegmentAssignPlan, 0, len(segments))
for _, s := range segments {
func(s *meta.Segment) {
@ -143,6 +146,90 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
return plans
}
func (b *ScoreBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
br := NewBalanceReport()
return b.assignChannel(br, collectionID, channels, nodes, manualBalance)
}
func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
nodes = lo.Filter(nodes, func(node int64, _ int) bool {
info := b.nodeManager.Get(node)
normalNode := info != nil && info.GetState() == session.NodeStateNormal
if !normalNode {
br.AddRecord(StrRecord(fmt.Sprintf("non-manual balance, skip abnormal node: %d", node)))
}
return normalNode
})
}
// calculate each node's score
nodeItemsMap := b.convertToNodeItemsByChannel(br, collectionID, nodes)
if len(nodeItemsMap) == 0 {
return nil
}
queue := newPriorityQueue()
for _, item := range nodeItemsMap {
queue.push(item)
}
balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceChannelBatchSize.GetAsInt()
if manualBalance {
balanceBatchSize = math.MaxInt64
}
plans := make([]ChannelAssignPlan, 0, len(channels))
for _, ch := range channels {
func(ch *meta.DmChannel) {
// for each channel, pick the node with the least score
targetNode := queue.pop().(*nodeItem)
// make sure the candidate is always pushed back
defer queue.push(targetNode)
scoreChanges := b.calculateChannelScore(ch, collectionID)
sourceNode := nodeItemsMap[ch.Node]
// if the channel's source node exists, this channel comes from the balancer, so we should consider the benefit
// if the channel reassignment doesn't bring enough benefit, we should skip this reassignment
// notice: we should skip the benefit check for manual balance
if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
br.AddRecord(StrRecordf("skip generate balance plan for channel %s since no enough benefit", ch.GetChannelName()))
return
}
from := int64(-1)
// fromScore := int64(0)
if sourceNode != nil {
from = sourceNode.nodeID
// fromScore = int64(sourceNode.getPriority())
}
plan := ChannelAssignPlan{
From: from,
To: targetNode.nodeID,
Channel: ch,
// FromScore: fromScore,
// ToScore: int64(targetNode.getPriority()),
// SegmentScore: int64(scoreChanges),
}
br.AddRecord(StrRecordf("add segment plan %s", plan))
plans = append(plans, plan)
// update the sourceNode and targetNode's score
if sourceNode != nil {
sourceNode.AddCurrentScoreDelta(-scoreChanges)
}
targetNode.AddCurrentScoreDelta(scoreChanges)
}(ch)
if len(plans) > balanceBatchSize {
break
}
}
return plans
}
func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *nodeItem, scoreChanges float64) bool {
// if the score diff between sourceNode and targetNode is lower than the unbalance toleration factor, there is no need to assign it targetNode
oldPriorityDiff := math.Abs(float64(sourceNode.getPriority()) - float64(targetNode.getPriority()))
@ -165,14 +252,14 @@ func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *
return true
}
func (b *ScoreBasedBalancer) convertToNodeItems(br *balanceReport, collectionID int64, nodeIDs []int64) map[int64]*nodeItem {
func (b *ScoreBasedBalancer) convertToNodeItemsBySegment(br *balanceReport, collectionID int64, nodeIDs []int64) map[int64]*nodeItem {
totalScore := 0
nodeScoreMap := make(map[int64]*nodeItem)
nodeMemMap := make(map[int64]float64)
totalMemCapacity := float64(0)
allNodeHasMemInfo := true
for _, node := range nodeIDs {
score := b.calculateScore(br, collectionID, node)
score := b.calculateScoreBySegment(br, collectionID, node)
nodeItem := newNodeItem(score, node)
nodeScoreMap[node] = &nodeItem
totalScore += score
@ -219,7 +306,53 @@ func (b *ScoreBasedBalancer) convertToNodeItems(br *balanceReport, collectionID
return nodeScoreMap
}
func (b *ScoreBasedBalancer) calculateScore(br *balanceReport, collectionID, nodeID int64) int {
func (b *ScoreBasedBalancer) convertToNodeItemsByChannel(br *balanceReport, collectionID int64, nodeIDs []int64) map[int64]*nodeItem {
totalScore := 0
nodeScoreMap := make(map[int64]*nodeItem)
nodeMemMap := make(map[int64]float64)
totalMemCapacity := float64(0)
allNodeHasMemInfo := true
for _, node := range nodeIDs {
score := b.calculateScoreByChannel(br, collectionID, node)
nodeItem := newNodeItem(score, node)
nodeScoreMap[node] = &nodeItem
totalScore += score
br.AddNodeItem(nodeScoreMap[node])
// set memory default to 1.0, will multiply average value to compute assigned score
nodeInfo := b.nodeManager.Get(node)
if nodeInfo != nil {
totalMemCapacity += nodeInfo.MemCapacity()
nodeMemMap[node] = nodeInfo.MemCapacity()
}
allNodeHasMemInfo = allNodeHasMemInfo && nodeInfo != nil && nodeInfo.MemCapacity() > 0
}
if totalScore == 0 {
return nodeScoreMap
}
// if all nodes have memory info, we use totalScore / totalMemCapacity to calculate the score, so average means the average score per memory unit
// otherwise, we use totalScore / len(nodeIDs) to calculate the score, so average means the average score per node
average := float64(0)
if allNodeHasMemInfo {
average = float64(totalScore) / totalMemCapacity
} else {
average = float64(totalScore) / float64(len(nodeIDs))
}
for _, node := range nodeIDs {
if allNodeHasMemInfo {
nodeScoreMap[node].setAssignedScore(nodeMemMap[node] * average)
br.SetMemoryFactor(node, nodeMemMap[node])
} else {
nodeScoreMap[node].setAssignedScore(average)
}
}
return nodeScoreMap
}
func (b *ScoreBasedBalancer) calculateScoreBySegment(br *balanceReport, collectionID, nodeID int64) int {
nodeRowCount := 0
// calculate global sealed segment row count
globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
@ -259,11 +392,45 @@ func (b *ScoreBasedBalancer) calculateScore(br *balanceReport, collectionID, nod
params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
}
func (b *ScoreBasedBalancer) calculateScoreByChannel(br *balanceReport, collectionID, nodeID int64) int {
// count the channels already on this node, splitting the current collection's channels from the rest
channels := b.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(nodeID))
nodeChannelNum, collectionChannelNum := 0, 0
for _, ch := range channels {
if ch.GetCollectionID() == collectionID {
collectionChannelNum += 1
} else {
nodeChannelNum += int(b.calculateChannelScore(ch, -1))
}
}
// calculate executing task cost in scheduler
nodeChannelNum += b.scheduler.GetChannelTaskDelta(nodeID, -1)
collectionChannelNum += b.scheduler.GetChannelTaskDelta(nodeID, collectionID)
br.AddDetailRecord(StrRecordf("Calcalute score for collection %d on node %d, global row count: %d, collection row count: %d",
collectionID, nodeID, nodeChannelNum, collectionChannelNum))
// give a higher weight to distribute collection's channels evenly across multiple nodes.
channelWeight := paramtable.Get().QueryCoordCfg.CollectionChannelCountFactor.GetAsFloat()
return nodeChannelNum + int(float64(collectionChannelNum)*math.Max(1.0, channelWeight))
}
// calculateSegmentScore calculate the score which the segment represented
func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) float64 {
return float64(s.GetNumOfRows()) * (1 + params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
}
func (b *ScoreBasedBalancer) calculateChannelScore(ch *meta.DmChannel, currentCollection int64) float64 {
if ch.GetCollectionID() == currentCollection {
// give a higher weight to distribute current collection's channels evenly across multiple nodes.
channelWeight := paramtable.Get().QueryCoordCfg.CollectionChannelCountFactor.GetAsFloat()
return math.Max(1.0, channelWeight)
}
return 1
}
func (b *ScoreBasedBalancer) BalanceReplica(ctx context.Context, replica *meta.Replica) (segmentPlans []SegmentAssignPlan, channelPlans []ChannelAssignPlan) {
log := log.With(
zap.Int64("collection", replica.GetCollectionID()),
@ -346,12 +513,12 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(ctx context.Context, replica
func (b *ScoreBasedBalancer) genSegmentPlan(ctx context.Context, br *balanceReport, replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeItemsMap := b.convertToNodeItems(br, replica.GetCollectionID(), onlineNodes)
nodeItemsMap := b.convertToNodeItemsBySegment(br, replica.GetCollectionID(), onlineNodes)
if len(nodeItemsMap) == 0 {
return nil
}
log.Info("node workload status",
log.Info("node segment workload status",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Stringers("nodes", lo.Values(nodeItemsMap)))
@ -365,8 +532,6 @@ func (b *ScoreBasedBalancer) genSegmentPlan(ctx context.Context, br *balanceRepo
segmentDist[node] = segments
}
balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
// find the segment from the node which has more score than the average
segmentsToMove := make([]*meta.Segment, 0)
for node, segments := range segmentDist {
@ -384,11 +549,6 @@ func (b *ScoreBasedBalancer) genSegmentPlan(ctx context.Context, br *balanceRepo
segmentScore := b.calculateSegmentScore(s)
br.AddRecord(StrRecordf("pick segment %d with score %f from node %d", s.ID, segmentScore, node))
segmentsToMove = append(segmentsToMove, s)
if len(segmentsToMove) >= balanceBatchSize {
br.AddRecord(StrRecordf("stop add segment candidate since current plan is equal to batch max(%d)", balanceBatchSize))
break
}
currentScore -= segmentScore
if currentScore <= assignedScore {
br.AddRecord(StrRecordf("stop add segment candidate since node[%d] current score(%f) below assigned(%f)", node, currentScore, assignedScore))
@ -419,3 +579,70 @@ func (b *ScoreBasedBalancer) genSegmentPlan(ctx context.Context, br *balanceRepo
return segmentPlans
}
func (b *ScoreBasedBalancer) genChannelPlan(ctx context.Context, br *balanceReport, replica *meta.Replica, onlineNodes []int64) []ChannelAssignPlan {
nodeItemsMap := b.convertToNodeItemsByChannel(br, replica.GetCollectionID(), onlineNodes)
if len(nodeItemsMap) == 0 {
return nil
}
log.Info("node channel workload status",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Stringers("nodes", lo.Values(nodeItemsMap)))
channelDist := make(map[int64][]*meta.DmChannel)
for _, node := range onlineNodes {
channelDist[node] = b.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node))
}
balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
// find the channels on nodes whose score is higher than the average
channelsToMove := make([]*meta.DmChannel, 0)
for node, channels := range channelDist {
currentScore := nodeItemsMap[node].getCurrentScore()
assignedScore := nodeItemsMap[node].getAssignedScore()
if currentScore <= assignedScore {
br.AddRecord(StrRecordf("node %d skip balance since current score(%f) lower than assigned one (%f)", node, currentScore, assignedScore))
continue
}
for _, ch := range channels {
channelScore := b.calculateChannelScore(ch, replica.GetCollectionID())
br.AddRecord(StrRecordf("pick channel %s with score %f from node %d", ch.GetChannelName(), channelScore, node))
channelsToMove = append(channelsToMove, ch)
if len(channelsToMove) >= balanceBatchSize {
br.AddRecord(StrRecordf("stop add channel candidate since current plan is equal to batch max(%d)", balanceBatchSize))
break
}
currentScore -= channelScore
if currentScore <= assignedScore {
br.AddRecord(StrRecordf("stop add channel candidate since node[%d] current score(%f) below assigned(%f)", node, currentScore, assignedScore))
break
}
}
}
// if the channel is redundant in the distribution, skip balancing it for now
channelsToMove = lo.Filter(channelsToMove, func(ch *meta.DmChannel, _ int) bool {
times := len(b.dist.ChannelDistManager.GetByFilter(meta.WithReplica2Channel(replica), meta.WithChannelName2Channel(ch.GetChannelName())))
channelUnique := times == 1
if !channelUnique {
br.AddRecord(StrRecordf("abort balancing channel %s since it appear multiple times(%d) in distribution", ch.GetChannelName(), times))
}
return channelUnique
})
if len(channelsToMove) == 0 {
return nil
}
channelPlans := b.assignChannel(br, replica.GetCollectionID(), channelsToMove, onlineNodes, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica
}
return channelPlans
}
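
A minimal, self-contained sketch of the greedy loop in assignChannel above (illustrative names; the benefit check, batch size, and source-node bookkeeping are omitted): for each channel, the node with the lowest current score is picked, the channel is assigned to it, and the node's score is increased before the next pick.

package main

import (
	"fmt"
	"sort"
)

// nodeState is a simplified stand-in for the balancer's nodeItem: it only
// tracks a running channel score per QueryNode.
type nodeState struct {
	id    int64
	score float64
}

// assignChannels sketches the greedy loop: each channel is given to the node
// with the lowest current score, and that node's score is bumped by the
// channel's weight before the next pick.
func assignChannels(nodes []*nodeState, channelWeights []float64) map[int64][]int {
	plan := make(map[int64][]int)
	for i, w := range channelWeights {
		// stand-in for popping the minimum item from the priority queue
		sort.Slice(nodes, func(a, b int) bool { return nodes[a].score < nodes[b].score })
		target := nodes[0]
		plan[target.id] = append(plan[target.id], i)
		target.score += w // stand-in for AddCurrentScoreDelta before the push back
	}
	return plan
}

func main() {
	nodes := []*nodeState{{id: 1}, {id: 2}}
	// three channels of the collection being balanced, each weighted by the factor (10)
	fmt.Println(assignChannels(nodes, []float64{10, 10, 10}))
}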


@ -17,6 +17,7 @@ package balance
import (
"context"
"fmt"
"testing"
"github.com/samber/lo"
@ -608,9 +609,9 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() {
// set node delta count
suite.mockScheduler.ExpectedCalls = nil
for i, node := range c.nodes {
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(1)).Return(c.deltaCounts[i]).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(-1)).Return(c.deltaCounts[i]).Maybe()
for i := range c.nodes {
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(c.nodes[i], mock.Anything).Return(c.deltaCounts[i]).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodes[i], mock.Anything).Return(c.deltaCounts[i]).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
}
@ -1221,3 +1222,251 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceSegmentAndChannel() {
_, channelPlans = suite.getCollectionBalancePlans(balancer, collectionID)
suite.Equal(len(channelPlans), 0)
}
func (suite *ScoreBasedBalancerTestSuite) TestBalanceChannelOnMultiCollections() {
ctx := context.Background()
balancer := suite.balancer
// mock 10 collections, each with 1 channel
collectionNum := 10
channelNum := 1
for i := 1; i <= collectionNum; i++ {
collectionID := int64(i)
collection := utils.CreateTestCollection(collectionID, int32(1))
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(ctx, collection)
balancer.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, collectionID))
balancer.meta.ReplicaManager.Spawn(ctx, collectionID, map[string]int{meta.DefaultResourceGroupName: 1}, nil)
channels := make([]*datapb.VchannelInfo, channelNum)
for i := 0; i < channelNum; i++ {
channels[i] = &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(
channels, nil, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, collectionID).Return([]int64{collectionID}, nil).Maybe()
balancer.targetMgr.UpdateCollectionNextTarget(ctx, collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID)
}
// mock querynode-1 to node manager
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
Version: common.Version,
})
nodeInfo.SetState(session.NodeStateNormal)
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 1)
utils.RecoverAllCollection(balancer.meta)
// mock channel distribution
channelDist := make([]*meta.DmChannel, 0)
for i := 1; i <= collectionNum; i++ {
collectionID := int64(i)
for i := 0; i < channelNum; i++ {
channelDist = append(channelDist, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1,
})
}
}
balancer.dist.ChannelDistManager.Update(1, channelDist...)
// assert that channel balance won't happen with only 1 querynode
ret := make([]ChannelAssignPlan, 0)
for i := 1; i <= collectionNum; i++ {
collectionID := int64(i)
_, channelPlans := suite.getCollectionBalancePlans(balancer, collectionID)
ret = append(ret, channelPlans...)
}
suite.Len(ret, 0)
// mock querynode-2 to node manager
nodeInfo2 := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
Version: common.Version,
})
suite.balancer.nodeManager.Add(nodeInfo2)
suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 2)
utils.RecoverAllCollection(balancer.meta)
_, channelPlans := suite.getCollectionBalancePlans(balancer, 1)
suite.Len(channelPlans, 1)
// mock that 1 channel has been moved to querynode-2
balancer.dist.ChannelDistManager.Update(1, channelDist[1:]...)
balancer.dist.ChannelDistManager.Update(2, channelDist[:1]...)
_, channelPlans = suite.getCollectionBalancePlans(balancer, 6)
suite.Len(channelPlans, 1)
// mock that 5 channels have been moved to querynode-2
balancer.dist.ChannelDistManager.Update(1, channelDist[5:]...)
balancer.dist.ChannelDistManager.Update(2, channelDist[:5]...)
_, channelPlans = suite.getCollectionBalancePlans(balancer, 6)
suite.Len(channelPlans, 0)
}
func (suite *ScoreBasedBalancerTestSuite) TestBalanceChannelOnDifferentQN() {
ctx := context.Background()
balancer := suite.balancer
// mock 1 collection with 5 channels
channelNum := 5
collectionID := int64(1)
collection := utils.CreateTestCollection(collectionID, int32(1))
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(ctx, collection)
balancer.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, collectionID))
balancer.meta.ReplicaManager.Spawn(ctx, collectionID, map[string]int{meta.DefaultResourceGroupName: 1}, nil)
channels := make([]*datapb.VchannelInfo, channelNum)
for i := 0; i < channelNum; i++ {
channels[i] = &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(
channels, nil, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, collectionID).Return([]int64{collectionID}, nil).Maybe()
balancer.targetMgr.UpdateCollectionNextTarget(ctx, collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID)
// mock querynode-1 to node manager
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
Version: common.Version,
})
nodeInfo.UpdateStats(session.WithMemCapacity(1024))
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 1)
utils.RecoverAllCollection(balancer.meta)
// mock channel distribution
channelDist := make([]*meta.DmChannel, 0)
for i := 0; i < channelNum; i++ {
channelDist = append(channelDist, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1,
})
}
balancer.dist.ChannelDistManager.Update(1, channelDist...)
// assert that channel balance won't happen with only 1 querynode
_, channelPlans := suite.getCollectionBalancePlans(balancer, collectionID)
suite.Len(channelPlans, 0)
// mock querynode-2 to node manager
nodeInfo2 := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
Version: common.Version,
})
suite.balancer.nodeManager.Add(nodeInfo2)
suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 2)
utils.RecoverAllCollection(balancer.meta)
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.CollectionBalanceChannelBatchSize.Key, "10")
defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.CollectionBalanceChannelBatchSize.Key)
// test balance channel on same query node
_, channelPlans = suite.getCollectionBalancePlans(balancer, collectionID)
suite.Len(channelPlans, 2)
// test balance on different query node
nodeInfo2.UpdateStats(session.WithMemCapacity(4096))
_, channelPlans = suite.getCollectionBalancePlans(balancer, collectionID)
suite.Len(channelPlans, 4)
}
func (suite *ScoreBasedBalancerTestSuite) TestBalanceChannelOnChannelExclusive() {
ctx := context.Background()
balancer := suite.balancer
collectionNum := 3
channelNum := 3
nodeNum := 3
// mock 3 collections, each with 3 channels
for i := 1; i <= collectionNum; i++ {
collectionID := int64(i)
collection := utils.CreateTestCollection(collectionID, int32(1))
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(ctx, collection)
balancer.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, collectionID))
balancer.meta.ReplicaManager.Spawn(ctx, collectionID, map[string]int{meta.DefaultResourceGroupName: 1}, nil)
channels := make([]*datapb.VchannelInfo, channelNum)
for i := 0; i < channelNum; i++ {
channels[i] = &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(
channels, nil, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, collectionID).Return([]int64{collectionID}, nil).Maybe()
balancer.targetMgr.UpdateCollectionNextTarget(ctx, collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID)
}
// mock querynode to node manager
for i := 1; i <= nodeNum; i++ {
nodeID := int64(i)
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: nodeID,
Address: "127.0.0.1:0",
Hostname: "localhost",
Version: common.Version,
})
nodeInfo.SetState(session.NodeStateNormal)
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, nodeID)
}
utils.RecoverAllCollection(balancer.meta)
// mock each collection's channels on a separate node: collection 1 on node 1, collection 2 on node 2, collection 3 on node 3
collectionID := int64(1)
channelDist1 := make([]*meta.DmChannel, 0)
for i := 0; i < channelNum; i++ {
channelDist1 = append(channelDist1, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1,
})
}
balancer.dist.ChannelDistManager.Update(1, channelDist1...)
collectionID = int64(2)
channelDist2 := make([]*meta.DmChannel, 0)
for i := 0; i < channelNum; i++ {
channelDist2 = append(channelDist2, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1,
})
}
balancer.dist.ChannelDistManager.Update(2, channelDist2...)
collectionID = int64(3)
channelDist3 := make([]*meta.DmChannel, 0)
for i := 0; i < channelNum; i++ {
channelDist3 = append(channelDist3, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1,
})
}
balancer.dist.ChannelDistManager.Update(3, channelDist3...)
// test balance on collection 1
_, channelPlans := suite.getCollectionBalancePlans(balancer, 1)
suite.Len(channelPlans, 2)
// mock that collection 1 has been balanced
balancer.dist.ChannelDistManager.Update(1, channelDist1[0])
balancer.dist.ChannelDistManager.Update(2, channelDist1[1], channelDist2[0], channelDist2[1], channelDist2[2])
balancer.dist.ChannelDistManager.Update(3, channelDist1[2], channelDist3[0], channelDist3[1], channelDist3[2])
_, channelPlans = suite.getCollectionBalancePlans(balancer, 1)
suite.Len(channelPlans, 0)
_, channelPlans = suite.getCollectionBalancePlans(balancer, 2)
suite.Len(channelPlans, 2)
_, channelPlans = suite.getCollectionBalancePlans(balancer, 3)
suite.Len(channelPlans, 2)
}


@ -232,7 +232,7 @@ func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*
if len(rwNodes) == 0 {
rwNodes = replica.GetRWNodes()
}
plan := c.getBalancerFunc().AssignChannel(ctx, []*meta.DmChannel{ch}, rwNodes, false)
plan := c.getBalancerFunc().AssignChannel(ctx, replica.GetCollectionID(), []*meta.DmChannel{ch}, rwNodes, false)
plans = append(plans, plan...)
}


@ -100,7 +100,7 @@ func (suite *ChannelCheckerTestSuite) setNodeAvailable(nodes ...int64) {
func (suite *ChannelCheckerTestSuite) createMockBalancer() balance.Balance {
balancer := balance.NewMockBalancer(suite.T())
balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe().Return(func(ctx context.Context, channels []*meta.DmChannel, nodes []int64, _ bool) []balance.ChannelAssignPlan {
balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe().Return(func(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, _ bool) []balance.ChannelAssignPlan {
plans := make([]balance.ChannelAssignPlan, 0, len(channels))
for i, c := range channels {
plan := balance.ChannelAssignPlan{


@ -140,7 +140,7 @@ func (suite *CheckerControllerSuite) TestBasic() {
assignSegCounter.Inc()
return nil
})
suite.balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dc []*meta.DmChannel, i []int64, _ bool) []balance.ChannelAssignPlan {
suite.balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collectionID int64, dc []*meta.DmChannel, i []int64, _ bool) []balance.ChannelAssignPlan {
assingChanCounter.Inc()
return nil
})


@ -183,7 +183,7 @@ func (s *Server) balanceChannels(ctx context.Context,
) error {
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
plans := s.getBalancerFunc().AssignChannel(ctx, channels, dstNodes, true)
plans := s.getBalancerFunc().AssignChannel(ctx, collectionID, channels, dstNodes, true)
for i := range plans {
plans[i].From = srcNode
plans[i].Replica = replica


@ -82,7 +82,7 @@ func RecoverReplicaOfCollection(ctx context.Context, m *meta.Meta, collectionID
logger := log.With(zap.Int64("collectionID", collectionID))
rgNames := m.ReplicaManager.GetResourceGroupByCollection(ctx, collectionID)
if rgNames.Len() == 0 {
logger.Error("no resource group found for collection", zap.Int64("collectionID", collectionID))
logger.Error("no resource group found for collection")
return
}
rgs, err := m.ResourceManager.GetNodesOfMultiRG(ctx, rgNames.Collect())


@ -1780,6 +1780,7 @@ type queryCoordConfig struct {
RowCountFactor ParamItem `refreshable:"true"`
SegmentCountFactor ParamItem `refreshable:"true"`
GlobalSegmentCountFactor ParamItem `refreshable:"true"`
CollectionChannelCountFactor ParamItem `refreshable:"true"`
SegmentCountMaxSteps ParamItem `refreshable:"true"`
RowCountMaxSteps ParamItem `refreshable:"true"`
RandomMaxSteps ParamItem `refreshable:"true"`
@ -1827,6 +1828,7 @@ type queryCoordConfig struct {
CollectionObserverInterval ParamItem `refreshable:"false"`
CheckExecutedFlagInterval ParamItem `refreshable:"false"`
CollectionBalanceSegmentBatchSize ParamItem `refreshable:"true"`
CollectionBalanceChannelBatchSize ParamItem `refreshable:"true"`
UpdateCollectionLoadStatusInterval ParamItem `refreshable:"false"`
ClusterLevelLoadReplicaNumber ParamItem `refreshable:"true"`
ClusterLevelLoadResourceGroups ParamItem `refreshable:"true"`
@ -1945,6 +1947,17 @@ If this parameter is set false, Milvus simply searches the growing segments with
}
p.GlobalSegmentCountFactor.Init(base.mgr)
p.CollectionChannelCountFactor = ParamItem{
Key: "queryCoord.collectionChannelCountFactor",
Version: "2.4.18",
DefaultValue: "10",
PanicIfEmpty: true,
Doc: `the channel count weight used when balancing channels among queryNodes,
A higher value reduces the likelihood of assigning channels from the same collection to the same QueryNode. Set to 1 to disable this feature.`,
Export: true,
}
p.CollectionChannelCountFactor.Init(base.mgr)
p.SegmentCountMaxSteps = ParamItem{
Key: "queryCoord.segmentCountMaxSteps",
Version: "2.3.0",
@ -2367,6 +2380,15 @@ If this parameter is set false, Milvus simply searches the growing segments with
}
p.CollectionBalanceSegmentBatchSize.Init(base.mgr)
p.CollectionBalanceChannelBatchSize = ParamItem{
Key: "queryCoord.collectionBalanceChannelBatchSize",
Version: "2.4.18",
DefaultValue: "1",
Doc: "the max balance task number for channel at each round",
Export: false,
}
p.CollectionBalanceChannelBatchSize.Init(base.mgr)
p.ClusterLevelLoadReplicaNumber = ParamItem{
Key: "queryCoord.clusterLevelLoadReplicaNumber",
Version: "2.4.7",


@ -369,9 +369,12 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 0.1, Params.DelegatorMemoryOverloadFactor.GetAsFloat())
assert.Equal(t, 5, Params.CollectionBalanceSegmentBatchSize.GetAsInt())
assert.Equal(t, 1, Params.CollectionBalanceChannelBatchSize.GetAsInt())
assert.Equal(t, 0, Params.ClusterLevelLoadReplicaNumber.GetAsInt())
assert.Len(t, Params.ClusterLevelLoadResourceGroups.GetAsStrings(), 0)
assert.Equal(t, 10, Params.CollectionChannelCountFactor.GetAsInt())
})
t.Run("test queryNodeConfig", func(t *testing.T) {