milvus/internal/querycoordv2/checkers/balance_checker_test.go

1014 lines
42 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package checkers
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// BalanceCheckerTestSuite bundles the BalanceChecker under test with the
// collaborators its tests share. All fields except the embedded suite are
// assigned in SetupTest.
type BalanceCheckerTestSuite struct {
suite.Suite
// kv is the etcd-backed meta KV opened in SetupTest and closed in TearDownTest.
kv kv.MetaKv
// checker is the BalanceChecker instance exercised by the tests.
checker *BalanceChecker
// balancer is the mock returned by the balancer factory passed to NewBalanceChecker.
balancer *balance.MockBalancer
// meta is built on a querycoord catalog that wraps kv.
meta *meta.Meta
// broker is the mock the target manager is constructed with.
broker *meta.MockBroker
nodeMgr *session.NodeManager
// scheduler is a mock whose Add call is allowed (but not required) by default.
scheduler *task.MockScheduler
// targetMgr is the target manager built from broker and meta in SetupTest.
targetMgr meta.TargetManagerInterface
}
// SetupSuite initializes the global paramtable before the suite's tests read
// or override configuration values.
func (suite *BalanceCheckerTestSuite) SetupSuite() {
paramtable.Init()
}
// SetupTest rebuilds every fixture for a test: an etcd-backed KV store, the
// meta layer on top of it, mocked broker/scheduler/balancer, a target manager
// and finally the BalanceChecker wired to all of them.
func (suite *BalanceCheckerTestSuite) SetupTest() {
	etcdCfg := GenerateEtcdConfig()
	client, err := etcd.GetEtcdClient(
		etcdCfg.UseEmbedEtcd.GetAsBool(),
		etcdCfg.EtcdUseSSL.GetAsBool(),
		etcdCfg.Endpoints.GetAsStrings(),
		etcdCfg.EtcdTLSCert.GetValue(),
		etcdCfg.EtcdTLSKey.GetValue(),
		etcdCfg.EtcdTLSCACert.GetValue(),
		etcdCfg.EtcdTLSMinVersion.GetValue(),
	)
	suite.Require().NoError(err)
	suite.kv = etcdkv.NewEtcdKV(client, etcdCfg.MetaRootPath.GetValue())

	// Meta layer: catalog over the KV store, an ID allocator and a node manager.
	catalog := querycoord.NewCatalog(suite.kv)
	allocator := RandomIncrementIDAllocator()
	suite.nodeMgr = session.NewNodeManager()
	suite.meta = meta.NewMeta(allocator, catalog, suite.nodeMgr)

	// Mocked collaborators; scheduler.Add is tolerated but never required.
	suite.broker = meta.NewMockBroker(suite.T())
	suite.scheduler = task.NewMockScheduler(suite.T())
	suite.scheduler.EXPECT().Add(mock.Anything).Return(nil).Maybe()
	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
	suite.balancer = balance.NewMockBalancer(suite.T())

	// The factory reads suite.balancer lazily, so it always yields the current mock.
	balancerFactory := func() balance.Balance { return suite.balancer }
	suite.checker = NewBalanceChecker(suite.meta, suite.targetMgr, suite.nodeMgr, suite.scheduler, balancerFactory)
}
// TearDownTest closes the KV store that SetupTest opened for the test.
func (suite *BalanceCheckerTestSuite) TearDownTest() {
suite.kv.Close()
}
// TestAutoBalanceConf verifies how getReplicaForNormalBalance reacts to the
// AutoBalance switch: nothing is selected while it is "false", and once it is
// "true" successive calls yield replica 1, then replica 2, then nothing.
func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
	ctx := context.Background()

	// Register two query nodes, then report both as up to the resource manager.
	const nodeA, nodeB int64 = 1, 2
	for _, id := range []int64{nodeA, nodeB} {
		suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   id,
			Address:  "localhost",
			Hostname: "localhost",
		}))
	}
	for _, id := range []int64{nodeA, nodeB} {
		suite.checker.meta.ResourceManager.HandleNodeUp(ctx, id)
	}

	// Recovery info the mocked broker hands back whenever it is asked.
	segmentInfos := []*datapb.SegmentInfo{
		{ID: 1, PartitionID: 1, InsertChannel: "test-insert-channel"},
	}
	vchannels := []*datapb.VchannelInfo{
		{CollectionID: 1, ChannelName: "test-insert-channel"},
	}
	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(vchannels, segmentInfos, nil)

	// loadCollection stores a loaded collection with one partition and one
	// replica spanning both nodes, then refreshes its next and current target.
	loadCollection := func(collectionID, replicaID, partitionID int64) {
		coll := utils.CreateTestCollection(collectionID, int32(replicaID))
		coll.Status = querypb.LoadStatus_Loaded
		replica := utils.CreateTestReplica(replicaID, collectionID, []int64{nodeA, nodeB})
		part := utils.CreateTestPartition(collectionID, partitionID)
		suite.checker.meta.CollectionManager.PutCollection(ctx, coll, part)
		suite.checker.meta.ReplicaManager.Put(ctx, replica)
		suite.targetMgr.UpdateCollectionNextTarget(ctx, collectionID)
		suite.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID)
	}
	const replicaA, replicaB int64 = 1, 2
	loadCollection(1, replicaA, 1)
	loadCollection(2, replicaB, 2)

	// Auto balance disabled: no replica is picked and no segment plan is produced.
	paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "false")
	suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
		return 0
	})
	picked := suite.checker.getReplicaForNormalBalance(ctx)
	suite.Empty(picked)
	segPlans, _ := suite.checker.balanceReplicas(ctx, picked)
	suite.Empty(segPlans)

	// Auto balance enabled: the first call selects replica A.
	paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
	picked = suite.checker.getReplicaForNormalBalance(ctx)
	suite.ElementsMatch([]int64{replicaA}, picked)

	// Next round selects replica B.
	picked = suite.checker.getReplicaForNormalBalance(ctx)
	suite.ElementsMatch([]int64{replicaB}, picked)

	// Final round: nothing is left to select.
	picked = suite.checker.getReplicaForNormalBalance(ctx)
	suite.Empty(picked)
}
// TestBusyScheduler sets up two loaded collections, makes the mocked scheduler
// report one pending segment task, and asserts that
// getReplicaForNormalBalance returns exactly one replica.
func (suite *BalanceCheckerTestSuite) TestBusyScheduler() {
	ctx := context.Background()

	// Two query nodes, registered first and then marked as up.
	nodeIDs := []int64{1, 2}
	for _, id := range nodeIDs {
		suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   id,
			Address:  "localhost",
			Hostname: "localhost",
		}))
	}
	for _, id := range nodeIDs {
		suite.checker.meta.ResourceManager.HandleNodeUp(ctx, id)
	}

	// Recovery info returned by the mocked broker.
	segmentInfos := []*datapb.SegmentInfo{
		{ID: 1, PartitionID: 1, InsertChannel: "test-insert-channel"},
	}
	vchannels := []*datapb.VchannelInfo{
		{CollectionID: 1, ChannelName: "test-insert-channel"},
	}
	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(vchannels, segmentInfos, nil)

	// Collections 1 and 2: each loaded, with one partition and one replica on
	// both nodes, and with next/current targets refreshed.
	fixtures := []struct{ collectionID, replicaID, partitionID int64 }{
		{1, 1, 1},
		{2, 2, 2},
	}
	for _, f := range fixtures {
		coll := utils.CreateTestCollection(f.collectionID, int32(f.replicaID))
		coll.Status = querypb.LoadStatus_Loaded
		replica := utils.CreateTestReplica(f.replicaID, f.collectionID, []int64{nodeIDs[0], nodeIDs[1]})
		part := utils.CreateTestPartition(f.collectionID, f.partitionID)
		suite.checker.meta.CollectionManager.PutCollection(ctx, coll, part)
		suite.checker.meta.ReplicaManager.Put(ctx, replica)
		suite.targetMgr.UpdateCollectionNextTarget(ctx, f.collectionID)
		suite.targetMgr.UpdateCollectionCurrentTarget(ctx, f.collectionID)
	}

	// Scheduler reports one segment task while auto balance is enabled.
	paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
	suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
		return 1
	})
	picked := suite.checker.getReplicaForNormalBalance(ctx)
	suite.Len(picked, 1)
}
// TestStoppingBalance covers the stopping-balance path. With node 1 marked as
// stopping and registered as an RO node of both replicas, successive calls to
// getReplicaForStoppingBalance must return replica 1, then replica 2, then
// nothing (with the per-round collection set emptied). Finally Check is run
// with a mocked balancer plan and is expected to submit two tasks.
func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
	ctx := context.Background()

	// set up nodes info, stopping node1
	nodeID1, nodeID2 := int64(1), int64(2)
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   nodeID1,
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   nodeID2,
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.nodeMgr.Stopping(nodeID1)
	suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
	suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2)

	// recovery info served by the mocked broker
	segments := []*datapb.SegmentInfo{
		{
			ID:            1,
			PartitionID:   1,
			InsertChannel: "test-insert-channel",
		},
	}
	channels := []*datapb.VchannelInfo{
		{
			CollectionID: 1,
			ChannelName:  "test-insert-channel",
		},
	}
	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)

	// set collections meta
	cid1, replicaID1, partitionID1 := int64(1), int64(1), int64(1)
	collection1 := utils.CreateTestCollection(cid1, int32(replicaID1))
	collection1.Status = querypb.LoadStatus_Loaded
	replica1 := utils.CreateTestReplica(replicaID1, cid1, []int64{nodeID1, nodeID2})
	partition1 := utils.CreateTestPartition(cid1, partitionID1)
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
	suite.checker.meta.ReplicaManager.Put(ctx, replica1)
	suite.targetMgr.UpdateCollectionNextTarget(ctx, cid1)
	suite.targetMgr.UpdateCollectionCurrentTarget(ctx, cid1)

	cid2, replicaID2, partitionID2 := int64(2), int64(2), int64(2)
	collection2 := utils.CreateTestCollection(cid2, int32(replicaID2))
	collection2.Status = querypb.LoadStatus_Loaded
	replica2 := utils.CreateTestReplica(replicaID2, cid2, []int64{nodeID1, nodeID2})
	partition2 := utils.CreateTestPartition(cid2, partitionID2)
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection2, partition2)
	suite.checker.meta.ReplicaManager.Put(ctx, replica2)
	suite.targetMgr.UpdateCollectionNextTarget(ctx, cid2)
	suite.targetMgr.UpdateCollectionCurrentTarget(ctx, cid2)

	// register the stopping node as an RO node of both replicas
	mr1 := replica1.CopyForWrite()
	mr1.AddRONode(nodeID1)
	suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
	mr2 := replica2.CopyForWrite()
	mr2.AddRONode(nodeID1)
	suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())

	// test stopping balance
	// First round: check replica1
	replicasToBalance := suite.checker.getReplicaForStoppingBalance(ctx)
	suite.ElementsMatch([]int64{replicaID1}, replicasToBalance)
	suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(cid1))
	suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(cid2))

	// Second round: should skip replica1, check replica2
	replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
	suite.ElementsMatch([]int64{replicaID2}, replicasToBalance)
	suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(cid1))
	suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(cid2))

	// Third round: all collections checked, should return nil and clear the set
	replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
	suite.Empty(replicasToBalance)
	suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(cid1))
	suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(cid2))

	// reset meta for Check test
	suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
	mr1 = replica1.CopyForWrite()
	mr1.AddRONode(nodeID1)
	suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())

	// checker check: the mocked balancer returns one segment plan and no channel plans
	mockPlan := balance.SegmentAssignPlan{
		Segment: utils.CreateTestSegment(1, 1, 1, 1, 1, "1"),
		Replica: meta.NilReplica,
		From:    1,
		To:      2,
	}
	segPlans := []balance.SegmentAssignPlan{mockPlan}
	chanPlans := make([]balance.ChannelAssignPlan, 0)
	suite.balancer.EXPECT().BalanceReplica(mock.Anything, mock.Anything).Return(segPlans, chanPlans)

	// Drop the default Add expectation from SetupTest and record every task
	// submitted instead. The parameter is named t so it does not shadow the
	// imported task package.
	var tasks []task.Task
	suite.scheduler.ExpectedCalls = nil
	suite.scheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
		tasks = append(tasks, t)
		return nil
	})
	suite.checker.Check(ctx)
	suite.Len(tasks, 2)
}
// TestTargetNotReady swaps in a mocked target manager and checks two things:
// normal balance selects nothing while IsCurrentTargetReady reports false, and
// stopping balance selects only the replica of the collection whose current
// target exists.
func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
	ctx := context.Background()

	// Two nodes; the first one is flagged as stopping before both are reported up.
	const stoppingNode, otherNode int64 = 1, 2
	for _, id := range []int64{stoppingNode, otherNode} {
		suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   id,
			Address:  "localhost",
			Hostname: "localhost",
		}))
	}
	suite.nodeMgr.Stopping(stoppingNode)
	for _, id := range []int64{stoppingNode, otherNode} {
		suite.checker.meta.ResourceManager.HandleNodeUp(ctx, id)
	}

	targetMock := meta.NewMockTargetManager(suite.T())
	suite.checker.targetMgr = targetMock

	// Two loaded collections, one partition and one replica each.
	const (
		collA, collB       int64 = 1, 2
		replicaA, replicaB int64 = 1, 2
	)
	collectionA := utils.CreateTestCollection(collA, int32(replicaA))
	collectionA.Status = querypb.LoadStatus_Loaded
	replicaObjA := utils.CreateTestReplica(replicaA, collA, []int64{stoppingNode, otherNode})
	partitionA := utils.CreateTestPartition(collA, 1)
	suite.checker.meta.CollectionManager.PutCollection(ctx, collectionA, partitionA)
	suite.checker.meta.ReplicaManager.Put(ctx, replicaObjA)

	collectionB := utils.CreateTestCollection(collB, int32(replicaB))
	collectionB.Status = querypb.LoadStatus_Loaded
	replicaObjB := utils.CreateTestReplica(replicaB, collB, []int64{stoppingNode, otherNode})
	partitionB := utils.CreateTestPartition(collB, 2)
	suite.checker.meta.CollectionManager.PutCollection(ctx, collectionB, partitionB)
	suite.checker.meta.ReplicaManager.Put(ctx, replicaObjB)

	// Normal balance: the current target is reported as not ready, so nothing is picked.
	targetMock.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true)
	targetMock.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false)
	targetMock.EXPECT().GetCollectionRowCount(mock.Anything, mock.Anything, mock.Anything).Return(100).Maybe()
	picked := suite.checker.getReplicaForNormalBalance(ctx)
	suite.Len(picked, 0)

	// Stopping balance: wipe the expectations above and make the current target
	// exist for collection A only.
	targetMock.ExpectedCalls = nil
	targetMock.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(false)
	targetMock.EXPECT().IsCurrentTargetExist(mock.Anything, collA, mock.Anything).Return(true).Maybe()
	targetMock.EXPECT().IsCurrentTargetExist(mock.Anything, collB, mock.Anything).Return(false).Maybe()
	targetMock.EXPECT().GetCollectionRowCount(mock.Anything, mock.Anything, mock.Anything).Return(100).Maybe()

	// Register the stopping node as an RO node of both replicas.
	mutableA := replicaObjA.CopyForWrite()
	mutableA.AddRONode(stoppingNode)
	suite.checker.meta.ReplicaManager.Put(ctx, mutableA.IntoReplica())
	mutableB := replicaObjB.CopyForWrite()
	mutableB.AddRONode(stoppingNode)
	suite.checker.meta.ReplicaManager.Put(ctx, mutableB.IntoReplica())

	picked = suite.checker.getReplicaForStoppingBalance(ctx)
	suite.ElementsMatch([]int64{replicaA}, picked)
}
// TestAutoBalanceInterval calls Check three times and counts how often the
// mocked balancer's BalanceReplica is invoked. The counter must be 1 after the
// first Check and must still be 1 after the second and third.
func (suite *BalanceCheckerTestSuite) TestAutoBalanceInterval() {
	ctx := context.Background()

	// set up nodes info
	nodeID1, nodeID2 := int64(1), int64(2)
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   nodeID1,
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   nodeID2,
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
	suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2)

	// recovery info served by the mocked broker
	segments := []*datapb.SegmentInfo{
		{
			ID:            1,
			PartitionID:   1,
			InsertChannel: "test-insert-channel",
		},
		{
			ID:            2,
			PartitionID:   1,
			InsertChannel: "test-insert-channel",
		},
	}
	channels := []*datapb.VchannelInfo{
		{
			CollectionID: 1,
			ChannelName:  "test-insert-channel",
		},
	}
	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)

	// set collections meta
	cid1, replicaID1, partitionID1 := int64(1), int64(1), int64(1)
	collection1 := utils.CreateTestCollection(cid1, int32(replicaID1))
	collection1.Status = querypb.LoadStatus_Loaded
	replica1 := utils.CreateTestReplica(replicaID1, cid1, []int64{nodeID1, nodeID2})
	partition1 := utils.CreateTestPartition(cid1, partitionID1)
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
	suite.checker.meta.ReplicaManager.Put(ctx, replica1)
	suite.targetMgr.UpdateCollectionNextTarget(ctx, cid1)
	suite.targetMgr.UpdateCollectionCurrentTarget(ctx, cid1)

	// Count balancer invocations; the arguments are irrelevant here, so they
	// are left unnamed (this also avoids shadowing the outer ctx).
	funcCallCounter := atomic.NewInt64(0)
	suite.balancer.EXPECT().BalanceReplica(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, _ *meta.Replica) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) {
		funcCallCounter.Inc()
		return nil, nil
	})

	// first auto balance should be triggered
	suite.checker.Check(ctx)
	suite.Equal(int64(1), funcCallCounter.Load())

	// second auto balance won't be triggered (attributed to autoBalanceInterval == 3s)
	suite.checker.Check(ctx)
	suite.Equal(int64(1), funcCallCounter.Load())

	// The interval override is saved and then reset on the very next line, so
	// the "1000" value is presumably no longer in effect when Check runs. After
	// sleeping 1s the balancer is still expected NOT to have been called again.
	// NOTE(review): if the intent was to prove that a 1s interval re-triggers
	// balancing, the Reset should be deferred and the expected count revisited.
	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key, "1000")
	paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key)
	time.Sleep(1 * time.Second)
	suite.checker.Check(ctx)
	suite.Equal(int64(1), funcCallCounter.Load())
}
// TestBalanceOrder checks that both normal and stopping balance pick the
// replica of the collection with the larger mocked row count first, and that
// the pick flips once the mocked row counts are swapped.
func (suite *BalanceCheckerTestSuite) TestBalanceOrder() {
	ctx := context.Background()
	nodeID1, nodeID2 := int64(1), int64(2)

	// set collections meta
	cid1, replicaID1, partitionID1 := int64(1), int64(1), int64(1)
	collection1 := utils.CreateTestCollection(cid1, int32(replicaID1))
	collection1.Status = querypb.LoadStatus_Loaded
	replica1 := utils.CreateTestReplica(replicaID1, cid1, []int64{nodeID1, nodeID2})
	partition1 := utils.CreateTestPartition(cid1, partitionID1)
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
	suite.checker.meta.ReplicaManager.Put(ctx, replica1)

	cid2, replicaID2, partitionID2 := int64(2), int64(2), int64(2)
	collection2 := utils.CreateTestCollection(cid2, int32(replicaID2))
	collection2.Status = querypb.LoadStatus_Loaded
	replica2 := utils.CreateTestReplica(replicaID2, cid2, []int64{nodeID1, nodeID2})
	partition2 := utils.CreateTestPartition(cid2, partitionID2)
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection2, partition2)
	suite.checker.meta.ReplicaManager.Put(ctx, replica2)

	// mock collection row count: collection 2 is the larger one
	mockTargetManager := meta.NewMockTargetManager(suite.T())
	suite.checker.targetMgr = mockTargetManager
	mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid1, mock.Anything).Return(int64(100)).Maybe()
	mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid2, mock.Anything).Return(int64(200)).Maybe()
	mockTargetManager.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(true).Maybe()
	mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()

	// mock stopping node: give each replica an RO node
	mr1 := replica1.CopyForWrite()
	mr1.AddRONode(nodeID1)
	suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
	mr2 := replica2.CopyForWrite()
	mr2.AddRONode(nodeID2)
	suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())

	// Assertions pass the expected value first, as testify's Equal requires,
	// so failure output labels the two sides correctly.

	// test normal balance order
	replicas := suite.checker.getReplicaForNormalBalance(ctx)
	suite.Equal([]int64{replicaID2}, replicas)

	// test stopping balance order
	replicas = suite.checker.getReplicaForStoppingBalance(ctx)
	suite.Equal([]int64{replicaID2}, replicas)

	// mock collection row count: now collection 1 is the larger one
	mockTargetManager.ExpectedCalls = nil
	mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid1, mock.Anything).Return(int64(200)).Maybe()
	mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid2, mock.Anything).Return(int64(100)).Maybe()
	mockTargetManager.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(true).Maybe()
	mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()

	// test normal balance order
	replicas = suite.checker.getReplicaForNormalBalance(ctx)
	suite.Equal([]int64{replicaID1}, replicas)

	// test stopping balance order
	replicas = suite.checker.getReplicaForStoppingBalance(ctx)
	suite.Equal([]int64{replicaID1}, replicas)
}
// TestSortCollections exercises sortCollections under each BalanceTriggerOrder
// setting: by row count, by collection ID, empty and invalid values (both
// expected to behave like ByRowCount), a mixed-case name, and an empty input.
func (suite *BalanceCheckerTestSuite) TestSortCollections() {
	ctx := context.Background()

	// The trigger order lives in the process-wide paramtable; drop the
	// override when the test ends so it does not leak into other tests.
	orderKey := Params.QueryCoordCfg.BalanceTriggerOrder.Key
	defer paramtable.Get().Reset(orderKey)

	// Set up test collections
	cid1, cid2, cid3 := int64(1), int64(2), int64(3)

	// Mock the target manager for row count returns
	mockTargetManager := meta.NewMockTargetManager(suite.T())
	suite.checker.targetMgr = mockTargetManager

	// Collection 1: Low ID, High row count
	mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid1, mock.Anything).Return(int64(300)).Maybe()
	// Collection 2: Middle ID, Low row count
	mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid2, mock.Anything).Return(int64(100)).Maybe()
	// Collection 3: High ID, Middle row count
	mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid3, mock.Anything).Return(int64(200)).Maybe()

	collections := []int64{cid1, cid2, cid3}
	byRowCount := []int64{cid1, cid3, cid2}
	byCollectionID := []int64{cid1, cid2, cid3}

	// Cases run in this order against the same input slice.
	cases := []struct {
		order    string
		expected []int64
		message  string
	}{
		{"ByRowCount", byRowCount, "Collections should be sorted by row count (highest first)"},
		{"ByCollectionID", byCollectionID, "Collections should be sorted by collection ID (ascending)"},
		{"", byRowCount, "Should default to ByRowCount when sort order is empty"},
		{"InvalidOrder", byRowCount, "Should default to ByRowCount when sort order is invalid"},
		{"bYcOlLeCtIoNiD", byCollectionID, "Should handle case-insensitive sort order names"},
	}
	for _, tc := range cases {
		paramtable.Get().Save(orderKey, tc.order)
		sortedCollections := suite.checker.sortCollections(ctx, collections)
		suite.Equal(tc.expected, sortedCollections, tc.message)
	}

	// Test with empty collection list
	emptyCollections := []int64{}
	sortedCollections := suite.checker.sortCollections(ctx, emptyCollections)
	suite.Equal([]int64{}, sortedCollections, "Should handle empty collection list")
}
// TestSortCollectionsIntegration checks that both getReplicaForNormalBalance
// and getReplicaForStoppingBalance consult GetCollectionRowCount at least
// twice when two loaded collections exist and the trigger order is ByRowCount.
func (suite *BalanceCheckerTestSuite) TestSortCollectionsIntegration() {
	ctx := context.Background()

	// Set up test collections and nodes
	nodeID1 := int64(1)
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   nodeID1,
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)

	// Create two collections to ensure sorting is triggered
	cid1, replicaID1 := int64(1), int64(101)
	collection1 := utils.CreateTestCollection(cid1, int32(replicaID1))
	collection1.Status = querypb.LoadStatus_Loaded
	replica1 := utils.CreateTestReplica(replicaID1, cid1, []int64{nodeID1})
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection1)
	suite.checker.meta.ReplicaManager.Put(ctx, replica1)

	// Add a second collection with different characteristics
	cid2, replicaID2 := int64(2), int64(102)
	collection2 := utils.CreateTestCollection(cid2, int32(replicaID2))
	collection2.Status = querypb.LoadStatus_Loaded
	replica2 := utils.CreateTestReplica(replicaID2, cid2, []int64{nodeID1})
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection2)
	suite.checker.meta.ReplicaManager.Put(ctx, replica2)

	// Mock target manager
	mockTargetManager := meta.NewMockTargetManager(suite.T())
	suite.checker.targetMgr = mockTargetManager

	// Setup different row counts to test sorting
	// Collection 1 has more rows than Collection 2
	var getRowCountCallCount int
	mockTargetManager.On("GetCollectionRowCount", mock.Anything, mock.Anything, mock.Anything).
		Return(func(ctx context.Context, collectionID int64, scope int32) int64 {
			getRowCountCallCount++
			if collectionID == cid1 {
				return 200 // More rows in collection 1
			}
			return 100 // Fewer rows in collection 2
		})
	mockTargetManager.On("IsCurrentTargetReady", mock.Anything, mock.Anything).Return(true)
	mockTargetManager.On("IsNextTargetExist", mock.Anything, mock.Anything).Return(true)

	// Configure for testing
	paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
	paramtable.Get().Save(Params.QueryCoordCfg.BalanceTriggerOrder.Key, "ByRowCount")

	// Clear first to avoid previous test state
	suite.checker.normalBalanceCollectionsCurrentRound.Clear()

	// Call normal balance
	_ = suite.checker.getReplicaForNormalBalance(ctx)

	// Verify GetCollectionRowCount was called at least twice (once for each collection).
	// GreaterOrEqual is used so a failure reports the actual call count.
	suite.GreaterOrEqual(getRowCountCallCount, 2, "GetCollectionRowCount should be called at least twice during normal balance")

	// Reset counter and test stopping balance
	getRowCountCallCount = 0

	// Set up for stopping balance test: mark the node as RO in both replicas
	mr1 := replica1.CopyForWrite()
	mr1.AddRONode(nodeID1)
	suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
	mr2 := replica2.CopyForWrite()
	mr2.AddRONode(nodeID1)
	suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
	paramtable.Get().Save(Params.QueryCoordCfg.EnableStoppingBalance.Key, "true")

	// Call stopping balance
	_ = suite.checker.getReplicaForStoppingBalance(ctx)

	// Verify GetCollectionRowCount was called at least twice during stopping balance
	suite.GreaterOrEqual(getRowCountCallCount, 2, "GetCollectionRowCount should be called at least twice during stopping balance")
}
// TestBalanceTriggerOrder sets up three loaded collections with mocked row
// counts (collection 1 highest, 3 middle, 2 lowest) and asserts that replica
// 101 is among the picks of both normal and stopping balance, first under
// ByRowCount and then under ByCollectionID.
func (suite *BalanceCheckerTestSuite) TestBalanceTriggerOrder() {
	ctx := context.Background()

	// The trigger order is process-wide configuration and this test ends on a
	// non-default value; drop the override afterwards so later tests are not
	// affected by it.
	orderKey := Params.QueryCoordCfg.BalanceTriggerOrder.Key
	defer paramtable.Get().Reset(orderKey)

	// Set up nodes
	nodeID1, nodeID2 := int64(1), int64(2)
	for _, id := range []int64{nodeID1, nodeID2} {
		suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   id,
			Address:  "localhost",
			Hostname: "localhost",
		}))
	}
	suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
	suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2)

	// Create collections with different row counts
	cid1, replicaID1 := int64(1), int64(101)
	collection1 := utils.CreateTestCollection(cid1, int32(replicaID1))
	collection1.Status = querypb.LoadStatus_Loaded
	replica1 := utils.CreateTestReplica(replicaID1, cid1, []int64{nodeID1, nodeID2})
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection1)
	suite.checker.meta.ReplicaManager.Put(ctx, replica1)

	cid2, replicaID2 := int64(2), int64(102)
	collection2 := utils.CreateTestCollection(cid2, int32(replicaID2))
	collection2.Status = querypb.LoadStatus_Loaded
	replica2 := utils.CreateTestReplica(replicaID2, cid2, []int64{nodeID1, nodeID2})
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection2)
	suite.checker.meta.ReplicaManager.Put(ctx, replica2)

	cid3, replicaID3 := int64(3), int64(103)
	collection3 := utils.CreateTestCollection(cid3, int32(replicaID3))
	collection3.Status = querypb.LoadStatus_Loaded
	replica3 := utils.CreateTestReplica(replicaID3, cid3, []int64{nodeID1, nodeID2})
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection3)
	suite.checker.meta.ReplicaManager.Put(ctx, replica3)

	// Mock the target manager
	mockTargetManager := meta.NewMockTargetManager(suite.T())
	suite.checker.targetMgr = mockTargetManager

	// Set row counts: Collection 1 (highest), Collection 3 (middle), Collection 2 (lowest)
	mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid1, mock.Anything).Return(int64(300)).Maybe()
	mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid2, mock.Anything).Return(int64(100)).Maybe()
	mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid3, mock.Anything).Return(int64(200)).Maybe()

	// Mark the current target as ready
	mockTargetManager.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(true).Maybe()
	mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
	mockTargetManager.EXPECT().IsCurrentTargetExist(mock.Anything, mock.Anything, mock.Anything).Return(true).Maybe()

	// Enable auto balance
	paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")

	// Test with ByRowCount order (default)
	paramtable.Get().Save(orderKey, "ByRowCount")
	suite.checker.normalBalanceCollectionsCurrentRound.Clear()

	// Normal balance should pick the collection with highest row count first
	replicas := suite.checker.getReplicaForNormalBalance(ctx)
	suite.Contains(replicas, replicaID1, "Should balance collection with highest row count first")

	// Add stopping nodes to test stopping balance
	mr1 := replica1.CopyForWrite()
	mr1.AddRONode(nodeID1)
	suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
	mr2 := replica2.CopyForWrite()
	mr2.AddRONode(nodeID1)
	suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
	mr3 := replica3.CopyForWrite()
	mr3.AddRONode(nodeID1)
	suite.checker.meta.ReplicaManager.Put(ctx, mr3.IntoReplica())

	// Enable stopping balance
	paramtable.Get().Save(Params.QueryCoordCfg.EnableStoppingBalance.Key, "true")

	// Stopping balance should also pick the collection with highest row count first
	replicas = suite.checker.getReplicaForStoppingBalance(ctx)
	suite.Contains(replicas, replicaID1, "Stopping balance should prioritize collection with highest row count")

	// Test with ByCollectionID order
	paramtable.Get().Save(orderKey, "ByCollectionID")
	suite.checker.normalBalanceCollectionsCurrentRound.Clear()

	// Normal balance should pick the collection with lowest ID first
	replicas = suite.checker.getReplicaForNormalBalance(ctx)
	suite.Contains(replicas, replicaID1, "Should balance collection with lowest ID first")

	suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()

	// Stopping balance should also pick the collection with lowest ID first
	replicas = suite.checker.getReplicaForStoppingBalance(ctx)
	suite.Contains(replicas, replicaID1, "Stopping balance should prioritize collection with lowest ID")
}
// TestHasUnbalancedCollectionFlag checks when the checker's per-round
// collection sets are populated by getReplicaForNormalBalance and
// getReplicaForStoppingBalance. Four scenarios run in order: auto balance
// disabled, current target reported as not ready, stopping balance without
// RO nodes, and stopping balance after an RO node is added to the replica.
func (suite *BalanceCheckerTestSuite) TestHasUnbalancedCollectionFlag() {
	ctx := context.Background()

	// Register both query nodes first, then report each of them as up.
	firstNode, secondNode := int64(1), int64(2)
	for _, id := range []int64{firstNode, secondNode} {
		suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   id,
			Address:  "localhost",
			Hostname: "localhost",
		}))
	}
	for _, id := range []int64{firstNode, secondNode} {
		suite.checker.meta.ResourceManager.HandleNodeUp(ctx, id)
	}

	// A single loaded collection whose replica is created with both nodes.
	// NOTE(review): the second argument is the replica ID cast to int32 —
	// confirm that is what CreateTestCollection expects.
	collectionID, replicaID := int64(1), int64(101)
	loadedCollection := utils.CreateTestCollection(collectionID, int32(replicaID))
	loadedCollection.Status = querypb.LoadStatus_Loaded
	baseReplica := utils.CreateTestReplica(replicaID, collectionID, []int64{firstNode, secondNode})
	suite.checker.meta.CollectionManager.PutCollection(ctx, loadedCollection)
	suite.checker.meta.ReplicaManager.Put(ctx, baseReplica)

	// Swap in a mocked target manager; every collection reports 100 rows.
	targetMock := meta.NewMockTargetManager(suite.T())
	suite.checker.targetMgr = targetMock
	targetMock.EXPECT().GetCollectionRowCount(mock.Anything, mock.Anything, mock.Anything).Return(int64(100)).Maybe()

	// Each scenario starts from an empty per-round set; these helpers clear
	// the set and assert that it is indeed empty.
	resetNormalRound := func() {
		suite.checker.normalBalanceCollectionsCurrentRound.Clear()
		suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len())
	}
	resetStoppingRound := func() {
		suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
		suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len())
	}

	// Scenario 1: normal balance while auto balance is switched off.
	paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "false")
	resetNormalRound()
	{
		// Nothing should be selected and the set must stay empty.
		got := suite.checker.getReplicaForNormalBalance(ctx)
		suite.Empty(got)
		suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len(),
			"normalBalanceCollectionsCurrentRound should remain empty when auto balance is disabled")
	}

	// Scenario 2: auto balance on, but IsCurrentTargetReady answers false.
	paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
	targetMock.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
	targetMock.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false).Maybe()
	resetNormalRound()
	{
		// Targets are not ready, so again nothing is selected or recorded.
		got := suite.checker.getReplicaForNormalBalance(ctx)
		suite.Empty(got)
		suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len(),
			"normalBalanceCollectionsCurrentRound should remain empty when targets are not ready")
	}

	// Scenario 3: stopping balance enabled while the replica has no RO node.
	paramtable.Get().Save(Params.QueryCoordCfg.EnableStoppingBalance.Key, "true")
	targetMock.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
	targetMock.EXPECT().IsCurrentTargetExist(mock.Anything, mock.Anything, mock.Anything).Return(true).Maybe()
	resetStoppingRound()
	{
		// Without RO nodes there is nothing to move off.
		got := suite.checker.getReplicaForStoppingBalance(ctx)
		suite.Empty(got)
		suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len(),
			"stoppingBalanceCollectionsCurrentRound should remain empty when there are no RO nodes")
	}

	// Scenario 4: stopping balance once the first node is added as RO on the replica.
	writableReplica := baseReplica.CopyForWrite()
	writableReplica.AddRONode(firstNode)
	suite.checker.meta.ReplicaManager.Put(ctx, writableReplica.IntoReplica())
	resetStoppingRound()
	{
		// The replica must be returned and its collection recorded for this round.
		got := suite.checker.getReplicaForStoppingBalance(ctx)
		suite.Equal([]int64{replicaID}, got)
		suite.Equal(1, suite.checker.stoppingBalanceCollectionsCurrentRound.Len())
		suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(collectionID),
			"stoppingBalanceCollectionsCurrentRound should contain the collection when it has RO nodes")
	}
}
// TestCheckBatchSizesAndMultiCollection drives BalanceChecker.Check against
// three loaded collections, each with one replica holding an RW node and an
// RO node, and counts the tasks passed to the mocked scheduler. It expects:
//   - 3 tasks when balancing multiple collections is disabled,
//   - 9 tasks when it is enabled with large batch sizes,
//   - 3 tasks (at most 2 segment, at most 1 channel) once the batch sizes
//     are lowered to 2 and 1.
func (suite *BalanceCheckerTestSuite) TestCheckBatchSizesAndMultiCollection() {
	ctx := context.Background()

	// Register both query nodes first, then report each of them as up.
	rwNode, roNode := int64(1), int64(2)
	for _, id := range []int64{rwNode, roNode} {
		suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   id,
			Address:  "localhost",
			Hostname: "localhost",
		}))
	}
	for _, id := range []int64{rwNode, roNode} {
		suite.checker.meta.ResourceManager.HandleNodeUp(ctx, id)
	}

	// Collections 1..3 with replicas 101..103; every replica gets node 1 as
	// RW and node 2 as RO.
	for _, cid := range []int64{1, 2, 3} {
		rid := 100 + cid
		coll := utils.CreateTestCollection(cid, int32(rid))
		coll.Status = querypb.LoadStatus_Loaded
		writable := utils.CreateTestReplica(rid, cid, []int64{}).CopyForWrite()
		writable.AddRWNode(rwNode)
		writable.AddRONode(roNode)
		suite.checker.meta.CollectionManager.PutCollection(ctx, coll)
		suite.checker.meta.ReplicaManager.Put(ctx, writable.IntoReplica())
	}

	// Mocked target manager: identical row counts, all targets present and ready.
	targetMock := meta.NewMockTargetManager(suite.T())
	suite.checker.targetMgr = targetMock
	targetMock.EXPECT().GetCollectionRowCount(mock.Anything, mock.Anything, mock.Anything).Return(int64(100)).Maybe()
	targetMock.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(true).Maybe()
	targetMock.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
	targetMock.EXPECT().IsCurrentTargetExist(mock.Anything, mock.Anything, mock.Anything).Return(true).Maybe()

	// The mocked balancer answers every replica with two segment plans and
	// one channel plan, all moving from node 1 to node 2.
	suite.balancer.EXPECT().BalanceReplica(mock.Anything, mock.AnythingOfType("*meta.Replica")).RunAndReturn(
		func(_ context.Context, replica *meta.Replica) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) {
			owner := replica.GetCollectionID()

			// Segment IDs are derived from the collection ID: owner*100+1 and owner*100+2.
			segmentPlans := make([]balance.SegmentAssignPlan, 0, 2)
			for _, offset := range []int64{1, 2} {
				segmentPlans = append(segmentPlans, balance.SegmentAssignPlan{
					Segment: utils.CreateTestSegment(owner*100+offset, owner, 1, 1, 1, "test-channel"),
					Replica: replica,
					From:    rwNode,
					To:      roNode,
				})
			}

			channelPlans := []balance.ChannelAssignPlan{{
				Channel: &meta.DmChannel{
					VchannelInfo: &datapb.VchannelInfo{
						CollectionID: owner,
						ChannelName:  "test-channel",
					},
				},
				Replica: replica,
				From:    rwNode,
				To:      roNode,
			}}
			return segmentPlans, channelPlans
		}).Maybe()

	// Drop any earlier scheduler expectations and record every task passed to Add.
	var scheduled []task.Task
	suite.scheduler.ExpectedCalls = nil
	suite.scheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(added task.Task) error {
		scheduled = append(scheduled, added)
		return nil
	}).Maybe()

	// runCheck clears the per-round stopping set and the recorded tasks,
	// optionally zeroes autoBalanceTs, and then runs a single Check pass.
	runCheck := func(rewindAutoBalance bool) {
		suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
		if rewindAutoBalance {
			suite.checker.autoBalanceTs = time.Time{}
		}
		scheduled = nil
		suite.checker.Check(ctx)
	}

	// Round 1: multi-collection balancing off, batch sizes large enough not to matter.
	paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
	paramtable.Get().Save(Params.QueryCoordCfg.EnableBalanceOnMultipleCollections.Key, "false")
	paramtable.Get().Save(Params.QueryCoordCfg.BalanceSegmentBatchSize.Key, "10")
	paramtable.Get().Save(Params.QueryCoordCfg.BalanceChannelBatchSize.Key, "10")
	runCheck(true)
	// One collection's worth of plans: 2 segment tasks + 1 channel task.
	suite.Equal(3, len(scheduled), "Should have tasks for a single collection when multiple collections balance is disabled")

	// Round 2: multi-collection balancing on.
	paramtable.Get().Save(Params.QueryCoordCfg.EnableBalanceOnMultipleCollections.Key, "true")
	runCheck(true)
	// 3 collections * (2 segment tasks + 1 channel task) = 9 tasks.
	suite.Equal(9, len(scheduled), "Should have tasks for all collections when multiple collections balance is enabled")

	// Round 3: tight batch sizes; autoBalanceTs is deliberately left untouched here.
	paramtable.Get().Save(Params.QueryCoordCfg.BalanceSegmentBatchSize.Key, "2")
	paramtable.Get().Save(Params.QueryCoordCfg.BalanceChannelBatchSize.Key, "1")
	runCheck(false)
	// 2 segment tasks + 1 channel task = 3 tasks.
	suite.Equal(3, len(scheduled), "Should respect batch size limits")

	// Split the recorded tasks by kind; anything that is not a segment task
	// is counted as a channel task.
	segmentTasks, channelTasks := 0, 0
	for _, recorded := range scheduled {
		switch recorded.(type) {
		case *task.SegmentTask:
			segmentTasks++
		default:
			channelTasks++
		}
	}
	suite.LessOrEqual(segmentTasks, 2, "Should have at most 2 segment tasks due to batch size limit")
	suite.LessOrEqual(channelTasks, 1, "Should have at most 1 channel task due to batch size limit")
}
func TestBalanceCheckerSuite(t *testing.T) {
suite.Run(t, new(BalanceCheckerTestSuite))
}