diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index d669758790..1fa04b3eaa 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -101,8 +101,9 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task { released := utils.FilterReleased(segments, collectionIDs) reduceTasks := c.createSegmentReduceTasks(ctx, released, meta.NilReplica, querypb.DataScope_Historical) task.SetReason("collection released", reduceTasks...) + task.SetPriority(task.TaskPriorityNormal, reduceTasks...) results = append(results, reduceTasks...) - task.SetPriority(task.TaskPriorityNormal, results...) + return results } @@ -114,11 +115,13 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica // loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan) tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, replica) task.SetReason("lacks of segment", tasks...) + task.SetPriority(task.TaskPriorityNormal, tasks...) ret = append(ret, tasks...) redundancies = c.filterSegmentInUse(replica, redundancies) tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical) task.SetReason("segment not exists in target", tasks...) + task.SetPriority(task.TaskPriorityNormal, tasks...) ret = append(ret, tasks...) // compare inner dists to find repeated loaded segments @@ -126,12 +129,15 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica redundancies = c.filterExistedOnLeader(replica, redundancies) tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical) task.SetReason("redundancies of segment", tasks...) + // set deduplicate task priority to low, to avoid deduplicate task cancel balance task + task.SetPriority(task.TaskPriorityLow, tasks...) ret = append(ret, tasks...) // compare with target to find the lack and redundancy of segments _, redundancies = c.getGrowingSegmentDiff(replica.GetCollectionID(), replica.GetID()) tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Streaming) task.SetReason("streaming segment not exists in target", tasks...) + task.SetPriority(task.TaskPriorityNormal, tasks...) ret = append(ret, tasks...) return ret diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 3d43a40376..7418889307 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -448,7 +448,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() { suite.Equal(task.ActionTypeReduce, action.Type()) suite.EqualValues(1, action.SegmentID()) suite.EqualValues(1, action.Node()) - suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) + suite.Equal(tasks[0].Priority(), task.TaskPriorityLow) // test less version exist on leader checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 1}, map[int64]*meta.Segment{})) diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 13fa55008d..5aa17b0aa1 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -122,7 +122,7 @@ func (s *Server) balanceSegments(ctx context.Context, actions = append(actions, releaseAction) } - task, err := task.NewSegmentTask(s.ctx, + t, err := task.NewSegmentTask(s.ctx, Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), utils.ManualBalance, collectionID, @@ -140,13 +140,15 @@ func (s *Server) balanceSegments(ctx context.Context, ) continue } - task.SetReason("manual balance") - err = s.taskScheduler.Add(task) + t.SetReason("manual balance") + // set manual balance to normal, to avoid manual balance be canceled by other segment task + t.SetPriority(task.TaskPriorityNormal) + err = s.taskScheduler.Add(t) if err != nil { - task.Cancel(err) + t.Cancel(err) return err } - tasks = append(tasks, task) + tasks = append(tasks, t) } if sync { @@ -198,7 +200,7 @@ func (s *Server) balanceChannels(ctx context.Context, releaseAction := task.NewChannelAction(plan.From, task.ActionTypeReduce, plan.Channel.GetChannelName()) actions = append(actions, releaseAction) } - task, err := task.NewChannelTask(s.ctx, + t, err := task.NewChannelTask(s.ctx, Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), utils.ManualBalance, collectionID, @@ -215,13 +217,15 @@ func (s *Server) balanceChannels(ctx context.Context, ) continue } - task.SetReason("manual balance") - err = s.taskScheduler.Add(task) + t.SetReason("manual balance") + // set manual balance channel to high, to avoid manual balance be canceled by other channel task + t.SetPriority(task.TaskPriorityHigh) + err = s.taskScheduler.Add(t) if err != nil { - task.Cancel(err) + t.Cancel(err) return err } - tasks = append(tasks, task) + tasks = append(tasks, t) } if sync {