mirror of https://github.com/milvus-io/milvus.git
enhance: add scalar filtering and vector search latency metrics (#34785)
add scalar filtering and vector search latency metrics to distinguish the cost of scalar filtering. To add metrics in query chain, add a monitor module and move the metric files from original storage module. issue: #34780 Signed-off-by: xianliang.li <xianliang.li@zilliz.com>pull/34828/head
parent
c61592dcea
commit
8e64bf929c
|
@ -305,6 +305,12 @@ install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/common/
|
|||
FILES_MATCHING PATTERN "*_c.h"
|
||||
)
|
||||
|
||||
# Install monitor
|
||||
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/monitor/
|
||||
DESTINATION include/monitor
|
||||
FILES_MATCHING PATTERN "*_c.h"
|
||||
)
|
||||
|
||||
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/futures/
|
||||
DESTINATION include/futures
|
||||
FILES_MATCHING PATTERN "*.h"
|
||||
|
|
|
@ -27,6 +27,7 @@ add_subdirectory( pb )
|
|||
add_subdirectory( log )
|
||||
add_subdirectory( config )
|
||||
add_subdirectory( common )
|
||||
add_subdirectory( monitor )
|
||||
add_subdirectory( storage )
|
||||
add_subdirectory( index )
|
||||
add_subdirectory( query )
|
||||
|
|
|
@ -50,7 +50,7 @@
|
|||
#include "storage/ThreadPools.h"
|
||||
#include "storage/space.h"
|
||||
#include "storage/Util.h"
|
||||
#include "storage/prometheus_client.h"
|
||||
#include "monitor/prometheus_client.h"
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
|
@ -798,10 +798,10 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
|||
write_disk_duration_sum +=
|
||||
(std::chrono::system_clock::now() - start_write_file);
|
||||
}
|
||||
milvus::storage::internal_storage_download_duration.Observe(
|
||||
milvus::monitor::internal_storage_download_duration.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(load_duration_sum)
|
||||
.count());
|
||||
milvus::storage::internal_storage_write_disk_duration.Observe(
|
||||
milvus::monitor::internal_storage_write_disk_duration.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
write_disk_duration_sum)
|
||||
.count());
|
||||
|
@ -820,7 +820,7 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
|||
"failed to Deserialize index: {}",
|
||||
KnowhereStatusString(stat));
|
||||
}
|
||||
milvus::storage::internal_storage_deserialize_duration.Observe(
|
||||
milvus::monitor::internal_storage_deserialize_duration.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
deserialize_duration)
|
||||
.count());
|
||||
|
|
|
@ -39,7 +39,7 @@
|
|||
#include "common/FieldDataInterface.h"
|
||||
#include "common/Array.h"
|
||||
#include "knowhere/dataset.h"
|
||||
#include "storage/prometheus_client.h"
|
||||
#include "monitor/prometheus_client.h"
|
||||
#include "storage/MmapChunkManager.h"
|
||||
|
||||
namespace milvus {
|
||||
|
@ -371,14 +371,14 @@ class ColumnBase {
|
|||
void
|
||||
UpdateMetricWhenMmap(bool is_map_anonymous, size_t mapped_size) {
|
||||
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
|
||||
milvus::storage::internal_mmap_allocated_space_bytes_anon.Observe(
|
||||
milvus::monitor::internal_mmap_allocated_space_bytes_anon.Observe(
|
||||
mapped_size);
|
||||
milvus::storage::internal_mmap_in_used_space_bytes_anon.Increment(
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_anon.Increment(
|
||||
mapped_size);
|
||||
} else {
|
||||
milvus::storage::internal_mmap_allocated_space_bytes_file.Observe(
|
||||
milvus::monitor::internal_mmap_allocated_space_bytes_file.Observe(
|
||||
mapped_size);
|
||||
milvus::storage::internal_mmap_in_used_space_bytes_file.Increment(
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_file.Increment(
|
||||
mapped_size);
|
||||
}
|
||||
}
|
||||
|
@ -386,10 +386,10 @@ class ColumnBase {
|
|||
void
|
||||
UpdateMetricWhenMunmap(size_t mapped_size) {
|
||||
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
|
||||
milvus::storage::internal_mmap_in_used_space_bytes_anon.Decrement(
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_anon.Decrement(
|
||||
mapped_size);
|
||||
} else {
|
||||
milvus::storage::internal_mmap_in_used_space_bytes_file.Decrement(
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_file.Decrement(
|
||||
mapped_size);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
# or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
milvus_add_pkg_config("milvus_monitor")
|
||||
|
||||
set(MONITOR_SRC
|
||||
monitor_c.cpp
|
||||
prometheus_client.cpp
|
||||
)
|
||||
|
||||
add_library(milvus_monitor SHARED ${MONITOR_SRC})
|
||||
|
||||
install(TARGETS milvus_monitor DESTINATION "${CMAKE_INSTALL_LIBDIR}")
|
|
@ -0,0 +1,8 @@
|
|||
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
|
||||
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
|
||||
|
||||
Name: Milvus Monitor
|
||||
Description: Monitor modules for Milvus
|
||||
Version: @MILVUS_VERSION@
|
||||
|
||||
Libs: -L${libdir} -lmilvus_monitor
|
|
@ -0,0 +1,25 @@
|
|||
// Copyright (C) 2019-2023 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#include "monitor_c.h"
|
||||
#include "prometheus_client.h"
|
||||
|
||||
char*
|
||||
GetCoreMetrics() {
|
||||
auto str = milvus::monitor::prometheusClient->GetMetrics();
|
||||
auto len = str.length();
|
||||
char* res = (char*)malloc(len + 1);
|
||||
memcpy(res, str.data(), len);
|
||||
res[len] = '\0';
|
||||
return res;
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
char*
|
||||
GetCoreMetrics();
|
||||
|
||||
#ifdef __cplusplus
|
||||
};
|
||||
#endif
|
|
@ -9,9 +9,9 @@
|
|||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#include "storage/prometheus_client.h"
|
||||
#include "prometheus_client.h"
|
||||
|
||||
namespace milvus::storage {
|
||||
namespace milvus::monitor {
|
||||
|
||||
const prometheus::Histogram::BucketBoundaries buckets = {1,
|
||||
2,
|
||||
|
@ -47,8 +47,12 @@ const prometheus::Histogram::BucketBoundaries bytesBuckets = {
|
|||
536870912, // 512M
|
||||
1073741824}; // 1G
|
||||
|
||||
const std::unique_ptr<PrometheusClient> prometheusClient =
|
||||
std::make_unique<PrometheusClient>();
|
||||
const prometheus::Histogram::BucketBoundaries ratioBuckets =
|
||||
{0.0, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5,
|
||||
0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 1.0};
|
||||
|
||||
const std::unique_ptr<PrometheusClient>
|
||||
prometheusClient = std::make_unique<PrometheusClient>();
|
||||
|
||||
/******************GetMetrics*************************************************************
|
||||
* !!! NOT use SUMMARY metrics here, because when parse SUMMARY metrics in Milvus,
|
||||
|
@ -166,6 +170,27 @@ DEFINE_PROMETHEUS_HISTOGRAM(internal_storage_deserialize_duration,
|
|||
internal_storage_load_duration,
|
||||
deserializeDurationLabels)
|
||||
|
||||
// search latency metrics
|
||||
std::map<std::string, std::string> scalarLatencyLabels{
|
||||
{"type", "scalar_latency"}};
|
||||
std::map<std::string, std::string> vectorLatencyLabels{
|
||||
{"type", "vector_latency"}};
|
||||
std::map<std::string, std::string> scalarProportionLabels{
|
||||
{"type", "scalar_proportion"}};
|
||||
DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_core_search_latency,
|
||||
"[cpp]latency(us) of search on segment")
|
||||
DEFINE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar,
|
||||
internal_core_search_latency,
|
||||
scalarLatencyLabels)
|
||||
DEFINE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_vector,
|
||||
internal_core_search_latency,
|
||||
vectorLatencyLabels)
|
||||
DEFINE_PROMETHEUS_HISTOGRAM_WITH_BUCKETS(
|
||||
internal_core_search_latency_scalar_proportion,
|
||||
internal_core_search_latency,
|
||||
scalarProportionLabels,
|
||||
ratioBuckets)
|
||||
|
||||
// mmap metrics
|
||||
std::map<std::string, std::string> mmapAllocatedSpaceAnonLabel = {
|
||||
{"type", "anon"}};
|
||||
|
@ -193,4 +218,4 @@ DEFINE_PROMETHEUS_GAUGE(internal_mmap_in_used_space_bytes_anon,
|
|||
DEFINE_PROMETHEUS_GAUGE(internal_mmap_in_used_space_bytes_file,
|
||||
internal_mmap_in_used_space_bytes,
|
||||
mmapAllocatedSpaceFileLabel)
|
||||
} // namespace milvus::storage
|
||||
} // namespace milvus::monitor
|
|
@ -23,7 +23,7 @@
|
|||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
namespace milvus::storage {
|
||||
namespace milvus::monitor {
|
||||
|
||||
class PrometheusClient {
|
||||
public:
|
||||
|
@ -58,24 +58,24 @@ extern const std::unique_ptr<PrometheusClient> prometheusClient;
|
|||
#define DEFINE_PROMETHEUS_GAUGE_FAMILY(name, desc) \
|
||||
prometheus::Family<prometheus::Gauge>& name##_family = \
|
||||
prometheus::BuildGauge().Name(#name).Help(desc).Register( \
|
||||
milvus::storage::prometheusClient->GetRegistry());
|
||||
milvus::monitor::prometheusClient->GetRegistry());
|
||||
#define DEFINE_PROMETHEUS_GAUGE(alias, name, labels) \
|
||||
prometheus::Gauge& alias = name##_family.Add(labels);
|
||||
|
||||
#define DEFINE_PROMETHEUS_COUNTER_FAMILY(name, desc) \
|
||||
prometheus::Family<prometheus::Counter>& name##_family = \
|
||||
prometheus::BuildCounter().Name(#name).Help(desc).Register( \
|
||||
milvus::storage::prometheusClient->GetRegistry());
|
||||
milvus::monitor::prometheusClient->GetRegistry());
|
||||
#define DEFINE_PROMETHEUS_COUNTER(alias, name, labels) \
|
||||
prometheus::Counter& alias = name##_family.Add(labels);
|
||||
|
||||
#define DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(name, desc) \
|
||||
prometheus::Family<prometheus::Histogram>& name##_family = \
|
||||
prometheus::BuildHistogram().Name(#name).Help(desc).Register( \
|
||||
milvus::storage::prometheusClient->GetRegistry());
|
||||
milvus::monitor::prometheusClient->GetRegistry());
|
||||
#define DEFINE_PROMETHEUS_HISTOGRAM(alias, name, labels) \
|
||||
prometheus::Histogram& alias = \
|
||||
name##_family.Add(labels, milvus::storage::buckets);
|
||||
name##_family.Add(labels, milvus::monitor::buckets);
|
||||
#define DEFINE_PROMETHEUS_HISTOGRAM_WITH_BUCKETS(alias, name, labels, buckets) \
|
||||
prometheus::Histogram& alias = name##_family.Add(labels, buckets);
|
||||
|
||||
|
@ -128,4 +128,10 @@ DECLARE_PROMETHEUS_GAUGE_FAMILY(internal_mmap_in_used_space_bytes);
|
|||
DECLARE_PROMETHEUS_GAUGE(internal_mmap_in_used_space_bytes_anon);
|
||||
DECLARE_PROMETHEUS_GAUGE(internal_mmap_in_used_space_bytes_file);
|
||||
|
||||
} // namespace milvus::storage
|
||||
// search metrics
|
||||
DECLARE_PROMETHEUS_HISTOGRAM_FAMILY(internal_core_search_latency);
|
||||
DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar);
|
||||
DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_vector);
|
||||
DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar_proportion);
|
||||
|
||||
} // namespace milvus::monitor
|
|
@ -31,4 +31,4 @@ set(MILVUS_QUERY_SRCS
|
|||
)
|
||||
add_library(milvus_query ${MILVUS_QUERY_SRCS})
|
||||
|
||||
target_link_libraries(milvus_query milvus_index milvus_bitset)
|
||||
target_link_libraries(milvus_query milvus_index milvus_bitset milvus_monitor)
|
||||
|
|
|
@ -164,6 +164,8 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
|
|||
return;
|
||||
}
|
||||
|
||||
std::chrono::high_resolution_clock::time_point scalar_start =
|
||||
std::chrono::high_resolution_clock::now();
|
||||
std::unique_ptr<BitsetType> bitset_holder;
|
||||
if (node.filter_plannode_.has_value()) {
|
||||
BitsetType expr_res;
|
||||
|
@ -177,6 +179,12 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
|
|||
segment->mask_with_timestamps(*bitset_holder, timestamp_);
|
||||
|
||||
segment->mask_with_delete(*bitset_holder, active_count, timestamp_);
|
||||
std::chrono::high_resolution_clock::time_point scalar_end =
|
||||
std::chrono::high_resolution_clock::now();
|
||||
double scalar_cost =
|
||||
std::chrono::duration<double, std::micro>(scalar_end - scalar_start)
|
||||
.count();
|
||||
monitor::internal_core_search_latency_scalar.Observe(scalar_cost);
|
||||
|
||||
// if bitset_holder is all 1's, we got empty result
|
||||
if (bitset_holder->all()) {
|
||||
|
@ -184,6 +192,9 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
|
|||
empty_search_result(num_queries, node.search_info_);
|
||||
return;
|
||||
}
|
||||
|
||||
std::chrono::high_resolution_clock::time_point vector_start =
|
||||
std::chrono::high_resolution_clock::now();
|
||||
BitsetView final_view = *bitset_holder;
|
||||
segment->vector_search(node.search_info_,
|
||||
src_data,
|
||||
|
@ -215,6 +226,20 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
|
|||
search_result.seg_offsets_.size());
|
||||
}
|
||||
search_result_opt_ = std::move(search_result);
|
||||
std::chrono::high_resolution_clock::time_point vector_end =
|
||||
std::chrono::high_resolution_clock::now();
|
||||
double vector_cost =
|
||||
std::chrono::duration<double, std::micro>(vector_end - vector_start)
|
||||
.count();
|
||||
monitor::internal_core_search_latency_vector.Observe(vector_cost);
|
||||
|
||||
double total_cost =
|
||||
std::chrono::duration<double, std::micro>(vector_end - scalar_start)
|
||||
.count();
|
||||
double scalar_ratio =
|
||||
total_cost > 0.0 ? scalar_cost / total_cost : 0.0;
|
||||
monitor::internal_core_search_latency_scalar_proportion.Observe(
|
||||
scalar_ratio);
|
||||
}
|
||||
|
||||
std::unique_ptr<RetrieveResult>
|
||||
|
|
|
@ -20,8 +20,8 @@
|
|||
#include "common/Consts.h"
|
||||
#include "common/EasyAssert.h"
|
||||
#include "log/Log.h"
|
||||
#include "monitor/prometheus_client.h"
|
||||
#include "storage/AzureChunkManager.h"
|
||||
#include "storage/prometheus_client.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace storage {
|
||||
|
@ -175,13 +175,13 @@ AzureChunkManager::ObjectExists(const std::string& bucket_name,
|
|||
try {
|
||||
auto start = std::chrono::system_clock::now();
|
||||
res = client_->ObjectExists(bucket_name, object_name);
|
||||
internal_storage_request_latency_stat.Observe(
|
||||
monitor::internal_storage_request_latency_stat.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
internal_storage_op_count_stat_suc.Increment();
|
||||
monitor::internal_storage_op_count_stat_suc.Increment();
|
||||
} catch (std::exception& err) {
|
||||
internal_storage_op_count_stat_fail.Increment();
|
||||
monitor::internal_storage_op_count_stat_fail.Increment();
|
||||
ThrowAzureError("ObjectExists",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
|
@ -198,13 +198,13 @@ AzureChunkManager::GetObjectSize(const std::string& bucket_name,
|
|||
try {
|
||||
auto start = std::chrono::system_clock::now();
|
||||
res = client_->GetObjectSize(bucket_name, object_name);
|
||||
internal_storage_request_latency_stat.Observe(
|
||||
monitor::internal_storage_request_latency_stat.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
internal_storage_op_count_stat_suc.Increment();
|
||||
monitor::internal_storage_op_count_stat_suc.Increment();
|
||||
} catch (std::exception& err) {
|
||||
internal_storage_op_count_stat_fail.Increment();
|
||||
monitor::internal_storage_op_count_stat_fail.Increment();
|
||||
ThrowAzureError("GetObjectSize",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
|
@ -221,13 +221,13 @@ AzureChunkManager::DeleteObject(const std::string& bucket_name,
|
|||
try {
|
||||
auto start = std::chrono::system_clock::now();
|
||||
res = client_->DeleteObject(bucket_name, object_name);
|
||||
internal_storage_request_latency_remove.Observe(
|
||||
monitor::internal_storage_request_latency_remove.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
internal_storage_op_count_remove_suc.Increment();
|
||||
monitor::internal_storage_op_count_remove_suc.Increment();
|
||||
} catch (std::exception& err) {
|
||||
internal_storage_op_count_remove_fail.Increment();
|
||||
monitor::internal_storage_op_count_remove_fail.Increment();
|
||||
ThrowAzureError("DeleteObject",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
|
@ -246,14 +246,14 @@ AzureChunkManager::PutObjectBuffer(const std::string& bucket_name,
|
|||
try {
|
||||
auto start = std::chrono::system_clock::now();
|
||||
res = client_->PutObjectBuffer(bucket_name, object_name, buf, size);
|
||||
internal_storage_request_latency_put.Observe(
|
||||
monitor::internal_storage_request_latency_put.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
internal_storage_op_count_put_suc.Increment();
|
||||
internal_storage_kv_size_put.Observe(size);
|
||||
monitor::internal_storage_op_count_put_suc.Increment();
|
||||
monitor::internal_storage_kv_size_put.Observe(size);
|
||||
} catch (std::exception& err) {
|
||||
internal_storage_op_count_put_fail.Increment();
|
||||
monitor::internal_storage_op_count_put_fail.Increment();
|
||||
ThrowAzureError("PutObjectBuffer",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
|
@ -272,14 +272,14 @@ AzureChunkManager::GetObjectBuffer(const std::string& bucket_name,
|
|||
try {
|
||||
auto start = std::chrono::system_clock::now();
|
||||
res = client_->GetObjectBuffer(bucket_name, object_name, buf, size);
|
||||
internal_storage_request_latency_get.Observe(
|
||||
monitor::internal_storage_request_latency_get.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
internal_storage_op_count_get_suc.Increment();
|
||||
internal_storage_kv_size_get.Observe(size);
|
||||
monitor::internal_storage_op_count_get_suc.Increment();
|
||||
monitor::internal_storage_kv_size_get.Observe(size);
|
||||
} catch (std::exception& err) {
|
||||
internal_storage_op_count_get_fail.Increment();
|
||||
monitor::internal_storage_op_count_get_fail.Increment();
|
||||
ThrowAzureError("GetObjectBuffer",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
|
@ -296,13 +296,13 @@ AzureChunkManager::ListObjects(const std::string& bucket_name,
|
|||
try {
|
||||
auto start = std::chrono::system_clock::now();
|
||||
res = client_->ListObjects(bucket_name, prefix);
|
||||
internal_storage_request_latency_list.Observe(
|
||||
monitor::internal_storage_request_latency_list.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
internal_storage_op_count_list_suc.Increment();
|
||||
monitor::internal_storage_op_count_list_suc.Increment();
|
||||
} catch (std::exception& err) {
|
||||
internal_storage_op_count_list_fail.Increment();
|
||||
monitor::internal_storage_op_count_list_fail.Increment();
|
||||
ThrowAzureError("ListObjects",
|
||||
err,
|
||||
"params, bucket={}, prefix={}",
|
||||
|
|
|
@ -44,7 +44,6 @@ set(STORAGE_FILES
|
|||
InsertData.cpp
|
||||
Event.cpp
|
||||
ThreadPool.cpp
|
||||
prometheus_client.cpp
|
||||
storage_c.cpp
|
||||
ChunkManager.cpp
|
||||
MinioChunkManager.cpp
|
||||
|
@ -71,6 +70,7 @@ if (DEFINED AZURE_BUILD_DIR)
|
|||
blob-chunk-manager
|
||||
milvus_common
|
||||
milvus-storage
|
||||
milvus_monitor
|
||||
pthread
|
||||
${CONAN_LIBS}
|
||||
)
|
||||
|
@ -78,6 +78,7 @@ else ()
|
|||
target_link_libraries(milvus_storage PUBLIC
|
||||
milvus_common
|
||||
milvus-storage
|
||||
milvus_monitor
|
||||
pthread
|
||||
${CONAN_LIBS}
|
||||
)
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
#include "storage/AliyunCredentialsProvider.h"
|
||||
#include "storage/TencentCloudSTSClient.h"
|
||||
#include "storage/TencentCloudCredentialsProvider.h"
|
||||
#include "storage/prometheus_client.h"
|
||||
#include "monitor/prometheus_client.h"
|
||||
#include "common/EasyAssert.h"
|
||||
#include "log/Log.h"
|
||||
#include "signal.h"
|
||||
|
@ -484,7 +484,7 @@ MinioChunkManager::ObjectExists(const std::string& bucket_name,
|
|||
|
||||
auto start = std::chrono::system_clock::now();
|
||||
auto outcome = client_->HeadObject(request);
|
||||
internal_storage_request_latency_stat.Observe(
|
||||
monitor::internal_storage_request_latency_stat.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
|
@ -492,17 +492,17 @@ MinioChunkManager::ObjectExists(const std::string& bucket_name,
|
|||
if (!outcome.IsSuccess()) {
|
||||
const auto& err = outcome.GetError();
|
||||
if (!IsNotFound(err.GetErrorType())) {
|
||||
internal_storage_op_count_stat_fail.Increment();
|
||||
monitor::internal_storage_op_count_stat_fail.Increment();
|
||||
ThrowS3Error("ObjectExists",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
bucket_name,
|
||||
object_name);
|
||||
}
|
||||
internal_storage_op_count_stat_suc.Increment();
|
||||
monitor::internal_storage_op_count_stat_suc.Increment();
|
||||
return false;
|
||||
}
|
||||
internal_storage_op_count_stat_suc.Increment();
|
||||
monitor::internal_storage_op_count_stat_suc.Increment();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -515,12 +515,12 @@ MinioChunkManager::GetObjectSize(const std::string& bucket_name,
|
|||
|
||||
auto start = std::chrono::system_clock::now();
|
||||
auto outcome = client_->HeadObject(request);
|
||||
internal_storage_request_latency_stat.Observe(
|
||||
monitor::internal_storage_request_latency_stat.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
if (!outcome.IsSuccess()) {
|
||||
internal_storage_op_count_stat_fail.Increment();
|
||||
monitor::internal_storage_op_count_stat_fail.Increment();
|
||||
const auto& err = outcome.GetError();
|
||||
ThrowS3Error("GetObjectSize",
|
||||
err,
|
||||
|
@ -528,7 +528,7 @@ MinioChunkManager::GetObjectSize(const std::string& bucket_name,
|
|||
bucket_name,
|
||||
object_name);
|
||||
}
|
||||
internal_storage_op_count_stat_suc.Increment();
|
||||
monitor::internal_storage_op_count_stat_suc.Increment();
|
||||
return outcome.GetResult().GetContentLength();
|
||||
}
|
||||
|
||||
|
@ -541,7 +541,7 @@ MinioChunkManager::DeleteObject(const std::string& bucket_name,
|
|||
|
||||
auto start = std::chrono::system_clock::now();
|
||||
auto outcome = client_->DeleteObject(request);
|
||||
internal_storage_request_latency_remove.Observe(
|
||||
monitor::internal_storage_request_latency_remove.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
|
@ -549,17 +549,17 @@ MinioChunkManager::DeleteObject(const std::string& bucket_name,
|
|||
if (!outcome.IsSuccess()) {
|
||||
const auto& err = outcome.GetError();
|
||||
if (!IsNotFound(err.GetErrorType())) {
|
||||
internal_storage_op_count_remove_fail.Increment();
|
||||
monitor::internal_storage_op_count_remove_fail.Increment();
|
||||
ThrowS3Error("DeleteObject",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
bucket_name,
|
||||
object_name);
|
||||
}
|
||||
internal_storage_op_count_remove_suc.Increment();
|
||||
monitor::internal_storage_op_count_remove_suc.Increment();
|
||||
return false;
|
||||
}
|
||||
internal_storage_op_count_remove_suc.Increment();
|
||||
monitor::internal_storage_op_count_remove_suc.Increment();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -580,14 +580,14 @@ MinioChunkManager::PutObjectBuffer(const std::string& bucket_name,
|
|||
|
||||
auto start = std::chrono::system_clock::now();
|
||||
auto outcome = client_->PutObject(request);
|
||||
internal_storage_request_latency_put.Observe(
|
||||
monitor::internal_storage_request_latency_put.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
internal_storage_kv_size_put.Observe(size);
|
||||
monitor::internal_storage_kv_size_put.Observe(size);
|
||||
|
||||
if (!outcome.IsSuccess()) {
|
||||
internal_storage_op_count_put_fail.Increment();
|
||||
monitor::internal_storage_op_count_put_fail.Increment();
|
||||
const auto& err = outcome.GetError();
|
||||
ThrowS3Error("PutObjectBuffer",
|
||||
err,
|
||||
|
@ -595,7 +595,7 @@ MinioChunkManager::PutObjectBuffer(const std::string& bucket_name,
|
|||
bucket_name,
|
||||
object_name);
|
||||
}
|
||||
internal_storage_op_count_put_suc.Increment();
|
||||
monitor::internal_storage_op_count_put_suc.Increment();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -661,14 +661,14 @@ MinioChunkManager::GetObjectBuffer(const std::string& bucket_name,
|
|||
});
|
||||
auto start = std::chrono::system_clock::now();
|
||||
auto outcome = client_->GetObject(request);
|
||||
internal_storage_request_latency_get.Observe(
|
||||
monitor::internal_storage_request_latency_get.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
internal_storage_kv_size_get.Observe(size);
|
||||
monitor::internal_storage_kv_size_get.Observe(size);
|
||||
|
||||
if (!outcome.IsSuccess()) {
|
||||
internal_storage_op_count_get_fail.Increment();
|
||||
monitor::internal_storage_op_count_get_fail.Increment();
|
||||
const auto& err = outcome.GetError();
|
||||
ThrowS3Error("GetObjectBuffer",
|
||||
err,
|
||||
|
@ -676,7 +676,7 @@ MinioChunkManager::GetObjectBuffer(const std::string& bucket_name,
|
|||
bucket_name,
|
||||
object_name);
|
||||
}
|
||||
internal_storage_op_count_get_suc.Increment();
|
||||
monitor::internal_storage_op_count_get_suc.Increment();
|
||||
return size;
|
||||
}
|
||||
|
||||
|
@ -692,13 +692,13 @@ MinioChunkManager::ListObjects(const std::string& bucket_name,
|
|||
|
||||
auto start = std::chrono::system_clock::now();
|
||||
auto outcome = client_->ListObjects(request);
|
||||
internal_storage_request_latency_list.Observe(
|
||||
monitor::internal_storage_request_latency_list.Observe(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count());
|
||||
|
||||
if (!outcome.IsSuccess()) {
|
||||
internal_storage_op_count_list_fail.Increment();
|
||||
monitor::internal_storage_op_count_list_fail.Increment();
|
||||
const auto& err = outcome.GetError();
|
||||
ThrowS3Error("ListObjects",
|
||||
err,
|
||||
|
@ -706,7 +706,7 @@ MinioChunkManager::ListObjects(const std::string& bucket_name,
|
|||
bucket_name,
|
||||
prefix);
|
||||
}
|
||||
internal_storage_op_count_list_suc.Increment();
|
||||
monitor::internal_storage_op_count_list_suc.Increment();
|
||||
auto objects = outcome.GetResult().GetContents();
|
||||
for (auto& obj : objects) {
|
||||
objects_vec.emplace_back(obj.GetKey());
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#include "stdio.h"
|
||||
#include <fcntl.h>
|
||||
#include "log/Log.h"
|
||||
#include "storage/prometheus_client.h"
|
||||
#include "monitor/prometheus_client.h"
|
||||
|
||||
namespace milvus::storage {
|
||||
namespace {
|
||||
|
@ -69,9 +69,9 @@ MmapBlock::Init() {
|
|||
offset_.store(0);
|
||||
close(fd);
|
||||
|
||||
milvus::storage::internal_mmap_allocated_space_bytes_file.Observe(
|
||||
milvus::monitor::internal_mmap_allocated_space_bytes_file.Observe(
|
||||
file_size_);
|
||||
milvus::storage::internal_mmap_in_used_space_bytes_file.Increment(
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_file.Increment(
|
||||
file_size_);
|
||||
is_valid_ = true;
|
||||
allocated_size_.fetch_add(file_size_);
|
||||
|
@ -96,7 +96,7 @@ MmapBlock::Close() {
|
|||
}
|
||||
}
|
||||
allocated_size_.fetch_sub(file_size_);
|
||||
milvus::storage::internal_mmap_in_used_space_bytes_file.Decrement(
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_file.Decrement(
|
||||
file_size_);
|
||||
is_valid_ = false;
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
// limitations under the License.
|
||||
|
||||
#include "storage/storage_c.h"
|
||||
#include "storage/prometheus_client.h"
|
||||
#include "monitor/prometheus_client.h"
|
||||
#include "storage/RemoteChunkManagerSingleton.h"
|
||||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
#include "storage/MmapManager.h"
|
||||
|
@ -106,13 +106,3 @@ void
|
|||
CleanRemoteChunkManagerSingleton() {
|
||||
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Release();
|
||||
}
|
||||
|
||||
char*
|
||||
GetStorageMetrics() {
|
||||
auto str = milvus::storage::prometheusClient->GetMetrics();
|
||||
auto len = str.length();
|
||||
char* res = (char*)malloc(len + 1);
|
||||
memcpy(res, str.data(), len);
|
||||
res[len] = '\0';
|
||||
return res;
|
||||
}
|
||||
|
|
|
@ -36,9 +36,6 @@ InitMmapManager(CMmapConfig c_mmap_config);
|
|||
void
|
||||
CleanRemoteChunkManagerSingleton();
|
||||
|
||||
char*
|
||||
GetStorageMetrics();
|
||||
|
||||
#ifdef __cplusplus
|
||||
};
|
||||
#endif
|
||||
|
|
|
@ -71,6 +71,7 @@ set(MILVUS_TEST_FILES
|
|||
test_array_inverted_index.cpp
|
||||
test_chunk_vector.cpp
|
||||
test_mmap_chunk_manager.cpp
|
||||
test_monitor.cpp
|
||||
)
|
||||
|
||||
if ( INDEX_ENGINE STREQUAL "cardinal" )
|
||||
|
@ -128,6 +129,7 @@ if (LINUX)
|
|||
target_link_libraries(index_builder_test
|
||||
gtest
|
||||
gtest_main
|
||||
milvus_monitor
|
||||
milvus_segcore
|
||||
milvus_storage
|
||||
milvus_indexbuilder
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "monitor/monitor_c.h"
|
||||
#include "monitor/prometheus_client.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
vector<string>
|
||||
split(const string& str,
|
||||
const string& delim) { //将分割后的子字符串存储在vector中
|
||||
vector<string> res;
|
||||
if ("" == str)
|
||||
return res;
|
||||
|
||||
string strs = str + delim;
|
||||
size_t pos;
|
||||
size_t size = strs.size();
|
||||
|
||||
for (int i = 0; i < size; ++i) {
|
||||
pos = strs.find(delim, i);
|
||||
if (pos < size) {
|
||||
string s = strs.substr(i, pos - i);
|
||||
res.push_back(s);
|
||||
i = pos + delim.size() - 1;
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
class MonitorTest : public testing::Test {
|
||||
public:
|
||||
MonitorTest() {
|
||||
}
|
||||
~MonitorTest() {
|
||||
}
|
||||
virtual void
|
||||
SetUp() {
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(MonitorTest, GetCoreMetrics) {
|
||||
auto metricsChars = GetCoreMetrics();
|
||||
string helpPrefix = "# HELP ";
|
||||
string familyName = "";
|
||||
char* p;
|
||||
const char* delim = "\n";
|
||||
p = strtok(metricsChars, delim);
|
||||
while (p) {
|
||||
char* currentLine = p;
|
||||
p = strtok(NULL, delim);
|
||||
if (strncmp(currentLine, "# HELP ", 7) == 0) {
|
||||
familyName = "";
|
||||
continue;
|
||||
} else if (strncmp(currentLine, "# TYPE ", 7) == 0) {
|
||||
std::vector<string> res = split(currentLine, " ");
|
||||
EXPECT_EQ(4, res.size());
|
||||
familyName = res[2];
|
||||
EXPECT_EQ(true,
|
||||
res[3] == "gauge" || res[3] == "counter" ||
|
||||
res[3] == "histogram");
|
||||
continue;
|
||||
}
|
||||
EXPECT_EQ(true, familyName.length() > 0);
|
||||
EXPECT_EQ(
|
||||
0, strncmp(currentLine, familyName.c_str(), familyName.length()));
|
||||
}
|
||||
}
|
|
@ -16,7 +16,6 @@
|
|||
#include <string>
|
||||
#include <vector>
|
||||
#include "common/EasyAssert.h"
|
||||
#include "storage/prometheus_client.h"
|
||||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
#include "storage/RemoteChunkManagerSingleton.h"
|
||||
#include "storage/storage_c.h"
|
||||
|
@ -105,54 +104,4 @@ TEST_F(StorageTest, InitChunkCacheSingleton) {
|
|||
|
||||
TEST_F(StorageTest, CleanRemoteChunkManagerSingleton) {
|
||||
CleanRemoteChunkManagerSingleton();
|
||||
}
|
||||
|
||||
vector<string>
|
||||
split(const string& str,
|
||||
const string& delim) { //将分割后的子字符串存储在vector中
|
||||
vector<string> res;
|
||||
if ("" == str)
|
||||
return res;
|
||||
|
||||
string strs = str + delim;
|
||||
size_t pos;
|
||||
size_t size = strs.size();
|
||||
|
||||
for (int i = 0; i < size; ++i) {
|
||||
pos = strs.find(delim, i);
|
||||
if (pos < size) {
|
||||
string s = strs.substr(i, pos - i);
|
||||
res.push_back(s);
|
||||
i = pos + delim.size() - 1;
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
TEST_F(StorageTest, GetStorageMetrics) {
|
||||
auto metricsChars = GetStorageMetrics();
|
||||
string helpPrefix = "# HELP ";
|
||||
string familyName = "";
|
||||
char* p;
|
||||
const char* delim = "\n";
|
||||
p = strtok(metricsChars, delim);
|
||||
while (p) {
|
||||
char* currentLine = p;
|
||||
p = strtok(NULL, delim);
|
||||
if (strncmp(currentLine, "# HELP ", 7) == 0) {
|
||||
familyName = "";
|
||||
continue;
|
||||
} else if (strncmp(currentLine, "# TYPE ", 7) == 0) {
|
||||
std::vector<string> res = split(currentLine, " ");
|
||||
EXPECT_EQ(4, res.size());
|
||||
familyName = res[2];
|
||||
EXPECT_EQ(true,
|
||||
res[3] == "gauge" || res[3] == "counter" ||
|
||||
res[3] == "histogram");
|
||||
continue;
|
||||
}
|
||||
EXPECT_EQ(true, familyName.length() > 0);
|
||||
EXPECT_EQ(
|
||||
0, strncmp(currentLine, familyName.c_str(), familyName.length()));
|
||||
}
|
||||
}
|
|
@ -19,11 +19,11 @@
|
|||
package metrics
|
||||
|
||||
/*
|
||||
#cgo pkg-config: milvus_segcore milvus_storage milvus_common
|
||||
#cgo pkg-config: milvus_segcore milvus_storage milvus_common milvus_monitor
|
||||
|
||||
#include <stdlib.h>
|
||||
#include "segcore/metrics_c.h"
|
||||
#include "storage/storage_c.h"
|
||||
#include "monitor/monitor_c.h"
|
||||
|
||||
*/
|
||||
import "C"
|
||||
|
@ -135,7 +135,7 @@ func (r *CRegistry) Gather() (res []*dto.MetricFamily, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
cMetricsStr = C.GetStorageMetrics()
|
||||
cMetricsStr = C.GetCoreMetrics()
|
||||
metricsStr = C.GoString(cMetricsStr)
|
||||
C.free(unsafe.Pointer(cMetricsStr))
|
||||
|
||||
|
|
Loading…
Reference in New Issue