fix: Balance channel stuck forever due to logic dead lock (#31202)

issue: #30816

cause balance channel will stuck until leader view catch up the current
target, then start to unsub the old delegator. which make sure that the
new delegator can provide search before release old delegator. but
another logic in segment_checker skip loading segment during balance
channel. so during balance channel, if query node crash, new delegator
can't catch up target forever, then stuck forever.

This PR remove the rule that skip loading segment during balance channel
to avoid the logic dead lock here.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/31192/head
wei liu 2024-03-13 15:05:04 +08:00 committed by GitHub
parent 5b4c0bdc20
commit 06b191b164
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 127 additions and 72 deletions

View File

@ -493,6 +493,11 @@ func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAs
// print current distribution before generating plans
segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
if len(offlineNodes) != 0 {
if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", offlineNodes))
return nil, nil
}
log.Info("Handle stopping nodes",
zap.Any("stopping nodes", offlineNodes),
zap.Any("available nodes", onlineNodes),

View File

@ -179,6 +179,11 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment
segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
if len(offlineNodes) != 0 {
if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", offlineNodes))
return nil, nil
}
log.Info("Handle stopping nodes",
zap.Any("stopping nodes", offlineNodes),
zap.Any("available nodes", onlineNodes),

View File

@ -213,6 +213,11 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
// print current distribution before generating plans
segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
if len(offlineNodes) != 0 {
if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", offlineNodes))
return nil, nil
}
log.Info("Handle stopping nodes",
zap.Any("stopping nodes", offlineNodes),
zap.Any("available nodes", onlineNodes),

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -90,27 +91,29 @@ func (b *BalanceChecker) replicasToBalance() []int64 {
return loadedCollections[i] < loadedCollections[j]
})
// balance collections influenced by stopping nodes
stoppingReplicas := make([]int64, 0)
for _, cid := range loadedCollections {
// if target and meta isn't ready, skip balance this collection
if !b.readyToCheck(cid) {
continue
}
replicas := b.meta.ReplicaManager.GetByCollection(cid)
for _, replica := range replicas {
for _, nodeID := range replica.GetNodes() {
isStopping, _ := b.nodeManager.IsStoppingNode(nodeID)
if isStopping {
stoppingReplicas = append(stoppingReplicas, replica.GetID())
break
if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
// balance collections influenced by stopping nodes
stoppingReplicas := make([]int64, 0)
for _, cid := range loadedCollections {
// if target and meta isn't ready, skip balance this collection
if !b.readyToCheck(cid) {
continue
}
replicas := b.meta.ReplicaManager.GetByCollection(cid)
for _, replica := range replicas {
for _, nodeID := range replica.GetNodes() {
isStopping, _ := b.nodeManager.IsStoppingNode(nodeID)
if isStopping {
stoppingReplicas = append(stoppingReplicas, replica.GetID())
break
}
}
}
}
}
// do stopping balance only in this round
if len(stoppingReplicas) > 0 {
return stoppingReplicas
// do stopping balance only in this round
if len(stoppingReplicas) > 0 {
return stoppingReplicas
}
}
// no stopping balance and auto balance is disabled, return empty collections for balance

View File

@ -105,22 +105,8 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
}
func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica) []task.Task {
log := log.Ctx(ctx).WithRateGroup("qcv2.SegmentChecker", 1, 60).With(
zap.Int64("collectionID", replica.CollectionID),
zap.Int64("replicaID", replica.ID))
ret := make([]task.Task, 0)
// get channel dist by replica (ch -> node list), cause more then one delegator may exists during channel balance.
// if more than one delegator exist, load/release segment may causes chaos, so we can skip it until channel balance finished.
dist := c.dist.ChannelDistManager.GetChannelDistByReplica(replica)
for ch, nodes := range dist {
if len(nodes) > 1 {
log.Info("skip check segment due to two shard leader exists",
zap.String("channelName", ch))
return ret
}
}
// compare with targets to find the lack and redundancy of segments
lacks, redundancies := c.getSealedSegmentDiff(replica.GetCollectionID(), replica.GetID())
// loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan)

View File

@ -196,46 +196,6 @@ func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() {
suite.Len(tasks, 0)
}
func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() {
checker := suite.checker
// set meta
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
// set target
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
// set dist
checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel"))
checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 2, "test-insert-channel"))
checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 11, 1, 1, "test-insert-channel"))
checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
tasks := checker.Check(context.TODO())
suite.Len(tasks, 0)
}
func (suite *SegmentCheckerTestSuite) TestReleaseSegments() {
checker := suite.checker
// set meta

View File

@ -1467,6 +1467,7 @@ type queryCoordConfig struct {
CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"`
CheckNodeSessionInterval ParamItem `refreshable:"false"`
GracefulStopTimeout ParamItem `refreshable:"true"`
EnableStoppingBalance ParamItem `refreshable:"true"`
}
func (p *queryCoordConfig) init(base *BaseTable) {
@ -1934,6 +1935,15 @@ func (p *queryCoordConfig) init(base *BaseTable) {
Export: true,
}
p.GracefulStopTimeout.Init(base.mgr)
p.EnableStoppingBalance = ParamItem{
Key: "queryCoord.enableStoppingBalance",
Version: "2.3.13",
DefaultValue: "true",
Doc: "whether enable stopping balance",
Export: true,
}
p.EnableStoppingBalance.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

View File

@ -294,6 +294,7 @@ func TestComponentParam(t *testing.T) {
params.Save("queryCoord.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
assert.Equal(t, true, Params.EnableStoppingBalance.GetAsBool())
})
t.Run("test queryNodeConfig", func(t *testing.T) {

View File

@ -233,6 +233,86 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
}, 10*time.Second, 1*time.Second)
}
func (s *BalanceTestSuit) TestNodeDown() {
ctx := context.Background()
// disable compact
s.Cluster.DataCoord.GcControl(ctx, &datapb.GcControlRequest{
Base: commonpbutil.NewMsgBase(),
Command: datapb.GcCommand_Pause,
Params: []*commonpb.KeyValuePair{
{Key: "duration", Value: "3600"},
},
})
defer s.Cluster.DataCoord.GcControl(ctx, &datapb.GcControlRequest{
Base: commonpbutil.NewMsgBase(),
Command: datapb.GcCommand_Resume,
})
// disable balance channel
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceChannel.Key, "false")
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.EnableStoppingBalance.Key, "false")
// init collection with 3 channel, each channel has 15 segment, each segment has 2000 row
// and load it with 2 replicas on 2 nodes.
name := "test_balance_" + funcutil.GenRandomStr()
s.initCollection(name, 1, 2, 15, 2000)
// then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments
qn1 := s.Cluster.AddQueryNode()
qn2 := s.Cluster.AddQueryNode()
// check segment num on new query node
s.Eventually(func() bool {
resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
return len(resp.Channels) == 0 && len(resp.Segments) == 10
}, 30*time.Second, 1*time.Second)
s.Eventually(func() bool {
resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
return len(resp.Channels) == 0 && len(resp.Segments) == 10
}, 30*time.Second, 1*time.Second)
// then we force stop qn1 and resume balance channel, let balance channel and load segment happens concurrently on qn2
paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.AutoBalanceChannel.Key)
time.Sleep(1 * time.Second)
qn1.Stop()
info, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(),
CollectionName: name,
})
s.NoError(err)
s.True(merr.Ok(info.GetStatus()))
collectionID := info.GetCollectionID()
// expected channel and segment concurrent move to qn2
s.Eventually(func() bool {
resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
return len(resp.Channels) == 1 && len(resp.Segments) == 15
}, 30*time.Second, 1*time.Second)
// expect all delegator will recover to healthy
s.Eventually(func() bool {
resp, err := s.Cluster.QueryCoord.GetShardLeaders(ctx, &querypb.GetShardLeadersRequest{
Base: commonpbutil.NewMsgBase(),
CollectionID: collectionID,
})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
return len(resp.Shards) == 2
}, 30*time.Second, 1*time.Second)
}
func TestBalance(t *testing.T) {
suite.Run(t, new(BalanceTestSuit))
}