fix: Skip generate balance task when target not ready (#30724)

issue: #30723

This PR skips generating balance tasks when a collection's target isn't ready.
It also refines the stale-task check in query coord's scheduler: if the channel
exists in either the current or the next target, the task won't be canceled.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/30778/head
wei liu 2024-02-23 10:32:53 +08:00 committed by GitHub
parent 950624d8d3
commit 6dd7297178
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 150 additions and 16 deletions

View File

@ -43,13 +43,20 @@ type BalanceChecker struct {
nodeManager *session.NodeManager
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
scheduler task.Scheduler
targetMgr *meta.TargetManager
}
func NewBalanceChecker(meta *meta.Meta, balancer balance.Balance, nodeMgr *session.NodeManager, scheduler task.Scheduler) *BalanceChecker {
func NewBalanceChecker(meta *meta.Meta,
targetMgr *meta.TargetManager,
balancer balance.Balance,
nodeMgr *session.NodeManager,
scheduler task.Scheduler,
) *BalanceChecker {
return &BalanceChecker{
checkerActivation: newCheckerActivation(),
Balance: balancer,
meta: meta,
targetMgr: targetMgr,
nodeManager: nodeMgr,
normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
scheduler: scheduler,
@ -64,6 +71,13 @@ func (b *BalanceChecker) Description() string {
return "BalanceChecker checks the cluster distribution and generates balance tasks"
}
// readyToCheck reports whether the given collection can be considered for
// balancing: its collection meta must be present, and the target manager must
// report that either a next target or a current target exists for it.
func (b *BalanceChecker) readyToCheck(collectionID int64) bool {
	hasMeta := b.meta.GetCollection(collectionID) != nil

	// Look at the next target first and only fall back to the current target
	// when no next target exists.
	hasTarget := b.targetMgr.IsNextTargetExist(collectionID)
	if !hasTarget {
		hasTarget = b.targetMgr.IsCurrentTargetExist(collectionID)
	}

	return hasMeta && hasTarget
}
func (b *BalanceChecker) replicasToBalance() []int64 {
ids := b.meta.GetAll()
@ -79,6 +93,10 @@ func (b *BalanceChecker) replicasToBalance() []int64 {
// balance collections influenced by stopping nodes
stoppingReplicas := make([]int64, 0)
for _, cid := range loadedCollections {
// skip balancing this collection if its meta or target isn't ready
if !b.readyToCheck(cid) {
continue
}
replicas := b.meta.ReplicaManager.GetByCollection(cid)
for _, replica := range replicas {
for _, nodeID := range replica.GetNodes() {

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -46,6 +47,7 @@ type BalanceCheckerTestSuite struct {
broker *meta.MockBroker
nodeMgr *session.NodeManager
scheduler *task.MockScheduler
targetMgr *meta.TargetManager
}
func (suite *BalanceCheckerTestSuite) SetupSuite() {
@ -73,9 +75,10 @@ func (suite *BalanceCheckerTestSuite) SetupTest() {
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
suite.broker = meta.NewMockBroker(suite.T())
suite.scheduler = task.NewMockScheduler(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.balancer = balance.NewMockBalancer(suite.T())
suite.checker = NewBalanceChecker(suite.meta, suite.balancer, suite.nodeMgr, suite.scheduler)
suite.checker = NewBalanceChecker(suite.meta, suite.targetMgr, suite.balancer, suite.nodeMgr, suite.scheduler)
}
func (suite *BalanceCheckerTestSuite) TearDownTest() {
@ -91,19 +94,41 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
// set collections meta
cid1, replicaID1 := 1, 1
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
// set collections meta
cid1, replicaID1, partitionID1 := 1, 1, 1
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
collection1.Status = querypb.LoadStatus_Loaded
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
suite.checker.meta.CollectionManager.PutCollection(collection1)
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(collection1, partition1)
suite.checker.meta.ReplicaManager.Put(replica1)
suite.targetMgr.UpdateCollectionNextTarget(int64(cid1))
suite.targetMgr.UpdateCollectionCurrentTarget(int64(cid1))
cid2, replicaID2 := 2, 2
cid2, replicaID2, partitionID2 := 2, 2, 2
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
collection2.Status = querypb.LoadStatus_Loaded
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)})
suite.checker.meta.CollectionManager.PutCollection(collection2)
partition2 := utils.CreateTestPartition(int64(cid2), int64(partitionID2))
suite.checker.meta.CollectionManager.PutCollection(collection2, partition2)
suite.checker.meta.ReplicaManager.Put(replica2)
suite.targetMgr.UpdateCollectionNextTarget(int64(cid2))
suite.targetMgr.UpdateCollectionCurrentTarget(int64(cid2))
// test disable auto balance
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "false")
@ -137,20 +162,41 @@ func (suite *BalanceCheckerTestSuite) TestBusyScheduler() {
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
// set collections meta
cid1, replicaID1 := 1, 1
cid1, replicaID1, partitionID1 := 1, 1, 1
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
collection1.Status = querypb.LoadStatus_Loaded
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
suite.checker.meta.CollectionManager.PutCollection(collection1)
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(collection1, partition1)
suite.checker.meta.ReplicaManager.Put(replica1)
suite.targetMgr.UpdateCollectionNextTarget(int64(cid1))
suite.targetMgr.UpdateCollectionCurrentTarget(int64(cid1))
cid2, replicaID2 := 2, 2
cid2, replicaID2, partitionID2 := 2, 2, 2
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
collection2.Status = querypb.LoadStatus_Loaded
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)})
suite.checker.meta.CollectionManager.PutCollection(collection2)
partition2 := utils.CreateTestPartition(int64(cid2), int64(partitionID2))
suite.checker.meta.CollectionManager.PutCollection(collection2, partition2)
suite.checker.meta.ReplicaManager.Put(replica2)
suite.targetMgr.UpdateCollectionNextTarget(int64(cid2))
suite.targetMgr.UpdateCollectionCurrentTarget(int64(cid2))
// test scheduler busy
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
@ -172,20 +218,41 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
// set collections meta
cid1, replicaID1 := 1, 1
cid1, replicaID1, partitionID1 := 1, 1, 1
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
collection1.Status = querypb.LoadStatus_Loaded
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
suite.checker.meta.CollectionManager.PutCollection(collection1)
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(collection1, partition1)
suite.checker.meta.ReplicaManager.Put(replica1)
suite.targetMgr.UpdateCollectionNextTarget(int64(cid1))
suite.targetMgr.UpdateCollectionCurrentTarget(int64(cid1))
cid2, replicaID2 := 2, 2
cid2, replicaID2, partitionID2 := 2, 2, 2
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
collection2.Status = querypb.LoadStatus_Loaded
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)})
suite.checker.meta.CollectionManager.PutCollection(collection2)
partition2 := utils.CreateTestPartition(int64(cid2), int64(partitionID2))
suite.checker.meta.CollectionManager.PutCollection(collection2, partition2)
suite.checker.meta.ReplicaManager.Put(replica2)
suite.targetMgr.UpdateCollectionNextTarget(int64(cid2))
suite.targetMgr.UpdateCollectionCurrentTarget(int64(cid2))
// test stopping balance
idsToBalance := []int64{int64(replicaID1), int64(replicaID2)}
@ -206,6 +273,55 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
suite.Len(tasks, 2)
}
// TestTargetNotReady verifies that replicasToBalance only returns replicas of
// collections that pass readyToCheck. Two loaded collections are registered on
// the same pair of nodes, but target-manager updates are issued only for
// collection 1; the test expects only collection 1's replica to be returned.
func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
// Register two nodes, mark node1 as stopping, and assign both nodes to the
// default resource group.
nodeID1, nodeID2 := 1, 2
suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost"))
suite.nodeMgr.Stopping(int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
// Recovery info (one segment on one channel) that the mocked broker hands
// back for any GetRecoveryInfoV2 call.
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
// Collection 1: collection/partition meta and replica are stored, and both
// the next-target and current-target updates are issued for it.
cid1, replicaID1, partitionID1 := 1, 1, 1
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
collection1.Status = querypb.LoadStatus_Loaded
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(collection1, partition1)
suite.checker.meta.ReplicaManager.Put(replica1)
suite.targetMgr.UpdateCollectionNextTarget(int64(cid1))
suite.targetMgr.UpdateCollectionCurrentTarget(int64(cid1))
// Collection 2: collection/partition meta and replica are stored, but no
// target update is issued for it.
cid2, replicaID2, partitionID2 := 2, 2, 2
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
collection2.Status = querypb.LoadStatus_Loaded
replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)})
partition2 := utils.CreateTestPartition(int64(cid2), int64(partitionID2))
suite.checker.meta.CollectionManager.PutCollection(collection2, partition2)
suite.checker.meta.ReplicaManager.Put(replica2)
// Only replica 1 is expected: both replicas contain the stopping node, but
// collection 2 received no target update above.
idsToBalance := []int64{int64(replicaID1)}
replicasToBalance := suite.checker.replicasToBalance()
suite.ElementsMatch(idsToBalance, replicasToBalance)
}
func TestBalanceCheckerSuite(t *testing.T) {
suite.Run(t, new(BalanceCheckerTestSuite))
}

View File

@ -65,7 +65,7 @@ func NewCheckerController(
checkers := map[utils.CheckerType]Checker{
utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, balancer, nodeMgr),
utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr),
utils.BalanceChecker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler),
utils.BalanceChecker: NewBalanceChecker(meta, targetMgr, balancer, nodeMgr, scheduler),
utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr),
utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr),
}

View File

@ -892,7 +892,7 @@ func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error {
for _, action := range task.Actions() {
switch action.Type() {
case ActionTypeGrow:
if scheduler.targetMgr.GetDmChannel(task.collectionID, task.Channel(), meta.NextTarget) == nil {
if scheduler.targetMgr.GetDmChannel(task.collectionID, task.Channel(), meta.NextTargetFirst) == nil {
log.Warn("the task is stale, the channel to subscribe not exists in targets",
zap.String("channel", task.Channel()))
return merr.WrapErrChannelReduplicate(task.Channel(), "target doesn't contain this channel")