From 06b191b1649a3590f38bdfbbecc6c80ca91847ad Mon Sep 17 00:00:00 2001
From: wei liu
Date: Wed, 13 Mar 2024 15:05:04 +0800
Subject: [PATCH] fix: Balance channel stuck forever due to logic dead lock
 (#31202)

issue: #30816

Balance channel blocks until the leader view catches up with the current
target before it starts to unsubscribe the old delegator, which makes sure
the new delegator can serve search requests before the old one is released.
However, another rule in segment_checker skips loading segments while a
channel balance is in progress. As a result, if a query node crashes during
a channel balance, the new delegator can never catch up with the target and
the balance is stuck forever.

This PR removes the rule that skips loading segments during channel balance
to avoid this logical deadlock.

Signed-off-by: Wei Liu
---
 .../balance/multi_target_balance.go           |  5 ++
 .../balance/rowcount_based_balancer.go        |  5 ++
 .../balance/score_based_balancer.go           |  5 ++
 .../querycoordv2/checkers/balance_checker.go  | 39 ++++-----
 .../querycoordv2/checkers/segment_checker.go  | 14 ----
 .../checkers/segment_checker_test.go          | 40 ----------
 pkg/util/paramtable/component_param.go        | 10 +++
 pkg/util/paramtable/component_param_test.go   |  1 +
 tests/integration/balance/balance_test.go     | 80 +++++++++++++++++++
 9 files changed, 127 insertions(+), 72 deletions(-)

diff --git a/internal/querycoordv2/balance/multi_target_balance.go b/internal/querycoordv2/balance/multi_target_balance.go
index 82f09d48e6..8ed1973c8c 100644
--- a/internal/querycoordv2/balance/multi_target_balance.go
+++ b/internal/querycoordv2/balance/multi_target_balance.go
@@ -493,6 +493,11 @@ func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAs
 	// print current distribution before generating plans
 	segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
 	if len(offlineNodes) != 0 {
+		if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+			log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", offlineNodes))
+			return nil, nil
+		}
+
 		log.Info("Handle stopping nodes",
 			zap.Any("stopping nodes", offlineNodes),
 			zap.Any("available nodes", onlineNodes),
diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go
index 799a51a3f7..1dfc1aa115 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -179,6 +179,11 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment
 
 	segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
 	if len(offlineNodes) != 0 {
+		if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+			log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", offlineNodes))
+			return nil, nil
+		}
+
 		log.Info("Handle stopping nodes",
 			zap.Any("stopping nodes", offlineNodes),
 			zap.Any("available nodes", onlineNodes),
diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go
index 416092069c..8369728e3d 100644
--- a/internal/querycoordv2/balance/score_based_balancer.go
+++ b/internal/querycoordv2/balance/score_based_balancer.go
@@ -213,6 +213,11 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
 	// print current distribution before generating plans
 	segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
 	if len(offlineNodes) != 0 {
+		if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+			log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", offlineNodes))
+			return nil, nil
+		}
+
 		log.Info("Handle stopping nodes",
 			zap.Any("stopping nodes", offlineNodes),
 			zap.Any("available nodes", onlineNodes),
diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go
index 8b710ecdb3..2aa55a20c8 100644
--- a/internal/querycoordv2/checkers/balance_checker.go
+++ b/internal/querycoordv2/checkers/balance_checker.go
@@ -32,6 +32,7 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
 	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -90,27 +91,29 @@ func (b *BalanceChecker) replicasToBalance() []int64 {
 		return loadedCollections[i] < loadedCollections[j]
 	})
 
-	// balance collections influenced by stopping nodes
-	stoppingReplicas := make([]int64, 0)
-	for _, cid := range loadedCollections {
-		// if target and meta isn't ready, skip balance this collection
-		if !b.readyToCheck(cid) {
-			continue
-		}
-		replicas := b.meta.ReplicaManager.GetByCollection(cid)
-		for _, replica := range replicas {
-			for _, nodeID := range replica.GetNodes() {
-				isStopping, _ := b.nodeManager.IsStoppingNode(nodeID)
-				if isStopping {
-					stoppingReplicas = append(stoppingReplicas, replica.GetID())
-					break
+	if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+		// balance collections influenced by stopping nodes
+		stoppingReplicas := make([]int64, 0)
+		for _, cid := range loadedCollections {
+			// if target and meta isn't ready, skip balance this collection
+			if !b.readyToCheck(cid) {
+				continue
+			}
+			replicas := b.meta.ReplicaManager.GetByCollection(cid)
+			for _, replica := range replicas {
+				for _, nodeID := range replica.GetNodes() {
+					isStopping, _ := b.nodeManager.IsStoppingNode(nodeID)
+					if isStopping {
+						stoppingReplicas = append(stoppingReplicas, replica.GetID())
+						break
+					}
 				}
 			}
 		}
-	}
-	// do stopping balance only in this round
-	if len(stoppingReplicas) > 0 {
-		return stoppingReplicas
+
+		// do stopping balance only in this round
+		if len(stoppingReplicas) > 0 {
+			return stoppingReplicas
+		}
 	}
 
 	// no stopping balance and auto balance is disabled, return empty collections for balance
diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go
index c629be0817..3b980cc806 100644
--- a/internal/querycoordv2/checkers/segment_checker.go
+++ b/internal/querycoordv2/checkers/segment_checker.go
@@ -105,22 +105,8 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
 }
 
 func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica) []task.Task {
-	log := log.Ctx(ctx).WithRateGroup("qcv2.SegmentChecker", 1, 60).With(
-		zap.Int64("collectionID", replica.CollectionID),
-		zap.Int64("replicaID", replica.ID))
 	ret := make([]task.Task, 0)
 
-	// get channel dist by replica (ch -> node list), cause more then one delegator may exists during channel balance.
-	// if more than one delegator exist, load/release segment may causes chaos, so we can skip it until channel balance finished.
-	dist := c.dist.ChannelDistManager.GetChannelDistByReplica(replica)
-	for ch, nodes := range dist {
-		if len(nodes) > 1 {
-			log.Info("skip check segment due to two shard leader exists",
-				zap.String("channelName", ch))
-			return ret
-		}
-	}
-
 	// compare with targets to find the lack and redundancy of segments
 	lacks, redundancies := c.getSealedSegmentDiff(replica.GetCollectionID(), replica.GetID())
 	// loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan)
diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go
index fe3928ebe9..21bd7e458a 100644
--- a/internal/querycoordv2/checkers/segment_checker_test.go
+++ b/internal/querycoordv2/checkers/segment_checker_test.go
@@ -196,46 +196,6 @@ func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() {
 	suite.Len(tasks, 0)
 }
 
-func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() {
-	checker := suite.checker
-	// set meta
-	checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
-	checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
-	checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
-	suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
-	suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
-	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
-	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
-
-	// set target
-	segments := []*datapb.SegmentInfo{
-		{
-			ID:            1,
-			PartitionID:   1,
-			InsertChannel: "test-insert-channel",
-		},
-	}
-
-	channels := []*datapb.VchannelInfo{
-		{
-			CollectionID: 1,
-			ChannelName:  "test-insert-channel",
-		},
-	}
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-		channels, segments, nil)
-	checker.targetMgr.UpdateCollectionNextTarget(int64(1))
-
-	// set dist
-	checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel"))
-	checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 2, "test-insert-channel"))
-	checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 11, 1, 1, "test-insert-channel"))
-	checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
-
-	tasks := checker.Check(context.TODO())
-	suite.Len(tasks, 0)
-}
-
 func (suite *SegmentCheckerTestSuite) TestReleaseSegments() {
 	checker := suite.checker
 	// set meta
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 2de2f87ddf..3609b6b975 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -1467,6 +1467,7 @@ type queryCoordConfig struct {
 	CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"`
 	CheckNodeSessionInterval       ParamItem `refreshable:"false"`
 	GracefulStopTimeout            ParamItem `refreshable:"true"`
+	EnableStoppingBalance          ParamItem `refreshable:"true"`
 }
 
 func (p *queryCoordConfig) init(base *BaseTable) {
@@ -1934,6 +1935,15 @@ func (p *queryCoordConfig) init(base *BaseTable) {
 		Export: true,
 	}
 	p.GracefulStopTimeout.Init(base.mgr)
+
+	p.EnableStoppingBalance = ParamItem{
+		Key:          "queryCoord.enableStoppingBalance",
+		Version:      "2.3.13",
+		DefaultValue: "true",
+		Doc:          "whether enable stopping balance",
+		Export:       true,
+	}
+	p.EnableStoppingBalance.Init(base.mgr)
 }
 
 // /////////////////////////////////////////////////////////////////////////////
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 17d68f8325..468a578c2e 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -294,6 +294,7 @@ func TestComponentParam(t *testing.T) {
 
 		params.Save("queryCoord.gracefulStopTimeout", "100")
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
+		assert.Equal(t, true, Params.EnableStoppingBalance.GetAsBool())
 	})
 
 	t.Run("test queryNodeConfig", func(t *testing.T) {
diff --git a/tests/integration/balance/balance_test.go b/tests/integration/balance/balance_test.go
index c93b9f4531..893e4644a6 100644
--- a/tests/integration/balance/balance_test.go
+++ b/tests/integration/balance/balance_test.go
@@ -233,6 +233,86 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
 	}, 10*time.Second, 1*time.Second)
 }
 
+func (s *BalanceTestSuit) TestNodeDown() {
+	ctx := context.Background()
+
+	// disable compact
+	s.Cluster.DataCoord.GcControl(ctx, &datapb.GcControlRequest{
+		Base:    commonpbutil.NewMsgBase(),
+		Command: datapb.GcCommand_Pause,
+		Params: []*commonpb.KeyValuePair{
+			{Key: "duration", Value: "3600"},
+		},
+	})
+	defer s.Cluster.DataCoord.GcControl(ctx, &datapb.GcControlRequest{
+		Base:    commonpbutil.NewMsgBase(),
+		Command: datapb.GcCommand_Resume,
+	})
+
+	// disable balance channel
+	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceChannel.Key, "false")
+	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.EnableStoppingBalance.Key, "false")
+
+	// init collection with 3 channel, each channel has 15 segment, each segment has 2000 row
+	// and load it with 2 replicas on 2 nodes.
+	name := "test_balance_" + funcutil.GenRandomStr()
+	s.initCollection(name, 1, 2, 15, 2000)
+
+	// then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments
+	qn1 := s.Cluster.AddQueryNode()
+	qn2 := s.Cluster.AddQueryNode()
+
+	// check segment num on new query node
+	s.Eventually(func() bool {
+		resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
+		s.NoError(err)
+		s.True(merr.Ok(resp.GetStatus()))
+		log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
+		return len(resp.Channels) == 0 && len(resp.Segments) == 10
+	}, 30*time.Second, 1*time.Second)
+
+	s.Eventually(func() bool {
+		resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
+		s.NoError(err)
+		s.True(merr.Ok(resp.GetStatus()))
+		log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
+		return len(resp.Channels) == 0 && len(resp.Segments) == 10
+	}, 30*time.Second, 1*time.Second)
+
+	// then we force stop qn1 and resume balance channel, let balance channel and load segment happens concurrently on qn2
+	paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.AutoBalanceChannel.Key)
+	time.Sleep(1 * time.Second)
+	qn1.Stop()
+
+	info, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
+		Base:           commonpbutil.NewMsgBase(),
+		CollectionName: name,
+	})
+	s.NoError(err)
+	s.True(merr.Ok(info.GetStatus()))
+	collectionID := info.GetCollectionID()
+
+	// expected channel and segment concurrent move to qn2
+	s.Eventually(func() bool {
+		resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
+		s.NoError(err)
+		s.True(merr.Ok(resp.GetStatus()))
+		log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
+		return len(resp.Channels) == 1 && len(resp.Segments) == 15
+	}, 30*time.Second, 1*time.Second)
+
+	// expect all delegator will recover to healthy
+	s.Eventually(func() bool {
+		resp, err := s.Cluster.QueryCoord.GetShardLeaders(ctx, &querypb.GetShardLeadersRequest{
+			Base:         commonpbutil.NewMsgBase(),
+			CollectionID: collectionID,
+		})
+		s.NoError(err)
+		s.True(merr.Ok(resp.GetStatus()))
+		return len(resp.Shards) == 2
+	}, 30*time.Second, 1*time.Second)
+}
+
 func TestBalance(t *testing.T) {
 	suite.Run(t, new(BalanceTestSuit))
 }
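
The patch exposes the new switch as queryCoord.enableStoppingBalance (refreshable, default "true"). The snippet below is a minimal sketch of reading and toggling it through paramtable, reusing the Save/Reset/GetAsBool calls that already appear in the integration test above; the standalone main() wrapper is an assumption for illustration only, not part of the patch.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	// Bootstrap the global param table (as Milvus components and tests do).
	paramtable.Init()
	pt := paramtable.Get()

	// The default added by this patch is "true": stopping balance is enabled.
	fmt.Println("stopping balance enabled:", pt.QueryCoordCfg.EnableStoppingBalance.GetAsBool())

	// Temporarily disable stopping balance; with the patch applied,
	// BalanceReplica returns early for replicas that contain stopping nodes,
	// and balance_checker skips the stopping-balance round.
	pt.Save(pt.QueryCoordCfg.EnableStoppingBalance.Key, "false")
	fmt.Println("stopping balance enabled:", pt.QueryCoordCfg.EnableStoppingBalance.GetAsBool())

	// Restore the default value.
	pt.Reset(pt.QueryCoordCfg.EnableStoppingBalance.Key)
}

Since the ParamItem is tagged refreshable:"true", the value should also be adjustable through the usual dynamic configuration path at runtime rather than only at startup.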