fix: [hotfix] Add sub task pool for multi-stage tasks (#40080)

Cherry-pick from master
pr: #40079 
Related to #40078

Add a subTaskPool to execute sub-tasks, preventing the logical deadlock
described in the issue.
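
For context, here is a minimal self-contained sketch of the failure mode and the fix. It uses a toy bounded pool, not the actual `conc.Pool` used in the diff below:

```go
package main

import (
	"fmt"
	"sync"
)

// boundedPool is a toy executor that runs at most `size` tasks at once.
// Submit blocks until a worker slot is free.
type boundedPool struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newBoundedPool(size int) *boundedPool {
	return &boundedPool{slots: make(chan struct{}, size)}
}

func (p *boundedPool) Submit(f func()) {
	p.slots <- struct{}{} // blocks while all workers are busy
	p.wg.Add(1)
	go func() {
		defer func() { <-p.slots; p.wg.Done() }()
		f()
	}()
}

func (p *boundedPool) Wait() { p.wg.Wait() }

func main() {
	// A pool whose workers are all occupied by parent tasks models the
	// saturated query task pool in the proxy.
	taskPool := newBoundedPool(1)
	// A dedicated pool for sub-tasks: its capacity is never consumed by
	// parent tasks, so a waiting parent always makes progress.
	subTaskPool := newBoundedPool(1)

	taskPool.Submit(func() {
		done := make(chan struct{})
		// Submitting the sub-task back into taskPool would deadlock: its only
		// worker slot is held by this parent, so Submit would block forever.
		subTaskPool.Submit(func() { close(done) })
		<-done // wait for the sub-task
		fmt.Println("parent finished after its sub-task")
	})

	taskPool.Wait()
	subTaskPool.Wait()
}
```

In `queryLoop` the same idea applies: tasks whose `IsSubTask()` returns true (for `queryTask`, when `reQuery` is set) are submitted to `subTaskPool` instead of the main `pool`.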

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Co-authored-by: Cai Zhang <cai.zhang@zilliz.com>
Co-authored-by: bigsheeper <yihao.dai@zilliz.com>
hotfix-2.5.4
congqixia 2025-02-21 16:06:12 +08:00 committed by GitHub
parent 3f613b7409
commit 01f8faacae
14 changed files with 122 additions and 11 deletions

View File

@@ -181,6 +181,8 @@ std::map<std::string, std::string> iterativeFilterLatencyLabels{
{"type", "iterative_filter_latency"}};
std::map<std::string, std::string> scalarProportionLabels{
{"type", "scalar_proportion"}};
std::map<std::string, std::string> getVectorLatencyLabels{
{"type", "get_vector_latency"}};
DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_core_search_latency,
"[cpp]latency(us) of search on segment")
DEFINE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar,
@@ -200,6 +202,9 @@ DEFINE_PROMETHEUS_HISTOGRAM_WITH_BUCKETS(
internal_core_search_latency,
scalarProportionLabels,
ratioBuckets)
DEFINE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency,
internal_core_search_latency,
getVectorLatencyLabels)
// mmap metrics
std::map<std::string, std::string> mmapAllocatedSpaceAnonLabel = {

View File

@@ -138,5 +138,6 @@ DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_vector);
DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_groupby);
DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_iterative_filter);
DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar_proportion);
DECLARE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency);
} // namespace milvus::monitor

View File

@@ -1759,7 +1759,21 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id,
}
return get_raw_data(field_id, field_meta, seg_offsets, count);
}
return get_vector(field_id, seg_offsets, count);
std::chrono::high_resolution_clock::time_point get_vector_start =
std::chrono::high_resolution_clock::now();
auto vector = get_vector(field_id, seg_offsets, count);
std::chrono::high_resolution_clock::time_point get_vector_end =
std::chrono::high_resolution_clock::now();
double get_vector_cost = std::chrono::duration<double, std::micro>(
get_vector_end - get_vector_start)
.count();
monitor::internal_core_get_vector_latency.Observe(get_vector_cost /
1000);
return vector;
}
Assert(get_bit(field_data_ready_bitset_, field_id));

View File

@@ -1582,7 +1582,21 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
}
return get_raw_data(field_id, field_meta, seg_offsets, count);
}
return get_vector(field_id, seg_offsets, count);
std::chrono::high_resolution_clock::time_point get_vector_start =
std::chrono::high_resolution_clock::now();
auto vector = get_vector(field_id, seg_offsets, count);
std::chrono::high_resolution_clock::time_point get_vector_end =
std::chrono::high_resolution_clock::now();
double get_vector_cost = std::chrono::duration<double, std::micro>(
get_vector_end - get_vector_start)
.count();
monitor::internal_core_get_vector_latency.Observe(get_vector_cost /
1000);
return vector;
}
Assert(get_bit(field_data_ready_bitset_, field_id));

View File

@@ -140,6 +140,7 @@ type task interface {
CanSkipAllocTimestamp() bool
SetOnEnqueueTime()
GetDurationInQueue() time.Duration
IsSubTask() bool
}
type baseTask struct {
@@ -158,6 +159,10 @@ func (bt *baseTask) GetDurationInQueue() time.Duration {
return time.Since(bt.onEnqueueTime)
}
func (bt *baseTask) IsSubTask() bool {
return false
}
type dmlTask interface {
task
setChannels() error

View File

@@ -577,6 +577,10 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
return nil
}
func (t *queryTask) IsSubTask() bool {
return t.reQuery
}
func (t *queryTask) queryShard(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channel string) error {
needOverrideMvcc := false
mvccTs := t.MvccTimestamp

View File

@@ -89,7 +89,7 @@ func (queue *baseTaskQueue) addUnissuedTask(t task) error {
defer queue.utLock.Unlock()
if queue.utFull() {
return merr.WrapErrServiceRequestLimitExceeded(int32(queue.getMaxTaskNum()))
return merr.WrapErrTooManyRequests(int32(queue.getMaxTaskNum()))
}
queue.unissuedTasks.PushBack(t)
queue.utBufChan <- 1
@@ -228,6 +228,18 @@ type ddTaskQueue struct {
lock sync.Mutex
}
func (queue *ddTaskQueue) updateMetrics() {
queue.utLock.RLock()
unissuedTasksNum := queue.unissuedTasks.Len()
queue.utLock.RUnlock()
queue.atLock.RLock()
activateTaskNum := len(queue.activeTasks)
queue.atLock.RUnlock()
metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "ddl", metrics.UnissuedIndexTaskLabel).Set(float64(unissuedTasksNum))
metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "ddl", metrics.InProgressIndexTaskLabel).Set(float64(activateTaskNum))
}
type pChanStatInfo struct {
pChanStatistics
tsSet map[Timestamp]struct{}
@@ -241,6 +253,18 @@ type dmTaskQueue struct {
pChanStatisticsInfos map[pChan]*pChanStatInfo
}
func (queue *dmTaskQueue) updateMetrics() {
queue.utLock.RLock()
unissuedTasksNum := queue.unissuedTasks.Len()
queue.utLock.RUnlock()
queue.atLock.RLock()
activateTaskNum := len(queue.activeTasks)
queue.atLock.RUnlock()
metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dml", metrics.UnissuedIndexTaskLabel).Set(float64(unissuedTasksNum))
metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dml", metrics.InProgressIndexTaskLabel).Set(float64(activateTaskNum))
}
func (queue *dmTaskQueue) Enqueue(t task) error {
// This statsLock has two functions:
// 1) Protect member pChanStatisticsInfos
@@ -361,6 +385,18 @@ type dqTaskQueue struct {
*baseTaskQueue
}
func (queue *dqTaskQueue) updateMetrics() {
queue.utLock.RLock()
unissuedTasksNum := queue.unissuedTasks.Len()
queue.utLock.RUnlock()
queue.atLock.RLock()
activateTaskNum := len(queue.activeTasks)
queue.atLock.RUnlock()
metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dql", metrics.UnissuedIndexTaskLabel).Set(float64(unissuedTasksNum))
metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dql", metrics.InProgressIndexTaskLabel).Set(float64(activateTaskNum))
}
func (queue *ddTaskQueue) Enqueue(t task) error {
queue.lock.Lock()
defer queue.lock.Unlock()
@@ -507,6 +543,7 @@ func (sched *taskScheduler) definitionLoop() {
return struct{}{}, nil
})
}
sched.ddQueue.updateMetrics()
}
}
}
@@ -528,6 +565,7 @@ func (sched *taskScheduler) controlLoop() {
return struct{}{}, nil
})
}
sched.dcQueue.updateMetrics()
}
}
}
@@ -547,6 +585,7 @@ func (sched *taskScheduler) manipulationLoop() {
return struct{}{}, nil
})
}
sched.dmQueue.updateMetrics()
}
}
}
@@ -554,7 +593,10 @@
func (sched *taskScheduler) queryLoop() {
defer sched.wg.Done()
pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.MaxTaskNum.GetAsInt(), conc.WithExpiryDuration(time.Minute))
poolSize := paramtable.Get().ProxyCfg.MaxTaskNum.GetAsInt()
pool := conc.NewPool[struct{}](poolSize, conc.WithExpiryDuration(time.Minute))
subTaskPool := conc.NewPool[struct{}](poolSize, conc.WithExpiryDuration(time.Minute))
for {
select {
case <-sched.ctx.Done():
@@ -562,13 +604,19 @@
case <-sched.dqQueue.utChan():
if !sched.dqQueue.utEmpty() {
t := sched.scheduleDqTask()
pool.Submit(func() (struct{}, error) {
p := pool
// if task is sub task spawned by another, use sub task pool in case of deadlock
if t.IsSubTask() {
p = subTaskPool
}
p.Submit(func() (struct{}, error) {
sched.processTask(t, sched.dqQueue)
return struct{}{}, nil
})
} else {
log.Ctx(context.TODO()).Debug("query queue is empty ...")
}
sched.dqQueue.updateMetrics()
}
}
}

View File

@@ -964,7 +964,7 @@ func (scheduler *taskScheduler) remove(task Task) {
log = log.With(zap.Int64("segmentID", task.SegmentID()))
if task.Status() == TaskStatusFailed &&
task.Err() != nil &&
!errors.IsAny(task.Err(), merr.ErrChannelNotFound, merr.ErrServiceRequestLimitExceeded) {
!errors.IsAny(task.Err(), merr.ErrChannelNotFound, merr.ErrServiceTooManyRequests) {
scheduler.recordSegmentTaskError(task)
}

View File

@@ -47,7 +47,7 @@ func (p *userTaskPollingPolicy) Push(task Task) (int, error) {
if taskGroupLen > 0 {
limit := pt.QueryNodeCfg.SchedulePolicyMaxPendingTaskPerUser.GetAsInt()
if limit > 0 && taskGroupLen >= limit {
return 0, merr.WrapErrServiceRequestLimitExceeded(
return 0, merr.WrapErrTooManyRequests(
int32(limit),
fmt.Sprintf("limit by %s", pt.QueryNodeCfg.SchedulePolicyMaxPendingTaskPerUser.Key),
)

View File

@@ -124,6 +124,7 @@ const (
pathLabelName = "path"
cgoNameLabelName = `cgo_name`
cgoTypeLabelName = `cgo_type`
queueTypeLabelName = `queue_type`
// entities label
LoadedLabel = "loaded"

View File

@@ -427,6 +427,24 @@ var (
Help: "the number of non-zeros in each sparse search task",
Buckets: buckets,
}, []string{nodeIDLabelName, collectionName})
ProxyParseExpressionLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "parse_expr_latency",
Help: "the latency of parse expression",
Buckets: buckets,
}, []string{nodeIDLabelName, functionLabelName, statusLabelName})
// ProxyQueueTaskNum records task number of queue in Proxy.
ProxyQueueTaskNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "queue_task_num",
Help: "",
}, []string{nodeIDLabelName, queueTypeLabelName, taskStateLabel})
)
// RegisterProxy registers Proxy metrics
@@ -490,6 +508,7 @@ func RegisterProxy(registry *prometheus.Registry) {
registry.MustRegister(ProxyRecallSearchCount)
registry.MustRegister(ProxySearchSparseNumNonZeros)
registry.MustRegister(ProxyQueueTaskNum)
RegisterStreamingServiceClient(registry)
}

View File

@@ -51,7 +51,7 @@ var (
ErrServiceNotReady = newMilvusError("service not ready", 1, true) // This indicates the service is still in init
ErrServiceUnavailable = newMilvusError("service unavailable", 2, true)
ErrServiceMemoryLimitExceeded = newMilvusError("memory limit exceeded", 3, false)
ErrServiceRequestLimitExceeded = newMilvusError("request limit exceeded", 4, true)
ErrServiceTooManyRequests = newMilvusError("too many concurrent requests, queue is full", 4, true)
ErrServiceInternal = newMilvusError("service internal error", 5, false) // Never return this error out of Milvus
ErrServiceCrossClusterRouting = newMilvusError("cross cluster routing", 6, false)
ErrServiceDiskLimitExceeded = newMilvusError("disk limit exceeded", 7, false)

View File

@@ -74,7 +74,7 @@ func (s *ErrSuite) TestWrap() {
s.ErrorIs(WrapErrServiceNotReady("test", 0, "test init..."), ErrServiceNotReady)
s.ErrorIs(WrapErrServiceUnavailable("test", "test init"), ErrServiceUnavailable)
s.ErrorIs(WrapErrServiceMemoryLimitExceeded(110, 100, "MLE"), ErrServiceMemoryLimitExceeded)
s.ErrorIs(WrapErrServiceRequestLimitExceeded(100, "too many requests"), ErrServiceRequestLimitExceeded)
s.ErrorIs(WrapErrTooManyRequests(100, "too many requests"), ErrServiceTooManyRequests)
s.ErrorIs(WrapErrServiceInternal("never throw out"), ErrServiceInternal)
s.ErrorIs(WrapErrServiceCrossClusterRouting("ins-0", "ins-1"), ErrServiceCrossClusterRouting)
s.ErrorIs(WrapErrServiceDiskLimitExceeded(110, 100, "DLE"), ErrServiceDiskLimitExceeded)

View File

@@ -367,8 +367,8 @@ func WrapErrServiceMemoryLimitExceeded(predict, limit float32, msg ...string) er
return err
}
func WrapErrServiceRequestLimitExceeded(limit int32, msg ...string) error {
err := wrapFields(ErrServiceRequestLimitExceeded,
func WrapErrTooManyRequests(limit int32, msg ...string) error {
err := wrapFields(ErrServiceTooManyRequests,
value("limit", limit),
)
if len(msg) > 0 {