mirror of https://github.com/milvus-io/milvus.git
Optimize scheduler, increase merge tasks probability (#20922)
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>pull/20935/head
parent
f7d603f947
commit
ddd29ea6ab
|
@ -20,10 +20,9 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
@ -31,7 +30,10 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
. "github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -479,10 +481,11 @@ func (scheduler *taskScheduler) schedule(node int64) {
|
|||
)
|
||||
|
||||
// Process tasks
|
||||
toProcess := make([]Task, 0)
|
||||
toRemove := make([]Task, 0)
|
||||
scheduler.processQueue.Range(func(task Task) bool {
|
||||
if scheduler.isRelated(task, node) {
|
||||
scheduler.process(task)
|
||||
if scheduler.isRelated(task, node) && scheduler.preProcess(task) {
|
||||
toProcess = append(toProcess, task)
|
||||
}
|
||||
if task.Status() != TaskStatusStarted {
|
||||
toRemove = append(toRemove, task)
|
||||
|
@ -491,12 +494,25 @@ func (scheduler *taskScheduler) schedule(node int64) {
|
|||
return true
|
||||
})
|
||||
|
||||
// The scheduler doesn't limit the number of tasks,
|
||||
// to commit tasks to executors as soon as possible, to reach higher merge possibility
|
||||
failCount := atomic.NewInt32(0)
|
||||
funcutil.ProcessFuncParallel(len(toProcess), runtime.GOMAXPROCS(0), func(idx int) error {
|
||||
if !scheduler.process(toProcess[idx]) {
|
||||
failCount.Inc()
|
||||
}
|
||||
return nil
|
||||
}, "process")
|
||||
|
||||
for _, task := range toRemove {
|
||||
scheduler.remove(task)
|
||||
}
|
||||
|
||||
log.Info("processed tasks",
|
||||
zap.Int("toRemoveNum", len(toRemove)))
|
||||
zap.Int("toProcessNum", len(toProcess)),
|
||||
zap.Int32("failCount", failCount.Load()),
|
||||
zap.Int("toRemoveNum", len(toRemove)),
|
||||
)
|
||||
|
||||
log.Debug("process tasks related to node done",
|
||||
zap.Int("processingTaskNum", scheduler.processQueue.Len()),
|
||||
|
@ -538,9 +554,11 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// process processes the given task,
|
||||
// return true if the task is started and succeeds to commit the current action
|
||||
func (scheduler *taskScheduler) process(task Task) bool {
|
||||
// preProcess checks the finished actions of task,
|
||||
// and converts the task's status,
|
||||
// return true if the task should be executed,
|
||||
// false otherwise
|
||||
func (scheduler *taskScheduler) preProcess(task Task) bool {
|
||||
log := log.With(
|
||||
zap.Int64("taskID", task.ID()),
|
||||
zap.Int32("type", GetTaskType(task)),
|
||||
|
@ -566,10 +584,10 @@ func (scheduler *taskScheduler) process(task Task) bool {
|
|||
}
|
||||
|
||||
if task.IsFinished(scheduler.distMgr) {
|
||||
if executor.Exist(task.ID()) {
|
||||
return false
|
||||
if !executor.Exist(task.ID()) {
|
||||
task.SetStatus(TaskStatusSucceeded)
|
||||
}
|
||||
task.SetStatus(TaskStatusSucceeded)
|
||||
return false
|
||||
} else if scheduler.checkCanceled(task) {
|
||||
task.SetStatus(TaskStatusCanceled)
|
||||
if task.Err() == nil {
|
||||
|
@ -580,12 +598,31 @@ func (scheduler *taskScheduler) process(task Task) bool {
|
|||
task.SetErr(ErrTaskStale)
|
||||
}
|
||||
|
||||
return task.Status() == TaskStatusStarted
|
||||
}
|
||||
|
||||
// process processes the given task,
|
||||
// return true if the task is started and succeeds to commit the current action
|
||||
func (scheduler *taskScheduler) process(task Task) bool {
|
||||
log := log.With(
|
||||
zap.Int64("taskID", task.ID()),
|
||||
zap.Int32("type", GetTaskType(task)),
|
||||
zap.Int64("source", task.SourceID()),
|
||||
)
|
||||
|
||||
actions, step := task.Actions(), task.Step()
|
||||
executor, ok := scheduler.executors[actions[step].Node()]
|
||||
if !ok {
|
||||
log.Warn("no executor for QueryNode",
|
||||
zap.Int("step", step),
|
||||
zap.Int64("nodeID", actions[step].Node()))
|
||||
return false
|
||||
}
|
||||
|
||||
log = log.With(zap.Int("step", step))
|
||||
switch task.Status() {
|
||||
case TaskStatusStarted:
|
||||
if executor.Execute(task, step) {
|
||||
return true
|
||||
}
|
||||
return executor.Execute(task, step)
|
||||
|
||||
case TaskStatusSucceeded:
|
||||
log.Info("task succeeded")
|
||||
|
|
Loading…
Reference in New Issue