mirror of https://github.com/milvus-io/milvus.git
Use pool to block exceeded tasks (#27767)
Signed-off-by: yah01 <yah2er0ne@outlook.com>pull/27744/head
parent
2b13078b14
commit
9467de79c3
|
@ -20,13 +20,16 @@ import (
|
|||
"container/list"
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -83,7 +86,7 @@ func (queue *baseTaskQueue) addUnissuedTask(t task) error {
|
|||
defer queue.utLock.Unlock()
|
||||
|
||||
if queue.utFull() {
|
||||
return errors.New("task queue is full")
|
||||
return merr.WrapErrServiceRequestLimitExceeded(int32(queue.getMaxTaskNum()))
|
||||
}
|
||||
queue.unissuedTasks.PushBack(t)
|
||||
queue.utBufChan <- 1
|
||||
|
@ -500,6 +503,7 @@ func (sched *taskScheduler) controlLoop() {
|
|||
|
||||
func (sched *taskScheduler) manipulationLoop() {
|
||||
defer sched.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sched.ctx.Done():
|
||||
|
@ -516,6 +520,7 @@ func (sched *taskScheduler) manipulationLoop() {
|
|||
func (sched *taskScheduler) queryLoop() {
|
||||
defer sched.wg.Done()
|
||||
|
||||
pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.MaxTaskNum.GetAsInt(), conc.WithExpiryDuration(time.Minute))
|
||||
for {
|
||||
select {
|
||||
case <-sched.ctx.Done():
|
||||
|
@ -523,7 +528,10 @@ func (sched *taskScheduler) queryLoop() {
|
|||
case <-sched.dqQueue.utChan():
|
||||
if !sched.dqQueue.utEmpty() {
|
||||
t := sched.scheduleDqTask()
|
||||
go sched.processTask(t, sched.dqQueue)
|
||||
pool.Submit(func() (struct{}, error) {
|
||||
sched.processTask(t, sched.dqQueue)
|
||||
return struct{}{}, nil
|
||||
})
|
||||
} else {
|
||||
log.Debug("query queue is empty ...")
|
||||
}
|
||||
|
|
|
@ -973,7 +973,7 @@ So adjust at your risk!`,
|
|||
p.MaxTaskNum = ParamItem{
|
||||
Key: "proxy.maxTaskNum",
|
||||
Version: "2.2.0",
|
||||
DefaultValue: "1024",
|
||||
DefaultValue: "10000",
|
||||
Doc: "max task number of proxy task queue",
|
||||
Export: true,
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue