enhance: Enable balance on querynode with different mem capacity (#36466)

issue: #36464
This PR enables balancing across query nodes with different memory capacities: a
query node with more memory capacity is assigned more records, and the query node
whose currentScore falls furthest below its assignedScore gets the highest priority
to take on new segments.
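A minimal sketch of the scoring model this description refers to (names and numbers are illustrative only; the real logic lives in ScoreBasedBalancer.convertToNodeItems and the nodeItem type changed below, which additionally fold in row-count factors and delegator memory reservation):

package main

import "fmt"

type node struct {
	id            int64
	memCapacityMB float64
	currentScore  float64 // roughly: rows already placed on the node
	assignedScore float64 // target score derived from memory capacity
}

func main() {
	nodes := []*node{
		{id: 1, memCapacityMB: 1024, currentScore: 10},
		{id: 2, memCapacityMB: 2048, currentScore: 50},
	}
	totalScore, totalMem := 0.0, 0.0
	for _, n := range nodes {
		totalScore += n.currentScore
		totalMem += n.memCapacityMB
	}
	scorePerMB := totalScore / totalMem // average score per MB of capacity
	for _, n := range nodes {
		n.assignedScore = n.memCapacityMB * scorePerMB // bigger node => bigger target
		// priority = currentScore - assignedScore; the most negative node is the
		// preferred target for new segments.
		fmt.Printf("node %d: assigned=%.1f priority=%.1f\n",
			n.id, n.assignedScore, n.currentScore-n.assignedScore)
	}
}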

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/36626/head
wei liu 2024-09-30 16:15:17 +08:00 committed by GitHub
parent 00a5025949
commit 470bb0cc3f
9 changed files with 250 additions and 64 deletions

View File

@ -609,6 +609,7 @@ message GetDataDistributionResponse {
repeated ChannelVersionInfo channels = 4;
repeated LeaderView leader_views = 5;
int64 lastModifyTs = 6;
double memCapacityInMB = 7;
}
message LeaderView {

View File

@ -161,8 +161,15 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(replica *meta.Replica
func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeScore := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
totalScore := 0
nodeItemsMap := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
if len(nodeItemsMap) == 0 {
return nil
}
log.Info("node workload status",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.String("channelName", channelName),
zap.Stringers("nodes", lo.Values(nodeItemsMap)))
// list all segment which could be balanced, and calculate node's score
for _, node := range onlineNodes {
@ -171,19 +178,14 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
})
segmentDist[node] = segments
totalScore += nodeScore[node].getPriority()
}
if totalScore == 0 {
return nil
}
// find the segment from the node which has more score than the average
segmentsToMove := make([]*meta.Segment, 0)
average := totalScore / len(onlineNodes)
for node, segments := range segmentDist {
leftScore := nodeScore[node].getPriority()
if leftScore <= average {
currentScore := nodeItemsMap[node].getCurrentScore()
assignedScore := nodeItemsMap[node].getAssignedScore()
if currentScore <= assignedScore {
continue
}
@ -192,8 +194,8 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
})
for _, s := range segments {
segmentsToMove = append(segmentsToMove, s)
leftScore -= b.calculateSegmentScore(s)
if leftScore <= average {
currentScore -= b.calculateSegmentScore(s)
if currentScore <= assignedScore {
break
}
}
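Read together, the change above turns segment selection into a simple drain: any node whose currentScore exceeds its assignedScore keeps giving up movable segments until it falls back to the target. A standalone sketch of that loop (hypothetical helper; sorting and batch-size limits omitted):

// pickSegmentsToMove is a simplified restatement of the loop above.
// segScores would come from calculateSegmentScore for each movable segment.
func pickSegmentsToMove(currentScore, assignedScore float64, segScores []float64) []int {
	if currentScore <= assignedScore {
		return nil // node is at or below its target; nothing to move
	}
	picked := make([]int, 0)
	for i, score := range segScores {
		picked = append(picked, i)
		currentScore -= score
		if currentScore <= assignedScore {
			break // target reached, stop draining this node
		}
	}
	return picked
}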

View File

@ -18,6 +18,7 @@ package balance
import (
"context"
"fmt"
"math"
"sort"
@ -77,9 +78,8 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me
if len(plans) > balanceBatchSize {
break
}
// change node's priority and push back
p := ni.getPriority()
ni.setPriority(p + int(s.GetNumOfRows()))
// change node's score and push back
ni.AddCurrentScoreDelta(float64(s.GetNumOfRows()))
queue.push(ni)
}
return plans
@ -119,9 +119,8 @@ func (b *RowCountBasedBalancer) AssignChannel(channels []*meta.DmChannel, nodes
Channel: c,
}
plans = append(plans, plan)
// change node's priority and push back
p := ni.getPriority()
ni.setPriority(p + 1)
// change node's score and push back
ni.AddCurrentScoreDelta(1)
queue.push(ni)
}
return plans
@ -366,14 +365,51 @@ func NewRowCountBasedBalancer(
type nodeItem struct {
baseItem
nodeID int64
fmt.Stringer
nodeID int64
assignedScore float64
currentScore float64
}
func newNodeItem(priority int, nodeID int64) nodeItem {
func newNodeItem(currentScore int, nodeID int64) nodeItem {
return nodeItem{
baseItem: baseItem{
priority: priority,
},
nodeID: nodeID,
baseItem: baseItem{},
nodeID: nodeID,
currentScore: float64(currentScore),
}
}
func (b *nodeItem) getPriority() int {
// if node lacks more score between assignedScore and currentScore, then higher priority
return int(b.currentScore - b.assignedScore)
}
func (b *nodeItem) setPriority(priority int) {
panic("not supported, use updatePriority instead")
}
func (b *nodeItem) getPriorityWithCurrentScoreDelta(delta float64) int {
return int((b.currentScore + delta) - b.assignedScore)
}
func (b *nodeItem) getCurrentScore() float64 {
return b.currentScore
}
func (b *nodeItem) AddCurrentScoreDelta(delta float64) {
b.currentScore += delta
b.priority = b.getPriority()
}
func (b *nodeItem) getAssignedScore() float64 {
return b.assignedScore
}
func (b *nodeItem) setAssignedScore(delta float64) {
b.assignedScore += delta
b.priority = b.getPriority()
}
func (b *nodeItem) String() string {
return fmt.Sprintf("{NodeID: %d, AssignedScore: %f, CurrentScore: %f, Priority: %d}", b.nodeID, b.assignedScore, b.currentScore, b.priority)
}
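As a usage note for the reworked nodeItem (values below are made up, and the calls assume code inside the balance package): setAssignedScore adds a delta to the target rather than overwriting it, and priority is simply currentScore minus assignedScore, so a negative priority marks a node with spare capacity.

a := newNodeItem(10, 1) // currentScore = 10
a.setAssignedScore(20)  // assignedScore += 20 -> 20
_ = a.getPriority()     // 10 - 20 = -10: plenty of headroom, preferred for new load

a.AddCurrentScoreDelta(15) // currentScore -> 25
_ = a.getPriority()        // 25 - 20 = 5: now slightly over its target

b := newNodeItem(50, 2)
b.setAssignedScore(40)
_ = b.getPriority() // 50 - 40 = 10: most overloaded, first candidate to shed segments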

View File

@ -63,6 +63,7 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
if len(nodeItemsMap) == 0 {
return nil
}
log.Info("node workload status", zap.Int64("collectionID", collectionID), zap.Stringers("nodes", lo.Values(nodeItemsMap)))
queue := newPriorityQueue()
for _, item := range nodeItemsMap {
@ -89,13 +90,13 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
targetNode := queue.pop().(*nodeItem)
// make sure candidate is always push back
defer queue.push(targetNode)
priorityChange := b.calculateSegmentScore(s)
scoreChanges := b.calculateSegmentScore(s)
sourceNode := nodeItemsMap[s.Node]
// if segment's node exist, which means this segment comes from balancer. we should consider the benefit
// if the segment reassignment doesn't got enough benefit, we should skip this reassignment
// notice: we should skip benefit check for manual balance
if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, priorityChange) {
if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
return
}
@ -112,15 +113,15 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
Segment: s,
FromScore: fromScore,
ToScore: int64(targetNode.getPriority()),
SegmentScore: int64(priorityChange),
SegmentScore: int64(scoreChanges),
}
plans = append(plans, plan)
// update the sourceNode and targetNode's score
if sourceNode != nil {
sourceNode.setPriority(sourceNode.getPriority() - priorityChange)
sourceNode.AddCurrentScoreDelta(-scoreChanges)
}
targetNode.setPriority(targetNode.getPriority() + priorityChange)
targetNode.AddCurrentScoreDelta(scoreChanges)
}(s)
if len(plans) > balanceBatchSize {
@ -130,21 +131,21 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
return plans
}
func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *nodeItem, priorityChange int) bool {
func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *nodeItem, scoreChanges float64) bool {
// if the score diff between sourceNode and targetNode is lower than the unbalance toleration factor, there is no need to assign it targetNode
oldScoreDiff := math.Abs(float64(sourceNode.getPriority()) - float64(targetNode.getPriority()))
if oldScoreDiff < float64(targetNode.getPriority())*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat() {
oldPriorityDiff := math.Abs(float64(sourceNode.getPriority()) - float64(targetNode.getPriority()))
if oldPriorityDiff < float64(targetNode.getPriority())*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat() {
return false
}
newSourceScore := sourceNode.getPriority() - priorityChange
newTargetScore := targetNode.getPriority() + priorityChange
if newTargetScore > newSourceScore {
newSourcePriority := sourceNode.getPriorityWithCurrentScoreDelta(-scoreChanges)
newTargetPriority := targetNode.getPriorityWithCurrentScoreDelta(scoreChanges)
if newTargetPriority > newSourcePriority {
// if score diff reverted after segment reassignment, we will consider the benefit
// only trigger following segment reassignment when the generated reverted score diff
// is far smaller than the original score diff
newScoreDiff := math.Abs(float64(newSourceScore) - float64(newTargetScore))
if newScoreDiff*params.Params.QueryCoordCfg.ReverseUnbalanceTolerationFactor.GetAsFloat() >= oldScoreDiff {
newScoreDiff := math.Abs(float64(newSourcePriority) - float64(newTargetPriority))
if newScoreDiff*params.Params.QueryCoordCfg.ReverseUnbalanceTolerationFactor.GetAsFloat() >= oldPriorityDiff {
return false
}
}
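The benefit check above can be restated as a standalone function for a worked example (the two toleration factors come from QueryCoord config; 0.05 and 1.3 below are illustrative values only, and math.Abs requires the math import):

func worthMoving(srcPriority, tgtPriority, scoreChanges, unbalanceTol, reverseTol float64) bool {
	oldDiff := math.Abs(srcPriority - tgtPriority)
	if oldDiff < tgtPriority*unbalanceTol {
		return false // nodes are already close enough; skip the move
	}
	newSrc := srcPriority - scoreChanges
	newTgt := tgtPriority + scoreChanges
	if newTgt > newSrc {
		// the imbalance would flip direction; only accept it if the new gap is
		// much smaller than the old one
		if math.Abs(newSrc-newTgt)*reverseTol >= oldDiff {
			return false
		}
	}
	return true
}

// worthMoving(100, 40, 25, 0.05, 1.3) == true  (new priorities 75 vs 65, no reversion)
// worthMoving(100, 40, 55, 0.05, 1.3) == false (45 vs 95: reverted gap 50*1.3 = 65 >= 60)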
@ -155,25 +156,48 @@ func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *
func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) map[int64]*nodeItem {
totalScore := 0
nodeScoreMap := make(map[int64]*nodeItem)
nodeMemMap := make(map[int64]float64)
totalMemCapacity := float64(0)
allNodeHasMemInfo := true
for _, node := range nodeIDs {
score := b.calculateScore(collectionID, node)
nodeItem := newNodeItem(score, node)
nodeScoreMap[node] = &nodeItem
totalScore += score
// set memory default to 1.0, will multiply average value to compute assigned score
nodeInfo := b.nodeManager.Get(node)
if nodeInfo != nil {
totalMemCapacity += nodeInfo.MemCapacity()
nodeMemMap[node] = nodeInfo.MemCapacity()
}
allNodeHasMemInfo = allNodeHasMemInfo && nodeInfo != nil && nodeInfo.MemCapacity() > 0
}
if totalScore == 0 {
return nodeScoreMap
}
average := totalScore / len(nodeIDs)
// if all node has memory info, we will use totalScore / totalMemCapacity to calculate the score, then average means average score on memory unit
// otherwise, we will use totalScore / len(nodeItemsMap) to calculate the score, then average means average score on node unit
average := float64(0)
if allNodeHasMemInfo {
average = float64(totalScore) / totalMemCapacity
} else {
average = float64(totalScore) / float64(len(nodeIDs))
}
delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat()
// use average * delegatorOverloadFactor * delegator_num, to preserve fixed memory size for delegator
for _, node := range nodeIDs {
if allNodeHasMemInfo {
nodeScoreMap[node].setAssignedScore(nodeMemMap[node] * average)
} else {
nodeScoreMap[node].setAssignedScore(average)
}
// use assignedScore * delegatorOverloadFactor * delegator_num, to preserve fixed memory size for delegator
collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(node))
if len(collectionViews) > 0 {
newScore := nodeScoreMap[node].getPriority() + int(float64(average)*delegatorOverloadFactor)*len(collectionViews)
nodeScoreMap[node].setPriority(newScore)
nodeScoreMap[node].AddCurrentScoreDelta(nodeScoreMap[node].getAssignedScore() * delegatorOverloadFactor * float64(len(collectionViews)))
}
}
return nodeScoreMap
@ -217,8 +241,8 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
}
// calculateSegmentScore calculate the score which the segment represented
func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) int {
return int(float64(s.GetNumOfRows()) * (1 + params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()))
func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) float64 {
return float64(s.GetNumOfRows()) * (1 + params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
}
func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
@ -288,8 +312,10 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin
func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeScore := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
totalScore := 0
nodeItemsMap := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
if len(nodeItemsMap) == 0 {
return nil
}
// list all segment which could be balanced, and calculate node's score
for _, node := range onlineNodes {
@ -298,21 +324,16 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
})
segmentDist[node] = segments
totalScore += nodeScore[node].getPriority()
}
if totalScore == 0 {
return nil
}
balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
// find the segment from the node which has more score than the average
segmentsToMove := make([]*meta.Segment, 0)
average := totalScore / len(onlineNodes)
for node, segments := range segmentDist {
leftScore := nodeScore[node].getPriority()
if leftScore <= average {
currentScore := nodeItemsMap[node].getCurrentScore()
assignedScore := nodeItemsMap[node].getAssignedScore()
if currentScore <= assignedScore {
continue
}
@ -324,8 +345,9 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
if len(segmentsToMove) >= balanceBatchSize {
break
}
leftScore -= b.calculateSegmentScore(s)
if leftScore <= average {
currentScore -= b.calculateSegmentScore(s)
if currentScore <= assignedScore {
break
}
}

View File

@ -1001,6 +1001,107 @@ func (suite *ScoreBasedBalancerTestSuite) TestMultiReplicaBalance() {
}
}
func (suite *ScoreBasedBalancerTestSuite) TestQNMemoryCapacity() {
cases := []struct {
name string
nodes []int64
collectionID int64
replicaID int64
collectionsSegments []*datapb.SegmentInfo
states []session.State
shouldMock bool
distributions map[int64][]*meta.Segment
distributionChannels map[int64][]*meta.DmChannel
expectPlans []SegmentAssignPlan
expectChannelPlans []ChannelAssignPlan
}{
{
name: "test qn memory capacity",
nodes: []int64{1, 2},
collectionID: 1,
replicaID: 1,
collectionsSegments: []*datapb.SegmentInfo{
{ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1}, {ID: 4, PartitionID: 1},
},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal},
distributions: map[int64][]*meta.Segment{
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
2: {
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2},
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 2},
{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 20}, Node: 2},
},
},
expectPlans: []SegmentAssignPlan{},
expectChannelPlans: []ChannelAssignPlan{},
},
}
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
// 1. set up target for multi collections
collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID))
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return(
nil, c.collectionsSegments, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe()
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(c.collectionID, c.collectionID))
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
// 2. set up target for distribution for multi collections
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for node, v := range c.distributionChannels {
balancer.dist.ChannelDistManager.Update(node, v...)
}
// 3. set up nodes info and resourceManager for balancer
nodeInfoMap := make(map[int64]*session.NodeInfo)
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
nodeInfoMap[c.nodes[i]] = nodeInfo
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}
utils.RecoverAllCollection(balancer.meta)
// test qn has same memory capacity
nodeInfoMap[1].UpdateStats(session.WithMemCapacity(1024))
nodeInfoMap[2].UpdateStats(session.WithMemCapacity(1024))
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID)
suite.Len(channelPlans, 0)
suite.Len(segmentPlans, 1)
suite.Equal(segmentPlans[0].To, int64(1))
suite.Equal(segmentPlans[0].Segment.NumOfRows, int64(20))
// test qn has different memory capacity
nodeInfoMap[1].UpdateStats(session.WithMemCapacity(1024))
nodeInfoMap[2].UpdateStats(session.WithMemCapacity(2048))
segmentPlans, channelPlans = suite.getCollectionBalancePlans(balancer, c.collectionID)
suite.Len(channelPlans, 0)
suite.Len(segmentPlans, 1)
suite.Equal(segmentPlans[0].To, int64(1))
suite.Equal(segmentPlans[0].Segment.NumOfRows, int64(10))
})
}
}
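For intuition on the expected plans above (rough numbers that ignore the global row-count factor and the delegator reservation, neither of which matters here since the test registers no channels): the replica holds 60 rows in total. With equal capacity (1024 MB each), every node's assigned score is roughly 30 rows, so node 2 at 50 rows is about 20 over its target and releases a 20-row segment to node 1. With 1024 MB vs 2048 MB, node 1's target drops to roughly 20 and node 2's rises to roughly 40, so node 2 is only about 10 rows over and only the 10-row segment moves.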
func TestScoreBasedBalancerSuite(t *testing.T) {
suite.Run(t, new(ScoreBasedBalancerTestSuite))
}

View File

@ -123,8 +123,8 @@ func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse,
node.UpdateStats(
session.WithSegmentCnt(len(resp.GetSegments())),
session.WithChannelCnt(len(resp.GetChannels())),
session.WithMemCapacity(resp.GetMemCapacityInMB()),
)
dh.updateSegmentsDistribution(resp)
dh.updateChannelsDistribution(resp)
dh.updateLeaderView(resp)

View File

@ -159,6 +159,13 @@ func (n *NodeInfo) ChannelCnt() int {
return n.stats.getChannelCnt()
}
// return node's memory capacity in mb
func (n *NodeInfo) MemCapacity() float64 {
n.mu.RLock()
defer n.mu.RUnlock()
return n.stats.getMemCapacity()
}
func (n *NodeInfo) SetLastHeartbeat(time time.Time) {
n.lastHeartbeat.Store(time.UnixNano())
}
@ -218,3 +225,9 @@ func WithChannelCnt(cnt int) StatsOption {
n.setChannelCnt(cnt)
}
}
func WithMemCapacity(capacity float64) StatsOption {
return func(n *NodeInfo) {
n.setMemCapacity(capacity)
}
}

View File

@ -17,8 +17,9 @@
package session
type stats struct {
segmentCnt int
channelCnt int
segmentCnt int
channelCnt int
memCapacityInMB float64
}
func (s *stats) setSegmentCnt(cnt int) {
@ -37,6 +38,14 @@ func (s *stats) getChannelCnt() int {
return s.channelCnt
}
func (s *stats) setMemCapacity(capacity float64) {
s.memCapacityInMB = capacity
}
func (s *stats) getMemCapacity() float64 {
return s.memCapacityInMB
}
func newStats() stats {
return stats{}
}

View File

@ -48,6 +48,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -1239,12 +1240,13 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
})
return &querypb.GetDataDistributionResponse{
Status: merr.Success(),
NodeID: node.GetNodeID(),
Segments: segmentVersionInfos,
Channels: channelVersionInfos,
LeaderViews: leaderViews,
LastModifyTs: lastModifyTs,
Status: merr.Success(),
NodeID: node.GetNodeID(),
Segments: segmentVersionInfos,
Channels: channelVersionInfos,
LeaderViews: leaderViews,
LastModifyTs: lastModifyTs,
MemCapacityInMB: float64(hardware.GetMemoryCount() / 1024 / 1024),
}, nil
}
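For reference on the new field: hardware.GetMemoryCount() reports bytes (hence the two divisions by 1024), so a query node with, say, 16 GiB of memory would return MemCapacityInMB = 16384, which distHandler then stores on the NodeInfo via session.WithMemCapacity for the balancer to consume.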