From 043ac87be078dfd92909f14683d410810a16088f Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 1 Dec 2023 10:08:34 +0800 Subject: [PATCH] fix: Balance channel may cause channel not availble error (#28829) issue: #28831 release old delegator before new delegator update it's distribution may cause `channel not availble` error This PR will block release old delgator before new delegator finish `syncDistribution` Signed-off-by: Wei Liu --- internal/querycoordv2/task/scheduler.go | 40 ++++++++++++ internal/querycoordv2/task/task_test.go | 86 +++++++++++++++++++++++++ 2 files changed, 126 insertions(+) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 9b23359bf5..f8e2dab0b2 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -614,12 +614,52 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool { // return true if the task should be executed, // false otherwise func (scheduler *taskScheduler) preProcess(task Task) bool { + log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With( + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("taskID", task.ID()), + ) if task.Status() != TaskStatusStarted { return false } + // check if new delegator is ready to release old delegator + checkLeaderView := func(collectionID int64, channel string, node int64) bool { + segmentsInTarget := scheduler.targetMgr.GetSealedSegmentsByChannel(collectionID, channel, meta.CurrentTarget) + leader := scheduler.distMgr.LeaderViewManager.GetLeaderShardView(node, channel) + if leader == nil { + return false + } + + for segmentID := range segmentsInTarget { + if _, exist := leader.Segments[segmentID]; !exist { + return false + } + } + + return true + } + actions, step := task.Actions(), task.Step() for step < len(actions) && actions[step].IsFinished(scheduler.distMgr) { + if GetTaskType(task) == TaskTypeMove && actions[step].Type() == ActionTypeGrow { + var ready bool + switch actions[step].(type) { + case *ChannelAction: + // if balance channel task has finished grow action, block reduce action until + // segment distribution has been sync to new delegator, cause new delegator may + // causes a few time to load delta log, if reduce the old delegator in advance, + // new delegator can't service search and query, will got no available channel error + channelAction := actions[step].(*ChannelAction) + ready = checkLeaderView(task.CollectionID(), channelAction.Shard(), channelAction.Node()) + default: + ready = true + } + + if !ready { + log.RatedInfo(30, "Blocking reduce action in balance channel task") + break + } + } task.StepUp() step++ } diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index f2aef18d26..1d25c280f4 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1428,6 +1428,92 @@ func createReplica(collection int64, nodes ...int64) *meta.Replica { ) } +func (suite *TaskSuite) TestBalanceChannelTask() { + collectionID := int64(1) + partitionID := int64(1) + channel := "channel-1" + vchannel := &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: channel, + } + + segments := []*datapb.SegmentInfo{ + { + ID: 1, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannel: channel, + }, + { + ID: 2, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannel: channel, + }, + { + ID: 3, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannel: channel, + }, + } + suite.meta.PutCollection(utils.CreateTestCollection(collectionID, 1), utils.CreateTestPartition(collectionID, 1)) + suite.broker.ExpectedCalls = nil + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return([]*datapb.VchannelInfo{vchannel}, segments, nil) + suite.target.UpdateCollectionNextTarget(collectionID) + suite.target.UpdateCollectionCurrentTarget(collectionID) + suite.target.UpdateCollectionNextTarget(collectionID) + + suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{ + ID: 2, + CollectionID: collectionID, + Channel: channel, + Segments: map[int64]*querypb.SegmentDist{ + 1: {}, + 2: {}, + 3: {}, + }, + }) + suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: channel, + }) + task, err := NewChannelTask(context.Background(), + 10*time.Second, + WrapIDSource(2), + collectionID, + 1, + NewChannelAction(1, ActionTypeGrow, channel), + NewChannelAction(2, ActionTypeReduce, channel), + ) + suite.NoError(err) + + // new delegator distribution hasn't updated, block balance + suite.scheduler.preProcess(task) + suite.Equal(0, task.step) + + suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: channel, + Segments: map[int64]*querypb.SegmentDist{ + 1: {}, + 2: {}, + 3: {}, + }, + }) + + // new delegator distribution updated, task step up + suite.scheduler.preProcess(task) + suite.Equal(1, task.step) + + suite.dist.LeaderViewManager.Update(2) + // old delegator removed + suite.scheduler.preProcess(task) + suite.Equal(2, task.step) +} + func TestTask(t *testing.T) { suite.Run(t, new(TaskSuite)) }