Scheduler should returns err if the queue is full (#20465)

Signed-off-by: yah01 <yang.cen@zilliz.com>

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/20537/head
yah01 2022-11-11 17:55:05 +08:00 committed by GitHub
parent f91c504db6
commit a82c235599
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 16 deletions

View File

@ -27,7 +27,6 @@ import (
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"go.uber.org/zap"
)
var (
@ -111,18 +110,11 @@ func (controller *CheckerController) check(ctx context.Context) {
tasks = append(tasks, checker.Check(ctx)...)
}
added := 0
for _, task := range tasks {
err := controller.scheduler.Add(task)
if err != nil {
task.Cancel()
continue
}
added++
if added >= checkRoundTaskNumLimit {
log.Info("checkers have added too many tasks, truncate the subsequent tasks",
zap.Int("taskNum", len(tasks)),
zap.Int("taskNumLimit", checkRoundTaskNumLimit))
}
}
}

View File

@ -218,6 +218,11 @@ func (scheduler *taskScheduler) Add(task Task) error {
return err
}
if !scheduler.waitQueue.Add(task) {
log.Warn("failed to add task", zap.String("task", task.String()))
return ErrTaskQueueFull
}
task.SetID(scheduler.idAllocator())
scheduler.tasks.Insert(task.ID())
switch task := task.(type) {
@ -229,10 +234,7 @@ func (scheduler *taskScheduler) Add(task Task) error {
index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
scheduler.channelTasks[index] = task
}
if !scheduler.waitQueue.Add(task) {
log.Warn("failed to add task", zap.String("task", task.String()))
return nil
}
metrics.QueryCoordTaskNum.WithLabelValues().Set(float64(scheduler.tasks.Len()))
log.Info("task added", zap.String("task", task.String()))
return nil
@ -241,10 +243,6 @@ func (scheduler *taskScheduler) Add(task Task) error {
// check checks whether the task is valid to add,
// must hold lock
func (scheduler *taskScheduler) preAdd(task Task) error {
if scheduler.waitQueue.Len() >= scheduler.waitQueue.Cap() {
return ErrTaskQueueFull
}
switch task := task.(type) {
case *SegmentTask:
index := NewReplicaSegmentIndex(task)