enhance: [2.5] Add get vector latency metric and refine request limit error message (#40085)

issue: https://github.com/milvus-io/milvus/issues/40078

pr: https://github.com/milvus-io/milvus/pull/40083

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2025-02-21 20:19:55 +08:00 committed by GitHub
parent 113d17646b
commit b8a758b6c4
10 changed files with 43 additions and 9 deletions
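
For context on the metric side of this change, here is a minimal, self-contained sketch of the timing pattern the commit wraps around get_vector() in bulk_subscript: take two std::chrono::high_resolution_clock timestamps, compute the cost in microseconds, divide by 1000, and feed the result to the histogram. The record_get_vector_latency helper below is hypothetical and merely stands in for monitor::internal_core_get_vector_latency.Observe(); the sleep stands in for the real get_vector() call.

#include <chrono>
#include <iostream>
#include <thread>

// Hypothetical stand-in for monitor::internal_core_get_vector_latency.Observe().
static void record_get_vector_latency(double value) {
    std::cout << "get_vector_latency observed: " << value << std::endl;
}

int main() {
    auto start = std::chrono::high_resolution_clock::now();
    std::this_thread::sleep_for(std::chrono::milliseconds(5));  // stands in for get_vector()
    auto end = std::chrono::high_resolution_clock::now();
    // Cost is computed in microseconds, then divided by 1000 before the
    // observation, mirroring the bulk_subscript changes in the diff below.
    double cost_us = std::chrono::duration<double, std::micro>(end - start).count();
    record_get_vector_latency(cost_us / 1000);
    return 0;
}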


@@ -181,6 +181,8 @@ std::map<std::string, std::string> iterativeFilterLatencyLabels{
     {"type", "iterative_filter_latency"}};
 std::map<std::string, std::string> scalarProportionLabels{
     {"type", "scalar_proportion"}};
+std::map<std::string, std::string> getVectorLatencyLabels{
+    {"type", "get_vector_latency"}};
 DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_core_search_latency,
                                    "[cpp]latency(us) of search on segment")
 DEFINE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar,
@@ -200,6 +202,9 @@ DEFINE_PROMETHEUS_HISTOGRAM_WITH_BUCKETS(
     internal_core_search_latency,
     scalarProportionLabels,
     ratioBuckets)
+DEFINE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency,
+                            internal_core_search_latency,
+                            getVectorLatencyLabels)
 // mmap metrics
 std::map<std::string, std::string> mmapAllocatedSpaceAnonLabel = {


@@ -138,5 +138,6 @@ DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_vector);
 DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_groupby);
 DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_iterative_filter);
 DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar_proportion);
+DECLARE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency);
 }  // namespace milvus::monitor


@@ -1716,7 +1716,21 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id,
         }
         return get_raw_data(field_id, field_meta, seg_offsets, count);
     }
-    return get_vector(field_id, seg_offsets, count);
+    std::chrono::high_resolution_clock::time_point get_vector_start =
+        std::chrono::high_resolution_clock::now();
+    auto vector = get_vector(field_id, seg_offsets, count);
+    std::chrono::high_resolution_clock::time_point get_vector_end =
+        std::chrono::high_resolution_clock::now();
+    double get_vector_cost = std::chrono::duration<double, std::micro>(
+                                 get_vector_end - get_vector_start)
+                                 .count();
+    monitor::internal_core_get_vector_latency.Observe(get_vector_cost /
+                                                      1000);
+    return vector;
 }
 }


@@ -1569,7 +1569,21 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
         }
         return get_raw_data(field_id, field_meta, seg_offsets, count);
     }
-    return get_vector(field_id, seg_offsets, count);
+    std::chrono::high_resolution_clock::time_point get_vector_start =
+        std::chrono::high_resolution_clock::now();
+    auto vector = get_vector(field_id, seg_offsets, count);
+    std::chrono::high_resolution_clock::time_point get_vector_end =
+        std::chrono::high_resolution_clock::now();
+    double get_vector_cost = std::chrono::duration<double, std::micro>(
+                                 get_vector_end - get_vector_start)
+                                 .count();
+    monitor::internal_core_get_vector_latency.Observe(get_vector_cost /
+                                                      1000);
+    return vector;
 }
 }


@@ -89,7 +89,7 @@ func (queue *baseTaskQueue) addUnissuedTask(t task) error {
     defer queue.utLock.Unlock()
     if queue.utFull() {
-        return merr.WrapErrServiceRequestLimitExceeded(int32(queue.getMaxTaskNum()))
+        return merr.WrapErrTooManyRequests(int32(queue.getMaxTaskNum()))
     }
     queue.unissuedTasks.PushBack(t)
     queue.utBufChan <- 1


@@ -970,7 +970,7 @@ func (scheduler *taskScheduler) remove(task Task) {
     log = log.With(zap.Int64("segmentID", task.SegmentID()))
     if task.Status() == TaskStatusFailed &&
         task.Err() != nil &&
-        !errors.IsAny(task.Err(), merr.ErrChannelNotFound, merr.ErrServiceRequestLimitExceeded) {
+        !errors.IsAny(task.Err(), merr.ErrChannelNotFound, merr.ErrServiceTooManyRequests) {
         scheduler.recordSegmentTaskError(task)
     }


@@ -47,7 +47,7 @@ func (p *userTaskPollingPolicy) Push(task Task) (int, error) {
     if taskGroupLen > 0 {
         limit := pt.QueryNodeCfg.SchedulePolicyMaxPendingTaskPerUser.GetAsInt()
         if limit > 0 && taskGroupLen >= limit {
-            return 0, merr.WrapErrServiceRequestLimitExceeded(
+            return 0, merr.WrapErrTooManyRequests(
                 int32(limit),
                 fmt.Sprintf("limit by %s", pt.QueryNodeCfg.SchedulePolicyMaxPendingTaskPerUser.Key),
             )


@@ -51,7 +51,7 @@ var (
     ErrServiceNotReady            = newMilvusError("service not ready", 1, true) // This indicates the service is still in init
     ErrServiceUnavailable         = newMilvusError("service unavailable", 2, true)
     ErrServiceMemoryLimitExceeded = newMilvusError("memory limit exceeded", 3, false)
-    ErrServiceRequestLimitExceeded = newMilvusError("request limit exceeded", 4, true)
+    ErrServiceTooManyRequests      = newMilvusError("too many concurrent requests, queue is full", 4, true)
     ErrServiceInternal            = newMilvusError("service internal error", 5, false) // Never return this error out of Milvus
     ErrServiceCrossClusterRouting = newMilvusError("cross cluster routing", 6, false)
     ErrServiceDiskLimitExceeded   = newMilvusError("disk limit exceeded", 7, false)


@@ -74,7 +74,7 @@ func (s *ErrSuite) TestWrap() {
     s.ErrorIs(WrapErrServiceNotReady("test", 0, "test init..."), ErrServiceNotReady)
     s.ErrorIs(WrapErrServiceUnavailable("test", "test init"), ErrServiceUnavailable)
     s.ErrorIs(WrapErrServiceMemoryLimitExceeded(110, 100, "MLE"), ErrServiceMemoryLimitExceeded)
-    s.ErrorIs(WrapErrServiceRequestLimitExceeded(100, "too many requests"), ErrServiceRequestLimitExceeded)
+    s.ErrorIs(WrapErrTooManyRequests(100, "too many requests"), ErrServiceTooManyRequests)
     s.ErrorIs(WrapErrServiceInternal("never throw out"), ErrServiceInternal)
     s.ErrorIs(WrapErrServiceCrossClusterRouting("ins-0", "ins-1"), ErrServiceCrossClusterRouting)
     s.ErrorIs(WrapErrServiceDiskLimitExceeded(110, 100, "DLE"), ErrServiceDiskLimitExceeded)


@@ -367,8 +367,8 @@ func WrapErrServiceMemoryLimitExceeded(predict, limit float32, msg ...string) er
     return err
 }

-func WrapErrServiceRequestLimitExceeded(limit int32, msg ...string) error {
-    err := wrapFields(ErrServiceRequestLimitExceeded,
+func WrapErrTooManyRequests(limit int32, msg ...string) error {
+    err := wrapFields(ErrServiceTooManyRequests,
         value("limit", limit),
     )
     if len(msg) > 0 {