mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>pull/20379/head
parent
3f67fad59a
commit
261bfeebfd
|
@ -43,8 +43,11 @@ func (b *BalanceChecker) Description() string {
|
|||
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
|
||||
ret := make([]task.Task, 0)
|
||||
segmentPlans, channelPlans := b.Balance.Balance()
|
||||
|
||||
tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout, segmentPlans)
|
||||
task.SetPriority(task.TaskPriorityLow, tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
tasks = balance.CreateChannelTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.ChannelTaskTimeout, channelPlans)
|
||||
ret = append(ret, tasks...)
|
||||
return ret
|
||||
|
|
|
@ -85,6 +85,9 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica
|
|||
repeated := findRepeatedChannels(dists)
|
||||
tasks = c.createChannelReduceTasks(ctx, repeated, replica.GetID())
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
// All channel related tasks should be with high priority
|
||||
task.SetPriority(task.TaskPriorityHigh, tasks...)
|
||||
return ret
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
|
@ -62,6 +63,16 @@ type Type = int32
|
|||
type replicaSegmentIndex struct {
|
||||
ReplicaID int64
|
||||
SegmentID int64
|
||||
IsGrowing bool
|
||||
}
|
||||
|
||||
func NewReplicaSegmentIndex(task *SegmentTask) replicaSegmentIndex {
|
||||
isGrowing := task.Actions()[0].(*SegmentAction).Scope() == querypb.DataScope_Streaming
|
||||
return replicaSegmentIndex{
|
||||
ReplicaID: task.ReplicaID(),
|
||||
SegmentID: task.SegmentID(),
|
||||
IsGrowing: isGrowing,
|
||||
}
|
||||
}
|
||||
|
||||
type replicaChannelIndex struct {
|
||||
|
@ -208,7 +219,7 @@ func (scheduler *taskScheduler) Add(task Task) error {
|
|||
scheduler.tasks.Insert(task.ID())
|
||||
switch task := task.(type) {
|
||||
case *SegmentTask:
|
||||
index := replicaSegmentIndex{task.ReplicaID(), task.SegmentID()}
|
||||
index := NewReplicaSegmentIndex(task)
|
||||
scheduler.segmentTasks[index] = task
|
||||
|
||||
case *ChannelTask:
|
||||
|
@ -232,7 +243,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
|
|||
|
||||
switch task := task.(type) {
|
||||
case *SegmentTask:
|
||||
index := replicaSegmentIndex{task.ReplicaID(), task.segmentID}
|
||||
index := NewReplicaSegmentIndex(task)
|
||||
if old, ok := scheduler.segmentTasks[index]; ok {
|
||||
if task.Priority() > old.Priority() {
|
||||
log.Info("replace old task, the new one with higher priority",
|
||||
|
@ -553,7 +564,7 @@ func (scheduler *taskScheduler) remove(task Task) {
|
|||
|
||||
switch task := task.(type) {
|
||||
case *SegmentTask:
|
||||
index := replicaSegmentIndex{task.ReplicaID(), task.SegmentID()}
|
||||
index := NewReplicaSegmentIndex(task)
|
||||
delete(scheduler.segmentTasks, index)
|
||||
log = log.With(zap.Int64("segmentID", task.SegmentID()))
|
||||
|
||||
|
|
|
@ -50,6 +50,12 @@ func Wait(ctx context.Context, timeout time.Duration, tasks ...Task) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func SetPriority(priority Priority, tasks ...Task) {
|
||||
for i := range tasks {
|
||||
tasks[i].SetPriority(priority)
|
||||
}
|
||||
}
|
||||
|
||||
// GetTaskType returns the task's type,
|
||||
// for now, only 3 types;
|
||||
// - only 1 grow action -> Grow
|
||||
|
|
Loading…
Reference in New Issue