enhance: [2.5] Add trigger interval config for auto balance (#39154) (#39918)

issue: #39156
pr: #39154

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/40962/head
wei liu 2025-03-27 16:40:23 +08:00 committed by GitHub
parent deed5b5df4
commit b64bb63e77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 132 additions and 28 deletions

View File

@ -359,6 +359,7 @@ queryCoord:
checkSegmentInterval: 1000
checkChannelInterval: 1000
checkBalanceInterval: 3000
autoBalanceInterval: 3000 # the interval for triggerauto balance
checkIndexInterval: 10000
channelTaskTimeout: 60000 # 1 minute
segmentTaskTimeout: 120000 # 2 minute

View File

@ -46,6 +46,9 @@ type BalanceChecker struct {
scheduler task.Scheduler
targetMgr meta.TargetManagerInterface
getBalancerFunc GetBalancerFunc
// record auto balance ts
autoBalanceTs time.Time
}
func NewBalanceChecker(meta *meta.Meta,
@ -80,22 +83,12 @@ func (b *BalanceChecker) readyToCheck(ctx context.Context, collectionID int64) b
return metaExist && targetExist
}
func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 {
func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int64 {
ids := b.meta.GetAll(ctx)
// all replicas belonging to loading collection will be skipped
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
collection := b.meta.GetCollection(ctx, cid)
return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded
})
sort.Slice(loadedCollections, func(i, j int) bool {
return loadedCollections[i] < loadedCollections[j]
})
if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
// balance collections influenced by stopping nodes
stoppingReplicas := make([]int64, 0)
for _, cid := range loadedCollections {
for _, cid := range ids {
// if target and meta isn't ready, skip balance this collection
if !b.readyToCheck(ctx, cid) {
continue
@ -113,12 +106,27 @@ func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 {
}
}
return nil
}
func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 {
// 1. no stopping balance and auto balance is disabled, return empty collections for balance
// 2. when balancer isn't active, skip auto balance
if !Params.QueryCoordCfg.AutoBalance.GetAsBool() || !b.IsActive() {
return nil
}
ids := b.meta.GetAll(ctx)
// all replicas belonging to loading collection will be skipped
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
collection := b.meta.GetCollection(ctx, cid)
return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded
})
sort.Slice(loadedCollections, func(i, j int) bool {
return loadedCollections[i] < loadedCollections[j]
})
// Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections.
// If any collection has unready info, skip the balance operation to avoid inconsistencies.
notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool {
@ -173,16 +181,27 @@ func (b *BalanceChecker) balanceReplicas(ctx context.Context, replicaIDs []int64
}
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
ret := make([]task.Task, 0)
replicasToBalance := b.replicasToBalance(ctx)
segmentPlans, channelPlans := b.balanceReplicas(ctx, replicasToBalance)
// iterate all collection to find a collection to balance
for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 {
replicasToBalance := b.replicasToBalance(ctx)
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
var segmentPlans []balance.SegmentAssignPlan
var channelPlans []balance.ChannelAssignPlan
stoppingReplicas := b.getReplicaForStoppingBalance(ctx)
if len(stoppingReplicas) > 0 {
// check for stopping balance first
segmentPlans, channelPlans = b.balanceReplicas(ctx, stoppingReplicas)
} else {
// then check for auto balance
if time.Since(b.autoBalanceTs) > paramtable.Get().QueryCoordCfg.AutoBalanceInterval.GetAsDuration(time.Millisecond) {
b.autoBalanceTs = time.Now()
replicasToBalance := b.getReplicaForNormalBalance(ctx)
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
// iterate all collection to find a collection to balance
for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 {
replicasToBalance := b.getReplicaForNormalBalance(ctx)
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
}
}
}
ret := make([]task.Task, 0)
tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans)
task.SetPriority(task.TaskPriorityLow, tasks...)
task.SetReason("segment unbalanced", tasks...)

View File

@ -19,9 +19,11 @@ package checkers
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
@ -144,7 +146,7 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
return 0
})
replicasToBalance := suite.checker.replicasToBalance(ctx)
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
suite.Empty(replicasToBalance)
segPlans, _ := suite.checker.balanceReplicas(ctx, replicasToBalance)
suite.Empty(segPlans)
@ -152,14 +154,14 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
// test enable auto balance
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
idsToBalance := []int64{int64(replicaID1)}
replicasToBalance = suite.checker.replicasToBalance(ctx)
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
// next round
idsToBalance = []int64{int64(replicaID2)}
replicasToBalance = suite.checker.replicasToBalance(ctx)
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
// final round
replicasToBalance = suite.checker.replicasToBalance(ctx)
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
suite.Empty(replicasToBalance)
}
@ -221,7 +223,7 @@ func (suite *BalanceCheckerTestSuite) TestBusyScheduler() {
suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
return 1
})
replicasToBalance := suite.checker.replicasToBalance(ctx)
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
suite.Len(replicasToBalance, 1)
}
@ -289,7 +291,7 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
// test stopping balance
idsToBalance := []int64{int64(replicaID1), int64(replicaID2)}
replicasToBalance := suite.checker.replicasToBalance(ctx)
replicasToBalance := suite.checker.getReplicaForStoppingBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
// checker check
@ -347,7 +349,7 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
// test normal balance when one collection has unready target
mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true)
mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false)
replicasToBalance := suite.checker.replicasToBalance(ctx)
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
suite.Len(replicasToBalance, 0)
// test stopping balance with target not ready
@ -364,10 +366,80 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
idsToBalance := []int64{int64(replicaID1)}
replicasToBalance = suite.checker.replicasToBalance(ctx)
replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
}
func (suite *BalanceCheckerTestSuite) TestAutoBalanceInterval() {
ctx := context.Background()
// set up nodes info
nodeID1, nodeID2 := 1, 2
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID1),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID2),
Address: "localhost",
Hostname: "localhost",
}))
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID1))
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID2))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
{
ID: 2,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
// set collections meta
cid1, replicaID1, partitionID1 := 1, 1, 1
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
collection1.Status = querypb.LoadStatus_Loaded
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1))
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1))
funcCallCounter := atomic.NewInt64(0)
suite.balancer.EXPECT().BalanceReplica(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, r *meta.Replica) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) {
funcCallCounter.Inc()
return nil, nil
})
// first auto balance should be triggered
suite.checker.Check(ctx)
suite.Equal(funcCallCounter.Load(), int64(1))
// second auto balance won't be triggered due to autoBalanceInterval == 3s
suite.checker.Check(ctx)
suite.Equal(funcCallCounter.Load(), int64(1))
// set autoBalanceInterval == 1, sleep 1s, auto balance should be triggered
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key, "1000")
paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key)
time.Sleep(1 * time.Second)
suite.checker.Check(ctx)
suite.Equal(funcCallCounter.Load(), int64(1))
}
func TestBalanceCheckerSuite(t *testing.T) {
suite.Run(t, new(BalanceCheckerTestSuite))
}

View File

@ -1894,6 +1894,7 @@ type queryCoordConfig struct {
SegmentCheckInterval ParamItem `refreshable:"true"`
ChannelCheckInterval ParamItem `refreshable:"true"`
BalanceCheckInterval ParamItem `refreshable:"true"`
AutoBalanceInterval ParamItem `refreshable:"true"`
IndexCheckInterval ParamItem `refreshable:"true"`
ChannelTaskTimeout ParamItem `refreshable:"true"`
SegmentTaskTimeout ParamItem `refreshable:"true"`
@ -2509,6 +2510,16 @@ If this parameter is set false, Milvus simply searches the growing segments with
Export: false,
}
p.ClusterLevelLoadResourceGroups.Init(base.mgr)
p.AutoBalanceInterval = ParamItem{
Key: "queryCoord.autoBalanceInterval",
Version: "2.5.3",
DefaultValue: "3000",
Doc: "the interval for triggerauto balance",
PanicIfEmpty: true,
Export: true,
}
p.AutoBalanceInterval.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

View File

@ -379,6 +379,7 @@ func TestComponentParam(t *testing.T) {
assert.Len(t, Params.ClusterLevelLoadResourceGroups.GetAsStrings(), 0)
assert.Equal(t, 10, Params.CollectionChannelCountFactor.GetAsInt())
assert.Equal(t, 3000, Params.AutoBalanceInterval.GetAsInt())
})
t.Run("test queryNodeConfig", func(t *testing.T) {