mirror of https://github.com/milvus-io/milvus.git
issue: #34781 pr: #34782
When a segment balance has not finished yet, query coord may find two loaded copies of the segment and generate a deduplication task, which may cancel the balance task. The old copy has then already been released while the new copy, whose load was canceled, is not ready yet, so searches fail because the segment is missing. This PR sets the priority of deduplication segment tasks to low, so that balance segment tasks are not canceled by deduplication tasks.
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/34922/head
parent 0951fe4e64
commit ebbccb870c
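To make the scenario in the commit message concrete, below is a minimal, self-contained Go sketch of the priority-based conflict handling the fix relies on: when a new task targets a segment that already has a pending task, a scheduler of this style only lets the new task preempt the pending one if its priority is strictly higher. The Scheduler, Task, and Add below are hypothetical stand-ins for illustration, not the actual query coord implementation; only the priority names mirror the task package identifiers in the diff, and the Low < Normal < High ordering is an assumption.

package main

import "fmt"

// Priority levels named after the identifiers in the diff; the iota ordering
// (Low < Normal < High) is an assumption made for this illustration.
type Priority int

const (
	TaskPriorityLow Priority = iota
	TaskPriorityNormal
	TaskPriorityHigh
)

// Task is a hypothetical stand-in for a query coord segment task.
type Task struct {
	SegmentID int64
	Reason    string
	Priority  Priority
}

// Scheduler keeps at most one pending task per segment and lets a new task
// replace (cancel) the pending one only when its priority is strictly higher.
type Scheduler struct {
	pending map[int64]Task
}

func NewScheduler() *Scheduler {
	return &Scheduler{pending: make(map[int64]Task)}
}

// Add reports whether the task was accepted; a conflicting task with lower or
// equal priority is rejected instead of canceling the pending one.
func (s *Scheduler) Add(t Task) bool {
	if old, ok := s.pending[t.SegmentID]; ok && t.Priority <= old.Priority {
		return false
	}
	s.pending[t.SegmentID] = t
	return true
}

func main() {
	s := NewScheduler()
	// A balance task is moving segment 1 and is still pending.
	s.Add(Task{SegmentID: 1, Reason: "balance", Priority: TaskPriorityNormal})

	// With this PR the dedup task is low priority, so it can no longer
	// cancel the in-flight balance task.
	accepted := s.Add(Task{SegmentID: 1, Reason: "redundancies of segment", Priority: TaskPriorityLow})
	fmt.Println("dedup task preempted balance task:", accepted) // false
}

Before this change the deduplication task was submitted at normal priority, so it could win the conflict, cancel the balance task mid-move, and leave a window in which the old copy was already released while the new copy was not ready.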
@@ -101,8 +101,9 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
 	released := utils.FilterReleased(segments, collectionIDs)
 	reduceTasks := c.createSegmentReduceTasks(ctx, released, meta.NilReplica, querypb.DataScope_Historical)
 	task.SetReason("collection released", reduceTasks...)
+	task.SetPriority(task.TaskPriorityNormal, reduceTasks...)
 	results = append(results, reduceTasks...)
-	task.SetPriority(task.TaskPriorityNormal, results...)
+
 	return results
 }
@@ -114,11 +115,13 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
 	// loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan)
 	tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, replica)
 	task.SetReason("lacks of segment", tasks...)
+	task.SetPriority(task.TaskPriorityNormal, tasks...)
 	ret = append(ret, tasks...)

 	redundancies = c.filterSegmentInUse(replica, redundancies)
 	tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
 	task.SetReason("segment not exists in target", tasks...)
+	task.SetPriority(task.TaskPriorityNormal, tasks...)
 	ret = append(ret, tasks...)

 	// compare inner dists to find repeated loaded segments
@@ -126,12 +129,15 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
 	redundancies = c.filterExistedOnLeader(replica, redundancies)
 	tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
 	task.SetReason("redundancies of segment", tasks...)
+	// set deduplicate task priority to low, to avoid deduplicate task cancel balance task
+	task.SetPriority(task.TaskPriorityLow, tasks...)
 	ret = append(ret, tasks...)

 	// compare with target to find the lack and redundancy of segments
 	_, redundancies = c.getGrowingSegmentDiff(replica.GetCollectionID(), replica.GetID())
 	tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Streaming)
 	task.SetReason("streaming segment not exists in target", tasks...)
+	task.SetPriority(task.TaskPriorityNormal, tasks...)
 	ret = append(ret, tasks...)

 	return ret
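The "repeated loaded segments" that the low-priority reduce tasks above target are simply segments present on more than one node of a replica, which is exactly the transient state a balance move creates (old copy still on the source node, new copy loading on the target node). The helper below is a simplified sketch of that idea with hypothetical types; it is not the checker's actual filterExistedOnLeader or inner-dist logic.

// SegmentEntry is a hypothetical, simplified view of one loaded copy.
type SegmentEntry struct {
	SegmentID int64
	NodeID    int64
}

// findRepeatedSegments reports segment IDs loaded on more than one node,
// i.e. the candidates a deduplication (reduce) task would be generated for.
func findRepeatedSegments(dist []SegmentEntry) []int64 {
	nodesBySegment := make(map[int64]map[int64]struct{})
	for _, e := range dist {
		if nodesBySegment[e.SegmentID] == nil {
			nodesBySegment[e.SegmentID] = make(map[int64]struct{})
		}
		nodesBySegment[e.SegmentID][e.NodeID] = struct{}{}
	}
	var repeated []int64
	for segID, nodes := range nodesBySegment {
		if len(nodes) > 1 {
			repeated = append(repeated, segID)
		}
	}
	return repeated
}

With this PR those reduce tasks are created at TaskPriorityLow, so when one of them conflicts with an in-flight balance task for the same segment, the balance task keeps running instead of being canceled.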
@@ -448,7 +448,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() {
 	suite.Equal(task.ActionTypeReduce, action.Type())
 	suite.EqualValues(1, action.SegmentID())
 	suite.EqualValues(1, action.Node())
-	suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
+	suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)

 	// test less version exist on leader
 	checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 1}, map[int64]*meta.Segment{}))
@@ -122,7 +122,7 @@ func (s *Server) balanceSegments(ctx context.Context,
 			actions = append(actions, releaseAction)
 		}

-		task, err := task.NewSegmentTask(s.ctx,
+		t, err := task.NewSegmentTask(s.ctx,
 			Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
 			utils.ManualBalance,
 			collectionID,
@@ -140,13 +140,15 @@ func (s *Server) balanceSegments(ctx context.Context,
 			)
 			continue
 		}
-		task.SetReason("manual balance")
-		err = s.taskScheduler.Add(task)
+		t.SetReason("manual balance")
+		// set manual balance to normal, to avoid manual balance be canceled by other segment task
+		t.SetPriority(task.TaskPriorityNormal)
+		err = s.taskScheduler.Add(t)
 		if err != nil {
-			task.Cancel(err)
+			t.Cancel(err)
 			return err
 		}
-		tasks = append(tasks, task)
+		tasks = append(tasks, t)
 	}

 	if sync {
@@ -198,7 +200,7 @@ func (s *Server) balanceChannels(ctx context.Context,
 			releaseAction := task.NewChannelAction(plan.From, task.ActionTypeReduce, plan.Channel.GetChannelName())
 			actions = append(actions, releaseAction)
 		}
-		task, err := task.NewChannelTask(s.ctx,
+		t, err := task.NewChannelTask(s.ctx,
 			Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond),
 			utils.ManualBalance,
 			collectionID,
@@ -215,13 +217,15 @@ func (s *Server) balanceChannels(ctx context.Context,
 			)
 			continue
 		}
-		task.SetReason("manual balance")
-		err = s.taskScheduler.Add(task)
+		t.SetReason("manual balance")
+		// set manual balance channel to high, to avoid manual balance be canceled by other channel task
+		t.SetPriority(task.TaskPriorityHigh)
+		err = s.taskScheduler.Add(t)
 		if err != nil {
-			task.Cancel(err)
+			t.Cancel(err)
 			return err
 		}
-		tasks = append(tasks, task)
+		tasks = append(tasks, t)
 	}

 	if sync {