mirror of https://github.com/milvus-io/milvus.git
Reduce DataScope to historical for segment release task (#25489)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/25415/head
parent
70c4ddc6c5
commit
7d00020c9e
|
@ -23,6 +23,7 @@ import (
|
|||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
|
@ -38,11 +39,11 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t
|
|||
for _, p := range plans {
|
||||
actions := make([]task.Action, 0)
|
||||
if p.To != -1 {
|
||||
action := task.NewSegmentAction(p.To, task.ActionTypeGrow, p.Segment.GetInsertChannel(), p.Segment.GetID())
|
||||
action := task.NewSegmentActionWithScope(p.To, task.ActionTypeGrow, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical)
|
||||
actions = append(actions, action)
|
||||
}
|
||||
if p.From != -1 {
|
||||
action := task.NewSegmentAction(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID())
|
||||
action := task.NewSegmentActionWithScope(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical)
|
||||
actions = append(actions, action)
|
||||
}
|
||||
t, err := task.NewSegmentTask(
|
||||
|
|
|
@ -77,7 +77,7 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
|
|||
// find already released segments which are not contained in target
|
||||
segments := c.dist.SegmentDistManager.GetAll()
|
||||
released := utils.FilterReleased(segments, collectionIDs)
|
||||
tasks = append(tasks, c.createSegmentReduceTasks(ctx, released, -1, querypb.DataScope_All)...)
|
||||
tasks = append(tasks, c.createSegmentReduceTasks(ctx, released, -1, querypb.DataScope_Historical)...)
|
||||
task.SetPriority(task.TaskPriorityNormal, tasks...)
|
||||
return tasks
|
||||
}
|
||||
|
@ -92,14 +92,14 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
|
|||
ret = append(ret, tasks...)
|
||||
|
||||
redundancies = c.filterSegmentInUse(replica, redundancies)
|
||||
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_All)
|
||||
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_Historical)
|
||||
task.SetReason("segment not exists in target", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
// compare inner dists to find repeated loaded segments
|
||||
redundancies = c.findRepeatedHistoricalSegments(c.dist, c.meta, replica.GetID())
|
||||
redundancies = c.filterExistedOnLeader(replica, redundancies)
|
||||
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_All)
|
||||
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_Historical)
|
||||
task.SetReason("redundancies of segment", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
|
|
|
@ -147,8 +147,8 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe
|
|||
req.GetBase().GetMsgID(),
|
||||
req.GetCollectionID(),
|
||||
replica.GetID(),
|
||||
task.NewSegmentAction(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID()),
|
||||
task.NewSegmentAction(srcNode, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID()),
|
||||
task.NewSegmentActionWithScope(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical),
|
||||
task.NewSegmentActionWithScope(srcNode, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue