From ebbccb870cf7696cce7839b6d30b4853e0b2f3b0 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Tue, 23 Jul 2024 11:06:15 +0800
Subject: [PATCH] fix: Avoid segment lack caused by deduplicate segment task
 (#34782) (#34903)

issue: #34781
pr: #34782

When a segment balance has not finished yet, the query coord may find two
loaded copies of the segment and generate a deduplicate task, which may
cancel the balance task. At that point the old copy has already been
released while the new copy is not ready yet but gets canceled, so searches
fail because the segment is missing.

This PR sets the deduplicate segment task's priority to low, to avoid the
balance segment task being canceled by the deduplicate task.

Signed-off-by: Wei Liu

---
 .../querycoordv2/checkers/segment_checker.go |  8 ++++++-
 .../checkers/segment_checker_test.go         |  2 +-
 internal/querycoordv2/handlers.go            | 24 +++++++++++--------
 3 files changed, 22 insertions(+), 12 deletions(-)
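Note for reviewers: the fix relies on the scheduler's preemption behavior,
under which a newly generated task can cancel an in-flight task on the same
segment only if its priority is not lower. The Go sketch below is a
simplified, hypothetical model of that rule (Task, Priority, and preempts
are illustrative names, not the actual querycoord types); it shows why
lowering the deduplicate task's priority keeps it from canceling a running
balance task.

package main

import "fmt"

// Priority mirrors the idea of task.TaskPriorityLow/Normal/High in
// querycoord; the concrete values here are illustrative.
type Priority int

const (
	PriorityLow Priority = iota
	PriorityNormal
	PriorityHigh
)

// Task is a hypothetical stand-in for a querycoord segment task.
type Task struct {
	SegmentID int64
	Reason    string
	Priority  Priority
}

// preempts models the assumed scheduling rule: a new task may cancel an
// existing task on the same segment only if its priority is not lower.
func preempts(incoming, existing Task) bool {
	return incoming.SegmentID == existing.SegmentID &&
		incoming.Priority >= existing.Priority
}

func main() {
	balance := Task{SegmentID: 1, Reason: "balance", Priority: PriorityNormal}

	// Before this patch: the deduplicate task ran at normal priority and
	// could cancel the in-flight balance task, releasing the old copy
	// while the new copy was not yet ready.
	dedupBefore := Task{SegmentID: 1, Reason: "redundancies of segment", Priority: PriorityNormal}
	fmt.Println(preempts(dedupBefore, balance)) // true: balance task gets canceled

	// After this patch: the deduplicate task runs at low priority, so the
	// balance task keeps running and the segment stays servable.
	dedupAfter := Task{SegmentID: 1, Reason: "redundancies of segment", Priority: PriorityLow}
	fmt.Println(preempts(dedupAfter, balance)) // false: balance task survives
}

The redundant copy is not leaked by this change: once the balance task has
finished, a later checker round can still generate the reduce task and it
will run normally.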
diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go
index d669758790..1fa04b3eaa 100644
--- a/internal/querycoordv2/checkers/segment_checker.go
+++ b/internal/querycoordv2/checkers/segment_checker.go
@@ -101,8 +101,9 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
 	released := utils.FilterReleased(segments, collectionIDs)
 	reduceTasks := c.createSegmentReduceTasks(ctx, released, meta.NilReplica, querypb.DataScope_Historical)
 	task.SetReason("collection released", reduceTasks...)
+	task.SetPriority(task.TaskPriorityNormal, reduceTasks...)
 	results = append(results, reduceTasks...)
-	task.SetPriority(task.TaskPriorityNormal, results...)
+
 	return results
 }
@@ -114,11 +115,13 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
 	// loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan)
 	tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, replica)
 	task.SetReason("lacks of segment", tasks...)
+	task.SetPriority(task.TaskPriorityNormal, tasks...)
 	ret = append(ret, tasks...)
 
 	redundancies = c.filterSegmentInUse(replica, redundancies)
 	tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
 	task.SetReason("segment not exists in target", tasks...)
+	task.SetPriority(task.TaskPriorityNormal, tasks...)
 	ret = append(ret, tasks...)
 
 	// compare inner dists to find repeated loaded segments
@@ -126,12 +129,15 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
 	redundancies = c.filterExistedOnLeader(replica, redundancies)
 	tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
 	task.SetReason("redundancies of segment", tasks...)
+	// set deduplicate task priority to low, to avoid the deduplicate task canceling the balance task
+	task.SetPriority(task.TaskPriorityLow, tasks...)
 	ret = append(ret, tasks...)
 
 	// compare with target to find the lack and redundancy of segments
 	_, redundancies = c.getGrowingSegmentDiff(replica.GetCollectionID(), replica.GetID())
 	tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Streaming)
 	task.SetReason("streaming segment not exists in target", tasks...)
+	task.SetPriority(task.TaskPriorityNormal, tasks...)
 	ret = append(ret, tasks...)
 
 	return ret
diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go
index 3d43a40376..7418889307 100644
--- a/internal/querycoordv2/checkers/segment_checker_test.go
+++ b/internal/querycoordv2/checkers/segment_checker_test.go
@@ -448,7 +448,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() {
 	suite.Equal(task.ActionTypeReduce, action.Type())
 	suite.EqualValues(1, action.SegmentID())
 	suite.EqualValues(1, action.Node())
-	suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
+	suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
 
 	// test less version exist on leader
 	checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 1}, map[int64]*meta.Segment{}))
diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go
index 13fa55008d..5aa17b0aa1 100644
--- a/internal/querycoordv2/handlers.go
+++ b/internal/querycoordv2/handlers.go
@@ -122,7 +122,7 @@ func (s *Server) balanceSegments(ctx context.Context,
 		actions = append(actions, releaseAction)
 	}
 
-	task, err := task.NewSegmentTask(s.ctx,
+	t, err := task.NewSegmentTask(s.ctx,
 		Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
 		utils.ManualBalance,
 		collectionID,
@@ -140,13 +140,15 @@
 		)
 		continue
 	}
-	task.SetReason("manual balance")
-	err = s.taskScheduler.Add(task)
+	t.SetReason("manual balance")
+	// set manual balance priority to normal, to avoid it being canceled by other segment tasks
+	t.SetPriority(task.TaskPriorityNormal)
+	err = s.taskScheduler.Add(t)
 	if err != nil {
-		task.Cancel(err)
+		t.Cancel(err)
 		return err
 	}
-	tasks = append(tasks, task)
+	tasks = append(tasks, t)
 	}
 
 	if sync {
@@ -198,7 +200,7 @@
 		releaseAction := task.NewChannelAction(plan.From, task.ActionTypeReduce, plan.Channel.GetChannelName())
 		actions = append(actions, releaseAction)
 	}
-	task, err := task.NewChannelTask(s.ctx,
+	t, err := task.NewChannelTask(s.ctx,
 		Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond),
 		utils.ManualBalance,
 		collectionID,
@@ -215,13 +217,15 @@
 		)
 		continue
 	}
-	task.SetReason("manual balance")
-	err = s.taskScheduler.Add(task)
+	t.SetReason("manual balance")
+	// set manual balance channel priority to high, to avoid it being canceled by other channel tasks
+	t.SetPriority(task.TaskPriorityHigh)
+	err = s.taskScheduler.Add(t)
 	if err != nil {
-		task.Cancel(err)
+		t.Cancel(err)
 		return err
 	}
-	tasks = append(tasks, task)
+	tasks = append(tasks, t)
 	}
 
 	if sync {
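
A side note on the task -> t renames in handlers.go: the local variable
named task shadowed the imported task package, so the newly added
task.TaskPriorityNormal and task.TaskPriorityHigh references could not
resolve the package until the variable was renamed. A minimal,
self-contained sketch of the same shadowing pitfall, using the standard
strings package in place of the task package:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// The package identifier is usable here.
	fmt.Println(strings.ToUpper("ok"))

	// Declaring a local variable with the package's name shadows the
	// package for the rest of this scope, exactly like
	// `task, err := task.NewSegmentTask(...)` shadowed the task package
	// in balanceSegments.
	strings := []string{"a", "b"}

	// fmt.Println(strings.ToUpper("x")) // would no longer compile:
	// strings.ToUpper undefined (type []string has no method ToUpper)
	fmt.Println(strings)
}

Renaming the variable rather than aliasing the import keeps the package
identifier usable with minimal churn in the surrounding code.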