fix: [2.5] Address manual balance and balance check issues (#41038)

issue: #37651
pr: #41037
- Fix context propagation for manual balance segment task creation from
PR #38080.
- Optimize stopping balance by preventing redundant checks per round,
addressing performance regression from PR #40297.
- Decrease default `checkBalanceInterval` from 3000ms to 300ms.
- Correct minor log messages in `BalanceChecker`.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/41080/head
wei liu 2025-04-03 01:26:23 +08:00 committed by GitHub
parent 249d5b9b41
commit 37a533fe6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 61 additions and 19 deletions

View File

@ -358,7 +358,7 @@ queryCoord:
balanceCostThreshold: 0.001 # the threshold of balance cost, if the difference of cluster's cost after executing the balance plan is less than this value, the plan will not be executed
checkSegmentInterval: 1000
checkChannelInterval: 1000
checkBalanceInterval: 3000
checkBalanceInterval: 300
autoBalanceInterval: 3000 # the interval for triggerauto balance
checkIndexInterval: 10000
channelTaskTimeout: 60000 # 1 minute

View File

@ -41,12 +41,14 @@ import (
// BalanceChecker checks the cluster distribution and generates balance tasks.
type BalanceChecker struct {
*checkerActivation
meta *meta.Meta
nodeManager *session.NodeManager
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
scheduler task.Scheduler
targetMgr meta.TargetManagerInterface
getBalancerFunc GetBalancerFunc
meta *meta.Meta
nodeManager *session.NodeManager
scheduler task.Scheduler
targetMgr meta.TargetManagerInterface
getBalancerFunc GetBalancerFunc
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
stoppingBalanceCollectionsCurrentRound typeutil.UniqueSet
// record auto balance ts
autoBalanceTs time.Time
@ -59,13 +61,14 @@ func NewBalanceChecker(meta *meta.Meta,
getBalancerFunc GetBalancerFunc,
) *BalanceChecker {
return &BalanceChecker{
checkerActivation: newCheckerActivation(),
meta: meta,
targetMgr: targetMgr,
nodeManager: nodeMgr,
normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
scheduler: scheduler,
getBalancerFunc: getBalancerFunc,
checkerActivation: newCheckerActivation(),
meta: meta,
targetMgr: targetMgr,
nodeManager: nodeMgr,
normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
stoppingBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
scheduler: scheduler,
getBalancerFunc: getBalancerFunc,
}
}
@ -91,11 +94,24 @@ func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int
ids = b.sortCollections(ctx, ids)
if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
hasUnbalancedCollection := false
defer func() {
if !hasUnbalancedCollection {
b.stoppingBalanceCollectionsCurrentRound.Clear()
log.RatedDebug(10, "BalanceChecker has triggered stopping balance for all "+
"collections in one round, clear collectionIDs for this round")
}
}()
for _, cid := range ids {
// if target and meta isn't ready, skip balance this collection
if !b.readyToCheck(ctx, cid) {
continue
}
if b.stoppingBalanceCollectionsCurrentRound.Contain(cid) {
log.RatedDebug(10, "BalanceChecker is balancing this collection, skip balancing in this round",
zap.Int64("collectionID", cid))
continue
}
replicas := b.meta.ReplicaManager.GetByCollection(ctx, cid)
stoppingReplicas := make([]int64, 0)
for _, replica := range replicas {
@ -104,6 +120,8 @@ func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int
}
}
if len(stoppingReplicas) > 0 {
hasUnbalancedCollection = true
b.stoppingBalanceCollectionsCurrentRound.Insert(cid)
return stoppingReplicas
}
}
@ -146,7 +164,7 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64
hasUnbalancedCollection := false
for _, cid := range loadedCollections {
if b.normalBalanceCollectionsCurrentRound.Contain(cid) {
log.RatedDebug(10, "ScoreBasedBalancer is balancing this collection, skip balancing in this round",
log.RatedDebug(10, "BalanceChecker is balancing this collection, skip balancing in this round",
zap.Int64("collectionID", cid))
continue
}
@ -160,7 +178,7 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64
if !hasUnbalancedCollection {
b.normalBalanceCollectionsCurrentRound.Clear()
log.RatedDebug(10, "ScoreBasedBalancer has balanced all "+
log.RatedDebug(10, "BalanceChecker has triggered normal balance for all "+
"collections in one round, clear collectionIDs for this round")
}
return normalReplicasToBalance
@ -191,7 +209,7 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
// check for stopping balance first
segmentPlans, channelPlans = b.balanceReplicas(ctx, stoppingReplicas)
// iterate all collection to find a collection to balance
for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 {
for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.stoppingBalanceCollectionsCurrentRound.Len() > 0 {
replicasToBalance := b.getReplicaForStoppingBalance(ctx)
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
}

View File

@ -290,9 +290,31 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
// test stopping balance
// First round: check replica1
idsToBalance := []int64{int64(replicaID1)}
replicasToBalance := suite.checker.getReplicaForStoppingBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1)))
suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2)))
// Second round: should skip replica1, check replica2
idsToBalance = []int64{int64(replicaID2)}
replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1)))
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2)))
// Third round: all collections checked, should return nil and clear the set
replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
suite.Empty(replicasToBalance)
suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1)))
suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2)))
// reset meta for Check test
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
mr1 = replica1.CopyForWrite()
mr1.AddRONode(1)
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
// checker check
segPlans, chanPlans := make([]balance.SegmentAssignPlan, 0), make([]balance.ChannelAssignPlan, 0)
@ -730,6 +752,7 @@ func (suite *BalanceCheckerTestSuite) TestBalanceTriggerOrder() {
replicas = suite.checker.getReplicaForNormalBalance(ctx)
suite.Contains(replicas, replicaID1, "Should balance collection with lowest ID first")
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
// Stopping balance should also pick the collection with lowest ID first
replicas = suite.checker.getReplicaForStoppingBalance(ctx)
suite.Contains(replicas, replicaID1, "Stopping balance should prioritize collection with lowest ID")

View File

@ -127,7 +127,7 @@ func (s *Server) balanceSegments(ctx context.Context,
actions = append(actions, releaseAction)
}
t, err := task.NewSegmentTask(ctx,
t, err := task.NewSegmentTask(s.ctx,
Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
utils.ManualBalance,
collectionID,

View File

@ -2213,7 +2213,7 @@ If this parameter is set false, Milvus simply searches the growing segments with
p.BalanceCheckInterval = ParamItem{
Key: "queryCoord.checkBalanceInterval",
Version: "2.3.0",
DefaultValue: "3000",
DefaultValue: "300",
PanicIfEmpty: true,
Export: true,
}

View File

@ -347,6 +347,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 1000, Params.SegmentCheckInterval.GetAsInt())
assert.Equal(t, 1000, Params.ChannelCheckInterval.GetAsInt())
assert.Equal(t, 300, Params.BalanceCheckInterval.GetAsInt())
params.Save(Params.BalanceCheckInterval.Key, "3000")
assert.Equal(t, 3000, Params.BalanceCheckInterval.GetAsInt())
assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt())