diff --git a/internal/core/src/monitor/prometheus_client.cpp b/internal/core/src/monitor/prometheus_client.cpp index 6ca081ed8b..8ca4cb183d 100644 --- a/internal/core/src/monitor/prometheus_client.cpp +++ b/internal/core/src/monitor/prometheus_client.cpp @@ -181,6 +181,8 @@ std::map iterativeFilterLatencyLabels{ {"type", "iterative_filter_latency"}}; std::map scalarProportionLabels{ {"type", "scalar_proportion"}}; +std::map getVectorLatencyLabels{ + {"type", "get_vector_latency"}}; DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_core_search_latency, "[cpp]latency(us) of search on segment") DEFINE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar, @@ -200,6 +202,9 @@ DEFINE_PROMETHEUS_HISTOGRAM_WITH_BUCKETS( internal_core_search_latency, scalarProportionLabels, ratioBuckets) +DEFINE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency, + internal_core_search_latency, + getVectorLatencyLabels) // mmap metrics std::map mmapAllocatedSpaceAnonLabel = { diff --git a/internal/core/src/monitor/prometheus_client.h b/internal/core/src/monitor/prometheus_client.h index ed8e21cef5..3884dd74e0 100644 --- a/internal/core/src/monitor/prometheus_client.h +++ b/internal/core/src/monitor/prometheus_client.h @@ -138,5 +138,6 @@ DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_vector); DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_groupby); DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_iterative_filter); DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar_proportion); +DECLARE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency); } // namespace milvus::monitor diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 37131da722..4199cfc3ba 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -1716,7 +1716,21 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id, } return get_raw_data(field_id, field_meta, seg_offsets, count); } - return get_vector(field_id, seg_offsets, count); + + std::chrono::high_resolution_clock::time_point get_vector_start = + std::chrono::high_resolution_clock::now(); + + auto vector = get_vector(field_id, seg_offsets, count); + + std::chrono::high_resolution_clock::time_point get_vector_end = + std::chrono::high_resolution_clock::now(); + double get_vector_cost = std::chrono::duration( + get_vector_end - get_vector_start) + .count(); + monitor::internal_core_get_vector_latency.Observe(get_vector_cost / + 1000); + + return vector; } } diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 35454dd7d0..f6e3c9e656 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -1569,7 +1569,21 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, } return get_raw_data(field_id, field_meta, seg_offsets, count); } - return get_vector(field_id, seg_offsets, count); + + std::chrono::high_resolution_clock::time_point get_vector_start = + std::chrono::high_resolution_clock::now(); + + auto vector = get_vector(field_id, seg_offsets, count); + + std::chrono::high_resolution_clock::time_point get_vector_end = + std::chrono::high_resolution_clock::now(); + double get_vector_cost = std::chrono::duration( + get_vector_end - get_vector_start) + .count(); + monitor::internal_core_get_vector_latency.Observe(get_vector_cost / + 1000); + + return vector; } } diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index 7b6bbed57c..4e3b66e3ff 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -89,7 +89,7 @@ func (queue *baseTaskQueue) addUnissuedTask(t task) error { defer queue.utLock.Unlock() if queue.utFull() { - return merr.WrapErrServiceRequestLimitExceeded(int32(queue.getMaxTaskNum())) + return merr.WrapErrTooManyRequests(int32(queue.getMaxTaskNum())) } queue.unissuedTasks.PushBack(t) queue.utBufChan <- 1 diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index cab991a0de..89bedd0da5 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -970,7 +970,7 @@ func (scheduler *taskScheduler) remove(task Task) { log = log.With(zap.Int64("segmentID", task.SegmentID())) if task.Status() == TaskStatusFailed && task.Err() != nil && - !errors.IsAny(task.Err(), merr.ErrChannelNotFound, merr.ErrServiceRequestLimitExceeded) { + !errors.IsAny(task.Err(), merr.ErrChannelNotFound, merr.ErrServiceTooManyRequests) { scheduler.recordSegmentTaskError(task) } diff --git a/internal/util/searchutil/scheduler/user_task_polling_policy.go b/internal/util/searchutil/scheduler/user_task_polling_policy.go index e09d6c0c58..3a9f86f5e5 100644 --- a/internal/util/searchutil/scheduler/user_task_polling_policy.go +++ b/internal/util/searchutil/scheduler/user_task_polling_policy.go @@ -47,7 +47,7 @@ func (p *userTaskPollingPolicy) Push(task Task) (int, error) { if taskGroupLen > 0 { limit := pt.QueryNodeCfg.SchedulePolicyMaxPendingTaskPerUser.GetAsInt() if limit > 0 && taskGroupLen >= limit { - return 0, merr.WrapErrServiceRequestLimitExceeded( + return 0, merr.WrapErrTooManyRequests( int32(limit), fmt.Sprintf("limit by %s", pt.QueryNodeCfg.SchedulePolicyMaxPendingTaskPerUser.Key), ) diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 94cd57677d..5c6152f0db 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -51,7 +51,7 @@ var ( ErrServiceNotReady = newMilvusError("service not ready", 1, true) // This indicates the service is still in init ErrServiceUnavailable = newMilvusError("service unavailable", 2, true) ErrServiceMemoryLimitExceeded = newMilvusError("memory limit exceeded", 3, false) - ErrServiceRequestLimitExceeded = newMilvusError("request limit exceeded", 4, true) + ErrServiceTooManyRequests = newMilvusError("too many concurrent requests, queue is full", 4, true) ErrServiceInternal = newMilvusError("service internal error", 5, false) // Never return this error out of Milvus ErrServiceCrossClusterRouting = newMilvusError("cross cluster routing", 6, false) ErrServiceDiskLimitExceeded = newMilvusError("disk limit exceeded", 7, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 26521c40af..fea03f7ffc 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -74,7 +74,7 @@ func (s *ErrSuite) TestWrap() { s.ErrorIs(WrapErrServiceNotReady("test", 0, "test init..."), ErrServiceNotReady) s.ErrorIs(WrapErrServiceUnavailable("test", "test init"), ErrServiceUnavailable) s.ErrorIs(WrapErrServiceMemoryLimitExceeded(110, 100, "MLE"), ErrServiceMemoryLimitExceeded) - s.ErrorIs(WrapErrServiceRequestLimitExceeded(100, "too many requests"), ErrServiceRequestLimitExceeded) + s.ErrorIs(WrapErrTooManyRequests(100, "too many requests"), ErrServiceTooManyRequests) s.ErrorIs(WrapErrServiceInternal("never throw out"), ErrServiceInternal) s.ErrorIs(WrapErrServiceCrossClusterRouting("ins-0", "ins-1"), ErrServiceCrossClusterRouting) s.ErrorIs(WrapErrServiceDiskLimitExceeded(110, 100, "DLE"), ErrServiceDiskLimitExceeded) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 2c189553eb..628e54162c 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -367,8 +367,8 @@ func WrapErrServiceMemoryLimitExceeded(predict, limit float32, msg ...string) er return err } -func WrapErrServiceRequestLimitExceeded(limit int32, msg ...string) error { - err := wrapFields(ErrServiceRequestLimitExceeded, +func WrapErrTooManyRequests(limit int32, msg ...string) error { + err := wrapFields(ErrServiceTooManyRequests, value("limit", limit), ) if len(msg) > 0 {