// Source: mirror of https://github.com/milvus-io/milvus.git (1014 lines, 42 KiB, Go)
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
package checkers
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/suite"
|
|
"go.uber.org/atomic"
|
|
|
|
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
|
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
|
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
|
"github.com/milvus-io/milvus/pkg/v2/kv"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
|
)
|
|
|
|
type BalanceCheckerTestSuite struct {
|
|
suite.Suite
|
|
kv kv.MetaKv
|
|
checker *BalanceChecker
|
|
balancer *balance.MockBalancer
|
|
meta *meta.Meta
|
|
broker *meta.MockBroker
|
|
nodeMgr *session.NodeManager
|
|
scheduler *task.MockScheduler
|
|
targetMgr meta.TargetManagerInterface
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) SetupSuite() {
|
|
paramtable.Init()
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) SetupTest() {
|
|
var err error
|
|
config := GenerateEtcdConfig()
|
|
cli, err := etcd.GetEtcdClient(
|
|
config.UseEmbedEtcd.GetAsBool(),
|
|
config.EtcdUseSSL.GetAsBool(),
|
|
config.Endpoints.GetAsStrings(),
|
|
config.EtcdTLSCert.GetValue(),
|
|
config.EtcdTLSKey.GetValue(),
|
|
config.EtcdTLSCACert.GetValue(),
|
|
config.EtcdTLSMinVersion.GetValue())
|
|
suite.Require().NoError(err)
|
|
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
|
|
|
|
// meta
|
|
store := querycoord.NewCatalog(suite.kv)
|
|
idAllocator := RandomIncrementIDAllocator()
|
|
suite.nodeMgr = session.NewNodeManager()
|
|
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
|
|
suite.broker = meta.NewMockBroker(suite.T())
|
|
suite.scheduler = task.NewMockScheduler(suite.T())
|
|
suite.scheduler.EXPECT().Add(mock.Anything).Return(nil).Maybe()
|
|
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
|
|
|
suite.balancer = balance.NewMockBalancer(suite.T())
|
|
suite.checker = NewBalanceChecker(suite.meta, suite.targetMgr, suite.nodeMgr, suite.scheduler, func() balance.Balance { return suite.balancer })
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TearDownTest() {
|
|
suite.kv.Close()
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
|
|
ctx := context.Background()
|
|
// set up nodes info
|
|
nodeID1, nodeID2 := 1, 2
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: int64(nodeID1),
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: int64(nodeID2),
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID1))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID2))
|
|
|
|
// set collections meta
|
|
segments := []*datapb.SegmentInfo{
|
|
{
|
|
ID: 1,
|
|
PartitionID: 1,
|
|
InsertChannel: "test-insert-channel",
|
|
},
|
|
}
|
|
channels := []*datapb.VchannelInfo{
|
|
{
|
|
CollectionID: 1,
|
|
ChannelName: "test-insert-channel",
|
|
},
|
|
}
|
|
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
|
|
|
|
// set collections meta
|
|
cid1, replicaID1, partitionID1 := 1, 1, 1
|
|
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
|
|
collection1.Status = querypb.LoadStatus_Loaded
|
|
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
|
|
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
|
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1))
|
|
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1))
|
|
|
|
cid2, replicaID2, partitionID2 := 2, 2, 2
|
|
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
|
|
collection2.Status = querypb.LoadStatus_Loaded
|
|
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)})
|
|
partition2 := utils.CreateTestPartition(int64(cid2), int64(partitionID2))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection2, partition2)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica2)
|
|
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid2))
|
|
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid2))
|
|
|
|
// test disable auto balance
|
|
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "false")
|
|
suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
|
|
return 0
|
|
})
|
|
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.Empty(replicasToBalance)
|
|
segPlans, _ := suite.checker.balanceReplicas(ctx, replicasToBalance)
|
|
suite.Empty(segPlans)
|
|
|
|
// test enable auto balance
|
|
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
|
|
idsToBalance := []int64{int64(replicaID1)}
|
|
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
|
// next round
|
|
idsToBalance = []int64{int64(replicaID2)}
|
|
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
|
// final round
|
|
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.Empty(replicasToBalance)
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestBusyScheduler() {
|
|
ctx := context.Background()
|
|
// set up nodes info
|
|
nodeID1, nodeID2 := 1, 2
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: int64(nodeID1),
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: int64(nodeID2),
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID1))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID2))
|
|
|
|
segments := []*datapb.SegmentInfo{
|
|
{
|
|
ID: 1,
|
|
PartitionID: 1,
|
|
InsertChannel: "test-insert-channel",
|
|
},
|
|
}
|
|
channels := []*datapb.VchannelInfo{
|
|
{
|
|
CollectionID: 1,
|
|
ChannelName: "test-insert-channel",
|
|
},
|
|
}
|
|
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
|
|
|
|
// set collections meta
|
|
cid1, replicaID1, partitionID1 := 1, 1, 1
|
|
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
|
|
collection1.Status = querypb.LoadStatus_Loaded
|
|
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
|
|
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
|
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1))
|
|
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1))
|
|
|
|
cid2, replicaID2, partitionID2 := 2, 2, 2
|
|
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
|
|
collection2.Status = querypb.LoadStatus_Loaded
|
|
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)})
|
|
partition2 := utils.CreateTestPartition(int64(cid2), int64(partitionID2))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection2, partition2)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica2)
|
|
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid2))
|
|
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid2))
|
|
|
|
// test scheduler busy
|
|
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
|
|
suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
|
|
return 1
|
|
})
|
|
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.Len(replicasToBalance, 1)
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
|
|
ctx := context.Background()
|
|
// set up nodes info, stopping node1
|
|
nodeID1, nodeID2 := 1, 2
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: int64(nodeID1),
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: int64(nodeID2),
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.nodeMgr.Stopping(int64(nodeID1))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID1))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID2))
|
|
|
|
segments := []*datapb.SegmentInfo{
|
|
{
|
|
ID: 1,
|
|
PartitionID: 1,
|
|
InsertChannel: "test-insert-channel",
|
|
},
|
|
}
|
|
channels := []*datapb.VchannelInfo{
|
|
{
|
|
CollectionID: 1,
|
|
ChannelName: "test-insert-channel",
|
|
},
|
|
}
|
|
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
|
|
|
|
// set collections meta
|
|
cid1, replicaID1, partitionID1 := 1, 1, 1
|
|
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
|
|
collection1.Status = querypb.LoadStatus_Loaded
|
|
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
|
|
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
|
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1))
|
|
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1))
|
|
|
|
cid2, replicaID2, partitionID2 := 2, 2, 2
|
|
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
|
|
collection2.Status = querypb.LoadStatus_Loaded
|
|
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)})
|
|
partition2 := utils.CreateTestPartition(int64(cid2), int64(partitionID2))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection2, partition2)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica2)
|
|
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid2))
|
|
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid2))
|
|
|
|
mr1 := replica1.CopyForWrite()
|
|
mr1.AddRONode(1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
|
|
|
|
mr2 := replica2.CopyForWrite()
|
|
mr2.AddRONode(1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
|
|
|
|
// test stopping balance
|
|
// First round: check replica1
|
|
idsToBalance := []int64{int64(replicaID1)}
|
|
replicasToBalance := suite.checker.getReplicaForStoppingBalance(ctx)
|
|
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
|
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1)))
|
|
suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2)))
|
|
|
|
// Second round: should skip replica1, check replica2
|
|
idsToBalance = []int64{int64(replicaID2)}
|
|
replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
|
|
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
|
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1)))
|
|
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2)))
|
|
|
|
// Third round: all collections checked, should return nil and clear the set
|
|
replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
|
|
suite.Empty(replicasToBalance)
|
|
suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1)))
|
|
suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2)))
|
|
|
|
// reset meta for Check test
|
|
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
|
|
mr1 = replica1.CopyForWrite()
|
|
mr1.AddRONode(1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
|
|
|
|
// checker check
|
|
segPlans, chanPlans := make([]balance.SegmentAssignPlan, 0), make([]balance.ChannelAssignPlan, 0)
|
|
mockPlan := balance.SegmentAssignPlan{
|
|
Segment: utils.CreateTestSegment(1, 1, 1, 1, 1, "1"),
|
|
Replica: meta.NilReplica,
|
|
From: 1,
|
|
To: 2,
|
|
}
|
|
segPlans = append(segPlans, mockPlan)
|
|
suite.balancer.EXPECT().BalanceReplica(mock.Anything, mock.Anything).Return(segPlans, chanPlans)
|
|
|
|
tasks := make([]task.Task, 0)
|
|
suite.scheduler.ExpectedCalls = nil
|
|
suite.scheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(task task.Task) error {
|
|
tasks = append(tasks, task)
|
|
return nil
|
|
})
|
|
suite.checker.Check(context.TODO())
|
|
suite.Len(tasks, 2)
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
|
|
ctx := context.Background()
|
|
// set up nodes info, stopping node1
|
|
nodeID1, nodeID2 := int64(1), int64(2)
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: nodeID1,
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: nodeID2,
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.nodeMgr.Stopping(nodeID1)
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2)
|
|
|
|
mockTarget := meta.NewMockTargetManager(suite.T())
|
|
suite.checker.targetMgr = mockTarget
|
|
|
|
// set collections meta
|
|
cid1, replicaID1, partitionID1 := 1, 1, 1
|
|
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
|
|
collection1.Status = querypb.LoadStatus_Loaded
|
|
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{nodeID1, nodeID2})
|
|
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
|
|
|
cid2, replicaID2, partitionID2 := 2, 2, 2
|
|
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
|
|
collection2.Status = querypb.LoadStatus_Loaded
|
|
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{nodeID1, nodeID2})
|
|
partition2 := utils.CreateTestPartition(int64(cid2), int64(partitionID2))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection2, partition2)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica2)
|
|
|
|
// test normal balance when one collection has unready target
|
|
mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true)
|
|
mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false)
|
|
mockTarget.EXPECT().GetCollectionRowCount(mock.Anything, mock.Anything, mock.Anything).Return(100).Maybe()
|
|
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.Len(replicasToBalance, 0)
|
|
|
|
// test stopping balance with target not ready
|
|
mockTarget.ExpectedCalls = nil
|
|
mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(false)
|
|
mockTarget.EXPECT().IsCurrentTargetExist(mock.Anything, int64(cid1), mock.Anything).Return(true).Maybe()
|
|
mockTarget.EXPECT().IsCurrentTargetExist(mock.Anything, int64(cid2), mock.Anything).Return(false).Maybe()
|
|
mockTarget.EXPECT().GetCollectionRowCount(mock.Anything, mock.Anything, mock.Anything).Return(100).Maybe()
|
|
mr1 := replica1.CopyForWrite()
|
|
mr1.AddRONode(1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
|
|
|
|
mr2 := replica2.CopyForWrite()
|
|
mr2.AddRONode(1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
|
|
|
|
idsToBalance := []int64{int64(replicaID1)}
|
|
replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
|
|
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestAutoBalanceInterval() {
|
|
ctx := context.Background()
|
|
// set up nodes info
|
|
nodeID1, nodeID2 := 1, 2
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: int64(nodeID1),
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: int64(nodeID2),
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID1))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID2))
|
|
|
|
segments := []*datapb.SegmentInfo{
|
|
{
|
|
ID: 1,
|
|
PartitionID: 1,
|
|
InsertChannel: "test-insert-channel",
|
|
},
|
|
{
|
|
ID: 2,
|
|
PartitionID: 1,
|
|
InsertChannel: "test-insert-channel",
|
|
},
|
|
}
|
|
channels := []*datapb.VchannelInfo{
|
|
{
|
|
CollectionID: 1,
|
|
ChannelName: "test-insert-channel",
|
|
},
|
|
}
|
|
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
|
|
|
|
// set collections meta
|
|
cid1, replicaID1, partitionID1 := 1, 1, 1
|
|
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
|
|
collection1.Status = querypb.LoadStatus_Loaded
|
|
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
|
|
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
|
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1))
|
|
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1))
|
|
|
|
funcCallCounter := atomic.NewInt64(0)
|
|
suite.balancer.EXPECT().BalanceReplica(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, r *meta.Replica) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) {
|
|
funcCallCounter.Inc()
|
|
return nil, nil
|
|
})
|
|
|
|
// first auto balance should be triggered
|
|
suite.checker.Check(ctx)
|
|
suite.Equal(funcCallCounter.Load(), int64(1))
|
|
|
|
// second auto balance won't be triggered due to autoBalanceInterval == 3s
|
|
suite.checker.Check(ctx)
|
|
suite.Equal(funcCallCounter.Load(), int64(1))
|
|
|
|
// set autoBalanceInterval == 1, sleep 1s, auto balance should be triggered
|
|
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key, "1000")
|
|
paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key)
|
|
time.Sleep(1 * time.Second)
|
|
suite.checker.Check(ctx)
|
|
suite.Equal(funcCallCounter.Load(), int64(1))
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestBalanceOrder() {
|
|
ctx := context.Background()
|
|
nodeID1, nodeID2 := int64(1), int64(2)
|
|
|
|
// set collections meta
|
|
cid1, replicaID1, partitionID1 := 1, 1, 1
|
|
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
|
|
collection1.Status = querypb.LoadStatus_Loaded
|
|
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{nodeID1, nodeID2})
|
|
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
|
|
|
cid2, replicaID2, partitionID2 := 2, 2, 2
|
|
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
|
|
collection2.Status = querypb.LoadStatus_Loaded
|
|
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{nodeID1, nodeID2})
|
|
partition2 := utils.CreateTestPartition(int64(cid2), int64(partitionID2))
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection2, partition2)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica2)
|
|
|
|
// mock collection row count
|
|
mockTargetManager := meta.NewMockTargetManager(suite.T())
|
|
suite.checker.targetMgr = mockTargetManager
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, int64(cid1), mock.Anything).Return(int64(100)).Maybe()
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, int64(cid2), mock.Anything).Return(int64(200)).Maybe()
|
|
mockTargetManager.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(true).Maybe()
|
|
mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
|
|
|
|
// mock stopping node
|
|
mr1 := replica1.CopyForWrite()
|
|
mr1.AddRONode(nodeID1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
|
|
mr2 := replica2.CopyForWrite()
|
|
mr2.AddRONode(nodeID2)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
|
|
|
|
// test normal balance order
|
|
replicas := suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.Equal(replicas, []int64{int64(replicaID2)})
|
|
|
|
// test stopping balance order
|
|
replicas = suite.checker.getReplicaForStoppingBalance(ctx)
|
|
suite.Equal(replicas, []int64{int64(replicaID2)})
|
|
|
|
// mock collection row count
|
|
mockTargetManager.ExpectedCalls = nil
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, int64(cid1), mock.Anything).Return(int64(200)).Maybe()
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, int64(cid2), mock.Anything).Return(int64(100)).Maybe()
|
|
mockTargetManager.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(true).Maybe()
|
|
mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
|
|
|
|
// test normal balance order
|
|
replicas = suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.Equal(replicas, []int64{int64(replicaID1)})
|
|
|
|
// test stopping balance order
|
|
replicas = suite.checker.getReplicaForStoppingBalance(ctx)
|
|
suite.Equal(replicas, []int64{int64(replicaID1)})
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestSortCollections() {
|
|
ctx := context.Background()
|
|
|
|
// Set up test collections
|
|
cid1, cid2, cid3 := int64(1), int64(2), int64(3)
|
|
|
|
// Mock the target manager for row count returns
|
|
mockTargetManager := meta.NewMockTargetManager(suite.T())
|
|
suite.checker.targetMgr = mockTargetManager
|
|
|
|
// Collection 1: Low ID, High row count
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid1, mock.Anything).Return(int64(300)).Maybe()
|
|
|
|
// Collection 2: Middle ID, Low row count
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid2, mock.Anything).Return(int64(100)).Maybe()
|
|
|
|
// Collection 3: High ID, Middle row count
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid3, mock.Anything).Return(int64(200)).Maybe()
|
|
|
|
collections := []int64{cid1, cid2, cid3}
|
|
|
|
// Test ByRowCount sorting (default)
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceTriggerOrder.Key, "ByRowCount")
|
|
sortedCollections := suite.checker.sortCollections(ctx, collections)
|
|
suite.Equal([]int64{cid1, cid3, cid2}, sortedCollections, "Collections should be sorted by row count (highest first)")
|
|
|
|
// Test ByCollectionID sorting
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceTriggerOrder.Key, "ByCollectionID")
|
|
sortedCollections = suite.checker.sortCollections(ctx, collections)
|
|
suite.Equal([]int64{cid1, cid2, cid3}, sortedCollections, "Collections should be sorted by collection ID (ascending)")
|
|
|
|
// Test with empty sort order (should default to ByRowCount)
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceTriggerOrder.Key, "")
|
|
sortedCollections = suite.checker.sortCollections(ctx, collections)
|
|
suite.Equal([]int64{cid1, cid3, cid2}, sortedCollections, "Should default to ByRowCount when sort order is empty")
|
|
|
|
// Test with invalid sort order (should default to ByRowCount)
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceTriggerOrder.Key, "InvalidOrder")
|
|
sortedCollections = suite.checker.sortCollections(ctx, collections)
|
|
suite.Equal([]int64{cid1, cid3, cid2}, sortedCollections, "Should default to ByRowCount when sort order is invalid")
|
|
|
|
// Test with mixed case (should be case-insensitive)
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceTriggerOrder.Key, "bYcOlLeCtIoNiD")
|
|
sortedCollections = suite.checker.sortCollections(ctx, collections)
|
|
suite.Equal([]int64{cid1, cid2, cid3}, sortedCollections, "Should handle case-insensitive sort order names")
|
|
|
|
// Test with empty collection list
|
|
emptyCollections := []int64{}
|
|
sortedCollections = suite.checker.sortCollections(ctx, emptyCollections)
|
|
suite.Equal([]int64{}, sortedCollections, "Should handle empty collection list")
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestSortCollectionsIntegration() {
|
|
ctx := context.Background()
|
|
|
|
// Set up test collections and nodes
|
|
nodeID1 := int64(1)
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: nodeID1,
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
|
|
|
|
// Create two collections to ensure sorting is triggered
|
|
cid1, replicaID1 := int64(1), int64(101)
|
|
collection1 := utils.CreateTestCollection(cid1, int32(replicaID1))
|
|
collection1.Status = querypb.LoadStatus_Loaded
|
|
replica1 := utils.CreateTestReplica(replicaID1, cid1, []int64{nodeID1})
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
|
|
|
// Add a second collection with different characteristics
|
|
cid2, replicaID2 := int64(2), int64(102)
|
|
collection2 := utils.CreateTestCollection(cid2, int32(replicaID2))
|
|
collection2.Status = querypb.LoadStatus_Loaded
|
|
replica2 := utils.CreateTestReplica(replicaID2, cid2, []int64{nodeID1})
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection2)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica2)
|
|
|
|
// Mock target manager
|
|
mockTargetManager := meta.NewMockTargetManager(suite.T())
|
|
suite.checker.targetMgr = mockTargetManager
|
|
|
|
// Setup different row counts to test sorting
|
|
// Collection 1 has more rows than Collection 2
|
|
var getRowCountCallCount int
|
|
mockTargetManager.On("GetCollectionRowCount", mock.Anything, mock.Anything, mock.Anything).
|
|
Return(func(ctx context.Context, collectionID int64, scope int32) int64 {
|
|
getRowCountCallCount++
|
|
if collectionID == cid1 {
|
|
return 200 // More rows in collection 1
|
|
}
|
|
return 100 // Fewer rows in collection 2
|
|
})
|
|
|
|
mockTargetManager.On("IsCurrentTargetReady", mock.Anything, mock.Anything).Return(true)
|
|
mockTargetManager.On("IsNextTargetExist", mock.Anything, mock.Anything).Return(true)
|
|
|
|
// Configure for testing
|
|
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceTriggerOrder.Key, "ByRowCount")
|
|
|
|
// Clear first to avoid previous test state
|
|
suite.checker.normalBalanceCollectionsCurrentRound.Clear()
|
|
|
|
// Call normal balance
|
|
_ = suite.checker.getReplicaForNormalBalance(ctx)
|
|
|
|
// Verify GetCollectionRowCount was called at least twice (once for each collection)
|
|
// This confirms that the collections were sorted
|
|
suite.True(getRowCountCallCount >= 2, "GetCollectionRowCount should be called at least twice during normal balance")
|
|
|
|
// Reset counter and test stopping balance
|
|
getRowCountCallCount = 0
|
|
|
|
// Set up for stopping balance test
|
|
mr1 := replica1.CopyForWrite()
|
|
mr1.AddRONode(nodeID1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
|
|
|
|
mr2 := replica2.CopyForWrite()
|
|
mr2.AddRONode(nodeID1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
|
|
|
|
paramtable.Get().Save(Params.QueryCoordCfg.EnableStoppingBalance.Key, "true")
|
|
|
|
// Call stopping balance
|
|
_ = suite.checker.getReplicaForStoppingBalance(ctx)
|
|
|
|
// Verify GetCollectionRowCount was called at least twice during stopping balance
|
|
suite.True(getRowCountCallCount >= 2, "GetCollectionRowCount should be called at least twice during stopping balance")
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestBalanceTriggerOrder() {
|
|
ctx := context.Background()
|
|
|
|
// Set up nodes
|
|
nodeID1, nodeID2 := int64(1), int64(2)
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: nodeID1,
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: nodeID2,
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2)
|
|
|
|
// Create collections with different row counts
|
|
cid1, replicaID1 := int64(1), int64(101)
|
|
collection1 := utils.CreateTestCollection(cid1, int32(replicaID1))
|
|
collection1.Status = querypb.LoadStatus_Loaded
|
|
replica1 := utils.CreateTestReplica(replicaID1, cid1, []int64{nodeID1, nodeID2})
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
|
|
|
cid2, replicaID2 := int64(2), int64(102)
|
|
collection2 := utils.CreateTestCollection(cid2, int32(replicaID2))
|
|
collection2.Status = querypb.LoadStatus_Loaded
|
|
replica2 := utils.CreateTestReplica(replicaID2, cid2, []int64{nodeID1, nodeID2})
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection2)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica2)
|
|
|
|
cid3, replicaID3 := int64(3), int64(103)
|
|
collection3 := utils.CreateTestCollection(cid3, int32(replicaID3))
|
|
collection3.Status = querypb.LoadStatus_Loaded
|
|
replica3 := utils.CreateTestReplica(replicaID3, cid3, []int64{nodeID1, nodeID2})
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection3)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica3)
|
|
|
|
// Mock the target manager
|
|
mockTargetManager := meta.NewMockTargetManager(suite.T())
|
|
suite.checker.targetMgr = mockTargetManager
|
|
|
|
// Set row counts: Collection 1 (highest), Collection 3 (middle), Collection 2 (lowest)
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid1, mock.Anything).Return(int64(300)).Maybe()
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid2, mock.Anything).Return(int64(100)).Maybe()
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, cid3, mock.Anything).Return(int64(200)).Maybe()
|
|
|
|
// Mark the current target as ready
|
|
mockTargetManager.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(true).Maybe()
|
|
mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
|
|
mockTargetManager.EXPECT().IsCurrentTargetExist(mock.Anything, mock.Anything, mock.Anything).Return(true).Maybe()
|
|
|
|
// Enable auto balance
|
|
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
|
|
|
|
// Test with ByRowCount order (default)
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceTriggerOrder.Key, "ByRowCount")
|
|
suite.checker.normalBalanceCollectionsCurrentRound.Clear()
|
|
|
|
// Normal balance should pick the collection with highest row count first
|
|
replicas := suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.Contains(replicas, replicaID1, "Should balance collection with highest row count first")
|
|
|
|
// Add stopping nodes to test stopping balance
|
|
mr1 := replica1.CopyForWrite()
|
|
mr1.AddRONode(nodeID1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
|
|
|
|
mr2 := replica2.CopyForWrite()
|
|
mr2.AddRONode(nodeID1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
|
|
|
|
mr3 := replica3.CopyForWrite()
|
|
mr3.AddRONode(nodeID1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr3.IntoReplica())
|
|
|
|
// Enable stopping balance
|
|
paramtable.Get().Save(Params.QueryCoordCfg.EnableStoppingBalance.Key, "true")
|
|
|
|
// Stopping balance should also pick the collection with highest row count first
|
|
replicas = suite.checker.getReplicaForStoppingBalance(ctx)
|
|
suite.Contains(replicas, replicaID1, "Stopping balance should prioritize collection with highest row count")
|
|
|
|
// Test with ByCollectionID order
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceTriggerOrder.Key, "ByCollectionID")
|
|
suite.checker.normalBalanceCollectionsCurrentRound.Clear()
|
|
|
|
// Normal balance should pick the collection with lowest ID first
|
|
replicas = suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.Contains(replicas, replicaID1, "Should balance collection with lowest ID first")
|
|
|
|
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
|
|
// Stopping balance should also pick the collection with lowest ID first
|
|
replicas = suite.checker.getReplicaForStoppingBalance(ctx)
|
|
suite.Contains(replicas, replicaID1, "Stopping balance should prioritize collection with lowest ID")
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestHasUnbalancedCollectionFlag() {
|
|
ctx := context.Background()
|
|
|
|
// Set up nodes
|
|
nodeID1, nodeID2 := int64(1), int64(2)
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: nodeID1,
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: nodeID2,
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2)
|
|
|
|
// Create collection
|
|
cid1, replicaID1 := int64(1), int64(101)
|
|
collection1 := utils.CreateTestCollection(cid1, int32(replicaID1))
|
|
collection1.Status = querypb.LoadStatus_Loaded
|
|
replica1 := utils.CreateTestReplica(replicaID1, cid1, []int64{nodeID1, nodeID2})
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
|
|
|
// Mock the target manager
|
|
mockTargetManager := meta.NewMockTargetManager(suite.T())
|
|
suite.checker.targetMgr = mockTargetManager
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, mock.Anything, mock.Anything).Return(int64(100)).Maybe()
|
|
|
|
// 1. Test normal balance with auto balance disabled
|
|
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "false")
|
|
|
|
// The collections set should be initially empty
|
|
suite.checker.normalBalanceCollectionsCurrentRound.Clear()
|
|
suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len())
|
|
|
|
// Get replicas - should return nil and keep the set empty
|
|
replicas := suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.Empty(replicas)
|
|
suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len(),
|
|
"normalBalanceCollectionsCurrentRound should remain empty when auto balance is disabled")
|
|
|
|
// 2. Test normal balance when targetMgr.IsCurrentTargetReady returns false
|
|
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
|
|
mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
|
|
mockTargetManager.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false).Maybe()
|
|
|
|
// The collections set should be initially empty
|
|
suite.checker.normalBalanceCollectionsCurrentRound.Clear()
|
|
suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len())
|
|
|
|
// Get replicas - should return nil and keep the set empty because of not ready targets
|
|
replicas = suite.checker.getReplicaForNormalBalance(ctx)
|
|
suite.Empty(replicas)
|
|
suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len(),
|
|
"normalBalanceCollectionsCurrentRound should remain empty when targets are not ready")
|
|
|
|
// 3. Test stopping balance when there are no RO nodes
|
|
paramtable.Get().Save(Params.QueryCoordCfg.EnableStoppingBalance.Key, "true")
|
|
mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
|
|
mockTargetManager.EXPECT().IsCurrentTargetExist(mock.Anything, mock.Anything, mock.Anything).Return(true).Maybe()
|
|
|
|
// The collections set should be initially empty
|
|
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
|
|
suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len())
|
|
|
|
// Get replicas - should return nil and keep the set empty because there are no RO nodes
|
|
replicas = suite.checker.getReplicaForStoppingBalance(ctx)
|
|
suite.Empty(replicas)
|
|
suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len(),
|
|
"stoppingBalanceCollectionsCurrentRound should remain empty when there are no RO nodes")
|
|
|
|
// 4. Test stopping balance with RO nodes
|
|
// Add a RO node to the replica
|
|
mr1 := replica1.CopyForWrite()
|
|
mr1.AddRONode(nodeID1)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
|
|
|
|
// The collections set should be initially empty
|
|
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
|
|
suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len())
|
|
|
|
// Get replicas - should return the replica ID and add the collection to the set
|
|
replicas = suite.checker.getReplicaForStoppingBalance(ctx)
|
|
suite.Equal([]int64{replicaID1}, replicas)
|
|
suite.Equal(1, suite.checker.stoppingBalanceCollectionsCurrentRound.Len())
|
|
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(cid1),
|
|
"stoppingBalanceCollectionsCurrentRound should contain the collection when it has RO nodes")
|
|
}
|
|
|
|
func (suite *BalanceCheckerTestSuite) TestCheckBatchSizesAndMultiCollection() {
|
|
ctx := context.Background()
|
|
|
|
// Set up nodes
|
|
nodeID1, nodeID2 := int64(1), int64(2)
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: nodeID1,
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
|
NodeID: nodeID2,
|
|
Address: "localhost",
|
|
Hostname: "localhost",
|
|
}))
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
|
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2)
|
|
|
|
// Create 3 collections
|
|
for i := 1; i <= 3; i++ {
|
|
cid := int64(i)
|
|
replicaID := int64(100 + i)
|
|
|
|
collection := utils.CreateTestCollection(cid, int32(replicaID))
|
|
collection.Status = querypb.LoadStatus_Loaded
|
|
replica := utils.CreateTestReplica(replicaID, cid, []int64{})
|
|
mutableReplica := replica.CopyForWrite()
|
|
mutableReplica.AddRWNode(nodeID1)
|
|
mutableReplica.AddRONode(nodeID2)
|
|
|
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection)
|
|
suite.checker.meta.ReplicaManager.Put(ctx, mutableReplica.IntoReplica())
|
|
}
|
|
|
|
// Mock target manager
|
|
mockTargetManager := meta.NewMockTargetManager(suite.T())
|
|
suite.checker.targetMgr = mockTargetManager
|
|
|
|
// All collections have same row count for simplicity
|
|
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, mock.Anything, mock.Anything).Return(int64(100)).Maybe()
|
|
mockTargetManager.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(true).Maybe()
|
|
mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
|
|
mockTargetManager.EXPECT().IsCurrentTargetExist(mock.Anything, mock.Anything, mock.Anything).Return(true).Maybe()
|
|
|
|
// For each collection, return different segment plans
|
|
suite.balancer.EXPECT().BalanceReplica(mock.Anything, mock.AnythingOfType("*meta.Replica")).RunAndReturn(
|
|
func(ctx context.Context, replica *meta.Replica) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) {
|
|
// Create 2 segment plans and 1 channel plan per replica
|
|
collID := replica.GetCollectionID()
|
|
segPlans := make([]balance.SegmentAssignPlan, 0)
|
|
chanPlans := make([]balance.ChannelAssignPlan, 0)
|
|
|
|
// Create 2 segment plans
|
|
for j := 1; j <= 2; j++ {
|
|
segID := collID*100 + int64(j)
|
|
segPlan := balance.SegmentAssignPlan{
|
|
Segment: utils.CreateTestSegment(segID, collID, 1, 1, 1, "test-channel"),
|
|
Replica: replica,
|
|
From: nodeID1,
|
|
To: nodeID2,
|
|
}
|
|
segPlans = append(segPlans, segPlan)
|
|
}
|
|
|
|
// Create 1 channel plan
|
|
chanPlan := balance.ChannelAssignPlan{
|
|
Channel: &meta.DmChannel{
|
|
VchannelInfo: &datapb.VchannelInfo{
|
|
CollectionID: collID,
|
|
ChannelName: "test-channel",
|
|
},
|
|
},
|
|
Replica: replica,
|
|
From: nodeID1,
|
|
To: nodeID2,
|
|
}
|
|
chanPlans = append(chanPlans, chanPlan)
|
|
|
|
return segPlans, chanPlans
|
|
}).Maybe()
|
|
|
|
// Add tasks to check batch size limits
|
|
var addedTasks []task.Task
|
|
suite.scheduler.ExpectedCalls = nil
|
|
suite.scheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
|
|
addedTasks = append(addedTasks, t)
|
|
return nil
|
|
}).Maybe()
|
|
|
|
// Test 1: Balance with multiple collections disabled
|
|
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
|
|
paramtable.Get().Save(Params.QueryCoordCfg.EnableBalanceOnMultipleCollections.Key, "false")
|
|
// Set batch sizes to large values to test single-collection case
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceSegmentBatchSize.Key, "10")
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceChannelBatchSize.Key, "10")
|
|
|
|
// Reset test state
|
|
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
|
|
suite.checker.autoBalanceTs = time.Time{} // Reset to trigger auto balance
|
|
addedTasks = nil
|
|
|
|
// Run the Check method
|
|
suite.checker.Check(ctx)
|
|
|
|
// Should have tasks for a single collection (2 segment tasks + 1 channel task)
|
|
suite.Equal(3, len(addedTasks), "Should have tasks for a single collection when multiple collections balance is disabled")
|
|
|
|
// Test 2: Balance with multiple collections enabled
|
|
paramtable.Get().Save(Params.QueryCoordCfg.EnableBalanceOnMultipleCollections.Key, "true")
|
|
|
|
// Reset test state
|
|
suite.checker.autoBalanceTs = time.Time{}
|
|
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
|
|
addedTasks = nil
|
|
|
|
// Run the Check method
|
|
suite.checker.Check(ctx)
|
|
|
|
// Should have tasks for all collections (3 collections * (2 segment tasks + 1 channel task) = 9 tasks)
|
|
suite.Equal(9, len(addedTasks), "Should have tasks for all collections when multiple collections balance is enabled")
|
|
|
|
// Test 3: Batch size limits
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceSegmentBatchSize.Key, "2")
|
|
paramtable.Get().Save(Params.QueryCoordCfg.BalanceChannelBatchSize.Key, "1")
|
|
|
|
// Reset test state
|
|
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
|
|
addedTasks = nil
|
|
|
|
// Run the Check method
|
|
suite.checker.Check(ctx)
|
|
|
|
// Should respect batch size limits: 2 segment tasks + 1 channel task = 3 tasks
|
|
suite.Equal(3, len(addedTasks), "Should respect batch size limits")
|
|
|
|
// Count segment tasks and channel tasks
|
|
segmentTaskCount := 0
|
|
channelTaskCount := 0
|
|
for _, t := range addedTasks {
|
|
if _, ok := t.(*task.SegmentTask); ok {
|
|
segmentTaskCount++
|
|
} else {
|
|
channelTaskCount++
|
|
}
|
|
}
|
|
|
|
suite.LessOrEqual(segmentTaskCount, 2, "Should have at most 2 segment tasks due to batch size limit")
|
|
suite.LessOrEqual(channelTaskCount, 1, "Should have at most 1 channel task due to batch size limit")
|
|
}
|
|
|
|
func TestBalanceCheckerSuite(t *testing.T) {
|
|
suite.Run(t, new(BalanceCheckerTestSuite))
|
|
}
|