mirror of https://github.com/milvus-io/milvus.git
parent
d126f06946
commit
d4fccdd135
|
@ -79,12 +79,16 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica
|
|||
|
||||
lacks, redundancies := c.getDmChannelDiff(c.targetMgr, c.dist, c.meta, replica.GetCollectionID(), replica.GetID())
|
||||
tasks := c.createChannelLoadTask(ctx, lacks, replica)
|
||||
task.SetReason("lacks of channel", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
tasks = c.createChannelReduceTasks(ctx, redundancies, replica.GetID())
|
||||
task.SetReason("collection released", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
repeated := c.findRepeatedChannels(c.dist, c.meta, replica.GetID())
|
||||
tasks = c.createChannelReduceTasks(ctx, repeated, replica.GetID())
|
||||
task.SetReason("redundancies of channel")
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
// All channel related tasks should be with high priority
|
||||
|
@ -179,9 +183,7 @@ func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*
|
|||
for i := range plans {
|
||||
plans[i].ReplicaID = replica.GetID()
|
||||
}
|
||||
// log.Debug("try to subscribe channels",
|
||||
// zap.Any("channels", channels),
|
||||
// zap.Any("plans", plans))
|
||||
|
||||
return balance.CreateChannelTasksFromPlans(ctx, c.ID(), Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), plans)
|
||||
}
|
||||
|
||||
|
|
|
@ -83,20 +83,24 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
|
|||
// compare with targets to find the lack and redundancy of segments
|
||||
lacks, redundancies := c.getHistoricalSegmentDiff(c.targetMgr, c.dist, c.meta, replica.GetCollectionID(), replica.GetID())
|
||||
tasks := c.createSegmentLoadTasks(ctx, lacks, replica)
|
||||
task.SetReason("lacks of segment", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_All)
|
||||
task.SetReason("segment not exists in target", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
// compare inner dists to find repeated loaded segments
|
||||
redundancies = c.findRepeatedHistoricalSegments(c.dist, c.meta, replica.GetID())
|
||||
redundancies = c.filterExistedOnLeader(replica, redundancies)
|
||||
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_All)
|
||||
task.SetReason("redundancies of segment", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
// compare with target to find the lack and redundancy of segments
|
||||
_, redundancies = c.getStreamingSegmentDiff(c.targetMgr, c.dist, c.meta, replica.GetCollectionID(), replica.GetID())
|
||||
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_Streaming)
|
||||
task.SetReason("streaming segment not exists in target", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
return ret
|
||||
|
|
|
@ -77,6 +77,7 @@ type Task interface {
|
|||
Step() int
|
||||
StepUp() int
|
||||
IsFinished(dist *meta.DistributionManager) bool
|
||||
SetReason(reason string)
|
||||
String() string
|
||||
}
|
||||
|
||||
|
@ -98,6 +99,7 @@ type baseTask struct {
|
|||
err error
|
||||
actions []Action
|
||||
step int
|
||||
reason string
|
||||
}
|
||||
|
||||
func newBaseTask(ctx context.Context, sourceID, collectionID, replicaID UniqueID, shard string) *baseTask {
|
||||
|
@ -202,6 +204,10 @@ func (task *baseTask) IsFinished(distMgr *meta.DistributionManager) bool {
|
|||
return task.Step() >= len(task.Actions())
|
||||
}
|
||||
|
||||
func (task *baseTask) SetReason(reason string) {
|
||||
task.reason = reason
|
||||
}
|
||||
|
||||
func (task *baseTask) String() string {
|
||||
var actionsStr string
|
||||
for i, action := range task.actions {
|
||||
|
@ -215,9 +221,10 @@ func (task *baseTask) String() string {
|
|||
}
|
||||
}
|
||||
return fmt.Sprintf(
|
||||
"[id=%d] [type=%v] [collectionID=%d] [replicaID=%d] [priority=%d] [actionsCount=%d] [actions=%s]",
|
||||
"[id=%d] [type=%v] [reason=%s] [collectionID=%d] [replicaID=%d] [priority=%d] [actionsCount=%d] [actions=%s]",
|
||||
task.id,
|
||||
GetTaskType(task),
|
||||
task.reason,
|
||||
task.collectionID,
|
||||
task.replicaID,
|
||||
task.priority,
|
||||
|
|
|
@ -66,6 +66,12 @@ func SetPriorityWithFunc(f func(t Task) Priority, tasks ...Task) {
|
|||
}
|
||||
}
|
||||
|
||||
func SetReason(reason string, tasks ...Task) {
|
||||
for i := range tasks {
|
||||
tasks[i].SetReason(reason)
|
||||
}
|
||||
}
|
||||
|
||||
// GetTaskType returns the task's type,
|
||||
// for now, only 3 types;
|
||||
// - only 1 grow action -> Grow
|
||||
|
|
Loading…
Reference in New Issue