mirror of https://github.com/milvus-io/milvus.git
add internal storage metrics (#28278)
/kind improvement
issue: #28277
Signed-off-by: PowderLi <min.li@zilliz.com>
parent 7029797452
commit a1c505dbd5
(File diff suppressed because it is too large)
@@ -14,12 +14,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <iostream>
#include <sstream>
#include "common/Consts.h"
#include "common/EasyAssert.h"
#include "log/Log.h"
#include "storage/AzureChunkManager.h"
#include "storage/prometheus_client.h"

namespace milvus {
namespace storage {

@@ -170,8 +172,15 @@ AzureChunkManager::ObjectExists(const std::string& bucket_name,
                                const std::string& object_name) {
    bool res;
    try {
        auto start = std::chrono::system_clock::now();
        res = client_->ObjectExists(bucket_name, object_name);
        internal_storage_request_latency_stat.Observe(
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::system_clock::now() - start)
                .count());
        internal_storage_op_count_stat_suc.Increment();
    } catch (std::exception& err) {
        internal_storage_op_count_stat_fail.Increment();
        ThrowAzureError("ObjectExists",
                        err,
                        "params, bucket={}, object={}",

@@ -186,8 +195,15 @@ AzureChunkManager::GetObjectSize(const std::string& bucket_name,
                                 const std::string& object_name) {
    uint64_t res;
    try {
        auto start = std::chrono::system_clock::now();
        res = client_->GetObjectSize(bucket_name, object_name);
        internal_storage_request_latency_stat.Observe(
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::system_clock::now() - start)
                .count());
        internal_storage_op_count_stat_suc.Increment();
    } catch (std::exception& err) {
        internal_storage_op_count_stat_fail.Increment();
        ThrowAzureError("GetObjectSize",
                        err,
                        "params, bucket={}, object={}",

@@ -202,8 +218,15 @@ AzureChunkManager::DeleteObject(const std::string& bucket_name,
                                const std::string& object_name) {
    bool res;
    try {
        auto start = std::chrono::system_clock::now();
        res = client_->DeleteObject(bucket_name, object_name);
        internal_storage_request_latency_remove.Observe(
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::system_clock::now() - start)
                .count());
        internal_storage_op_count_remove_suc.Increment();
    } catch (std::exception& err) {
        internal_storage_op_count_remove_fail.Increment();
        ThrowAzureError("DeleteObject",
                        err,
                        "params, bucket={}, object={}",

@@ -220,8 +243,16 @@ AzureChunkManager::PutObjectBuffer(const std::string& bucket_name,
                                   uint64_t size) {
    bool res;
    try {
        auto start = std::chrono::system_clock::now();
        res = client_->PutObjectBuffer(bucket_name, object_name, buf, size);
        internal_storage_request_latency_put.Observe(
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::system_clock::now() - start)
                .count());
        internal_storage_op_count_put_suc.Increment();
        internal_storage_kv_size_put.Observe(size);
    } catch (std::exception& err) {
        internal_storage_op_count_put_fail.Increment();
        ThrowAzureError("PutObjectBuffer",
                        err,
                        "params, bucket={}, object={}",

@@ -238,8 +269,16 @@ AzureChunkManager::GetObjectBuffer(const std::string& bucket_name,
                                   uint64_t size) {
    uint64_t res;
    try {
        auto start = std::chrono::system_clock::now();
        res = client_->GetObjectBuffer(bucket_name, object_name, buf, size);
        internal_storage_request_latency_get.Observe(
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::system_clock::now() - start)
                .count());
        internal_storage_op_count_get_suc.Increment();
        internal_storage_kv_size_get.Observe(size);
    } catch (std::exception& err) {
        internal_storage_op_count_get_fail.Increment();
        ThrowAzureError("GetObjectBuffer",
                        err,
                        "params, bucket={}, object={}",

@@ -254,8 +293,15 @@ AzureChunkManager::ListObjects(const std::string& bucket_name,
                               const std::string& prefix) {
    std::vector<std::string> res;
    try {
        auto start = std::chrono::system_clock::now();
        res = client_->ListObjects(bucket_name, prefix);
        internal_storage_request_latency_list.Observe(
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::system_clock::now() - start)
                .count());
        internal_storage_op_count_list_suc.Increment();
    } catch (std::exception& err) {
        internal_storage_op_count_list_fail.Increment();
        ThrowAzureError("ListObjects",
                        err,
                        "params, bucket={}, prefix={}",
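Every AzureChunkManager method above repeats the same three steps: capture a start time, call the client, then Observe the elapsed milliseconds and Increment a success or failure counter. A minimal sketch of how that pattern could be factored into an RAII guard follows; this is a hypothetical refactoring, not part of this patch, and unlike the patch it also records latency for calls that throw.

// Hypothetical helper (not in this patch): observes elapsed time on scope
// exit and picks the success or failure counter depending on whether the
// scope is unwinding due to an exception.
#include <chrono>
#include <exception>
#include <prometheus/counter.h>
#include <prometheus/histogram.h>

class LatencyGuard {
 public:
    LatencyGuard(prometheus::Histogram& latency,
                 prometheus::Counter& suc,
                 prometheus::Counter& fail)
        : latency_(latency),
          suc_(suc),
          fail_(fail),
          start_(std::chrono::system_clock::now()) {
    }
    ~LatencyGuard() {
        latency_.Observe(std::chrono::duration_cast<std::chrono::milliseconds>(
                             std::chrono::system_clock::now() - start_)
                             .count());
        // An exception propagating out of the guarded scope means the call failed.
        if (std::uncaught_exceptions() > 0) {
            fail_.Increment();
        } else {
            suc_.Increment();
        }
    }

 private:
    prometheus::Histogram& latency_;
    prometheus::Counter& suc_;
    prometheus::Counter& fail_;
    std::chrono::system_clock::time_point start_;
};

With such a guard, ObjectExists would reduce to constructing a LatencyGuard over internal_storage_request_latency_stat and the stat success/fail counters, then returning client_->ObjectExists(bucket_name, object_name).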
@@ -46,6 +46,7 @@ set(STORAGE_FILES
    InsertData.cpp
    Event.cpp
    ThreadPool.cpp
    prometheus_client.cpp
    storage_c.cpp
    ChunkManager.cpp
    MinioChunkManager.cpp
@@ -31,6 +31,7 @@
#include "storage/MinioChunkManager.h"
#include "storage/AliyunSTSClient.h"
#include "storage/AliyunCredentialsProvider.h"
#include "storage/prometheus_client.h"
#include "common/EasyAssert.h"
#include "log/Log.h"
#include "signal.h"

@@ -445,19 +446,27 @@ MinioChunkManager::ObjectExists(const std::string& bucket_name,
    request.SetBucket(bucket_name.c_str());
    request.SetKey(object_name.c_str());

    auto start = std::chrono::system_clock::now();
    auto outcome = client_->HeadObject(request);
    internal_storage_request_latency_stat.Observe(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now() - start)
            .count());

    if (!outcome.IsSuccess()) {
        const auto& err = outcome.GetError();
        if (!IsNotFound(err.GetErrorType())) {
            internal_storage_op_count_stat_fail.Increment();
            ThrowS3Error("ObjectExists",
                         err,
                         "params, bucket={}, object={}",
                         bucket_name,
                         object_name);
        }
        internal_storage_op_count_stat_suc.Increment();
        return false;
    }
    internal_storage_op_count_stat_suc.Increment();
    return true;
}

@@ -468,8 +477,14 @@ MinioChunkManager::GetObjectSize(const std::string& bucket_name,
    request.SetBucket(bucket_name.c_str());
    request.SetKey(object_name.c_str());

    auto start = std::chrono::system_clock::now();
    auto outcome = client_->HeadObject(request);
    internal_storage_request_latency_stat.Observe(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now() - start)
            .count());
    if (!outcome.IsSuccess()) {
        internal_storage_op_count_stat_fail.Increment();
        const auto& err = outcome.GetError();
        ThrowS3Error("GetObjectSize",
                     err,

@@ -477,6 +492,7 @@ MinioChunkManager::GetObjectSize(const std::string& bucket_name,
                     bucket_name,
                     object_name);
    }
    internal_storage_op_count_stat_suc.Increment();
    return outcome.GetResult().GetContentLength();
}

@@ -487,19 +503,27 @@ MinioChunkManager::DeleteObject(const std::string& bucket_name,
    request.SetBucket(bucket_name.c_str());
    request.SetKey(object_name.c_str());

    auto start = std::chrono::system_clock::now();
    auto outcome = client_->DeleteObject(request);
    internal_storage_request_latency_remove.Observe(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now() - start)
            .count());

    if (!outcome.IsSuccess()) {
        const auto& err = outcome.GetError();
        if (!IsNotFound(err.GetErrorType())) {
            internal_storage_op_count_remove_fail.Increment();
            ThrowS3Error("DeleteObject",
                         err,
                         "params, bucket={}, object={}",
                         bucket_name,
                         object_name);
        }
        internal_storage_op_count_remove_suc.Increment();
        return false;
    }
    internal_storage_op_count_remove_suc.Increment();
    return true;
}

@@ -518,9 +542,16 @@ MinioChunkManager::PutObjectBuffer(const std::string& bucket_name,
    input_data->write(reinterpret_cast<char*>(buf), size);
    request.SetBody(input_data);

    auto start = std::chrono::system_clock::now();
    auto outcome = client_->PutObject(request);
    internal_storage_request_latency_put.Observe(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now() - start)
            .count());
    internal_storage_kv_size_put.Observe(size);

    if (!outcome.IsSuccess()) {
        internal_storage_op_count_put_fail.Increment();
        const auto& err = outcome.GetError();
        ThrowS3Error("PutObjectBuffer",
                     err,

@@ -528,6 +559,7 @@ MinioChunkManager::PutObjectBuffer(const std::string& bucket_name,
                     bucket_name,
                     object_name);
    }
    internal_storage_op_count_put_suc.Increment();
    return true;
}

@@ -591,9 +623,16 @@ MinioChunkManager::GetObjectBuffer(const std::string& bucket_name,
#endif
        return stream.release();
    });
    auto start = std::chrono::system_clock::now();
    auto outcome = client_->GetObject(request);
    internal_storage_request_latency_get.Observe(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now() - start)
            .count());
    internal_storage_kv_size_get.Observe(size);

    if (!outcome.IsSuccess()) {
        internal_storage_op_count_get_fail.Increment();
        const auto& err = outcome.GetError();
        ThrowS3Error("GetObjectBuffer",
                     err,

@@ -601,6 +640,7 @@ MinioChunkManager::GetObjectBuffer(const std::string& bucket_name,
                     bucket_name,
                     object_name);
    }
    internal_storage_op_count_get_suc.Increment();
    return size;
}

@@ -614,9 +654,15 @@ MinioChunkManager::ListObjects(const std::string& bucket_name,
        request.SetPrefix(prefix);
    }

    auto start = std::chrono::system_clock::now();
    auto outcome = client_->ListObjects(request);
    internal_storage_request_latency_list.Observe(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now() - start)
            .count());

    if (!outcome.IsSuccess()) {
        internal_storage_op_count_list_fail.Increment();
        const auto& err = outcome.GetError();
        ThrowS3Error("ListObjects",
                     err,

@@ -624,6 +670,7 @@ MinioChunkManager::ListObjects(const std::string& bucket_name,
                     bucket_name,
                     prefix);
    }
    internal_storage_op_count_list_suc.Increment();
    auto objects = outcome.GetResult().GetContents();
    for (auto& obj : objects) {
        objects_vec.emplace_back(obj.GetKey());
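Note the asymmetry in the S3 path above: ObjectExists and DeleteObject treat a NotFound outcome as a successful operation (success counter incremented, false returned), and only other error types feed the failure counter before throwing. A standalone sketch of that accounting rule; S3ErrorKind, OpCounters, and AccountObjectExists are illustrative names, not Milvus or AWS SDK API:

// Distilled model of the ObjectExists accounting above.
#include <cassert>
#include <stdexcept>

enum class S3ErrorKind { None, NotFound, Other };

struct OpCounters {
    int suc = 0;
    int fail = 0;
};

bool
AccountObjectExists(S3ErrorKind err, OpCounters& c) {
    if (err != S3ErrorKind::None) {
        if (err == S3ErrorKind::Other) {
            c.fail++;  // only real errors count as failures
            throw std::runtime_error("ObjectExists failed");
        }
        c.suc++;  // NotFound: the stat itself succeeded, the object is absent
        return false;
    }
    c.suc++;
    return true;
}

int
main() {
    OpCounters c;
    assert(AccountObjectExists(S3ErrorKind::None, c) == true);
    assert(AccountObjectExists(S3ErrorKind::NotFound, c) == false);
    assert(c.suc == 2 && c.fail == 0);
    return 0;
}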
@@ -0,0 +1,134 @@
// Copyright (C) 2019-2023 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "storage/prometheus_client.h"

namespace milvus::storage {

const prometheus::Histogram::BucketBoundaries buckets = {
    1, 2, 4, 8, 16, 32, 64, 128, 256, 512,
    1024, 2048, 4096, 8192, 16384, 32768, 65536};

const std::unique_ptr<PrometheusClient> prometheusClient =
    std::make_unique<PrometheusClient>();

/******************GetMetrics*************************************************
 * !!! Do NOT use SUMMARY metrics here: when Milvus parses SUMMARY metrics,
 * the following error occurs:
 *
 * An error has occurred while serving metrics:
 * text format parsing error in line 50: expected float as value, got "=\"0.9\"}"
 ******************************************************************************/

std::map<std::string, std::string> getMap = {
    {"persistent_data_op_type", "get"}};
std::map<std::string, std::string> getSucMap = {
    {"persistent_data_op_type", "get"}, {"status", "success"}};
std::map<std::string, std::string> getFailMap = {
    {"persistent_data_op_type", "get"}, {"status", "fail"}};
std::map<std::string, std::string> putMap = {
    {"persistent_data_op_type", "put"}};
std::map<std::string, std::string> putSucMap = {
    {"persistent_data_op_type", "put"}, {"status", "success"}};
std::map<std::string, std::string> putFailMap = {
    {"persistent_data_op_type", "put"}, {"status", "fail"}};
std::map<std::string, std::string> statMap = {
    {"persistent_data_op_type", "stat"}};
std::map<std::string, std::string> statSucMap = {
    {"persistent_data_op_type", "stat"}, {"status", "success"}};
std::map<std::string, std::string> statFailMap = {
    {"persistent_data_op_type", "stat"}, {"status", "fail"}};
std::map<std::string, std::string> listMap = {
    {"persistent_data_op_type", "list"}};
std::map<std::string, std::string> listSucMap = {
    {"persistent_data_op_type", "list"}, {"status", "success"}};
std::map<std::string, std::string> listFailMap = {
    {"persistent_data_op_type", "list"}, {"status", "fail"}};
std::map<std::string, std::string> removeMap = {
    {"persistent_data_op_type", "remove"}};
std::map<std::string, std::string> removeSucMap = {
    {"persistent_data_op_type", "remove"}, {"status", "success"}};
std::map<std::string, std::string> removeFailMap = {
    {"persistent_data_op_type", "remove"}, {"status", "fail"}};

DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_storage_kv_size,
                                   "[cpp]kv size stats")
DEFINE_PROMETHEUS_HISTOGRAM(internal_storage_kv_size_get,
                            internal_storage_kv_size,
                            getMap)
DEFINE_PROMETHEUS_HISTOGRAM(internal_storage_kv_size_put,
                            internal_storage_kv_size,
                            putMap)
DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(
    internal_storage_request_latency,
    "[cpp]request latency(ms) on the client side")
DEFINE_PROMETHEUS_HISTOGRAM(internal_storage_request_latency_get,
                            internal_storage_request_latency,
                            getMap)
DEFINE_PROMETHEUS_HISTOGRAM(internal_storage_request_latency_put,
                            internal_storage_request_latency,
                            putMap)
DEFINE_PROMETHEUS_HISTOGRAM(internal_storage_request_latency_stat,
                            internal_storage_request_latency,
                            statMap)
DEFINE_PROMETHEUS_HISTOGRAM(internal_storage_request_latency_list,
                            internal_storage_request_latency,
                            listMap)
DEFINE_PROMETHEUS_HISTOGRAM(internal_storage_request_latency_remove,
                            internal_storage_request_latency,
                            removeMap)
DEFINE_PROMETHEUS_COUNTER_FAMILY(internal_storage_op_count,
                                 "[cpp]count of persistent data operation")
DEFINE_PROMETHEUS_COUNTER(internal_storage_op_count_get_suc,
                          internal_storage_op_count,
                          getSucMap)
DEFINE_PROMETHEUS_COUNTER(internal_storage_op_count_get_fail,
                          internal_storage_op_count,
                          getFailMap)
DEFINE_PROMETHEUS_COUNTER(internal_storage_op_count_put_suc,
                          internal_storage_op_count,
                          putSucMap)
DEFINE_PROMETHEUS_COUNTER(internal_storage_op_count_put_fail,
                          internal_storage_op_count,
                          putFailMap)
DEFINE_PROMETHEUS_COUNTER(internal_storage_op_count_stat_suc,
                          internal_storage_op_count,
                          statSucMap)
DEFINE_PROMETHEUS_COUNTER(internal_storage_op_count_stat_fail,
                          internal_storage_op_count,
                          statFailMap)
DEFINE_PROMETHEUS_COUNTER(internal_storage_op_count_list_suc,
                          internal_storage_op_count,
                          listSucMap)
DEFINE_PROMETHEUS_COUNTER(internal_storage_op_count_list_fail,
                          internal_storage_op_count,
                          listFailMap)
DEFINE_PROMETHEUS_COUNTER(internal_storage_op_count_remove_suc,
                          internal_storage_op_count,
                          removeSucMap)
DEFINE_PROMETHEUS_COUNTER(internal_storage_op_count_remove_fail,
                          internal_storage_op_count,
                          removeFailMap)
} // namespace milvus::storage
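The bucket boundaries above are powers of two from 1 ms to 65536 ms (about 65.5 s): 17 boundaries, so 18 buckets once prometheus-cpp adds the implicit +Inf bucket. Since prometheus::Histogram::BucketBoundaries is a std::vector<double>, the same list could be generated instead of spelled out; a sketch, in case the range ever needs to change:

// Sketch: builds the same {1, 2, 4, ..., 65536} boundaries as the literal above.
#include <vector>

std::vector<double>
MakePowerOfTwoBuckets(double max_ms = 65536) {
    std::vector<double> b;
    for (double v = 1; v <= max_ms; v *= 2) {
        b.push_back(v);
    }
    return b;
}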
@@ -0,0 +1,115 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#pragma once

#include <prometheus/collectable.h>
#include <prometheus/counter.h>
#include <prometheus/gauge.h>
#include <prometheus/histogram.h>
#include <prometheus/registry.h>
#include <prometheus/summary.h>
#include <prometheus/text_serializer.h>

#include <memory>
#include <sstream>
#include <string>

namespace milvus::storage {

class PrometheusClient {
 public:
    PrometheusClient() = default;
    PrometheusClient(const PrometheusClient&) = delete;
    PrometheusClient&
    operator=(const PrometheusClient&) = delete;

    prometheus::Registry&
    GetRegistry() {
        return *registry_;
    }

    std::string
    GetMetrics() {
        std::ostringstream ss;
        prometheus::TextSerializer serializer;
        serializer.Serialize(ss, registry_->Collect());
        return ss.str();
    }

 private:
    std::shared_ptr<prometheus::Registry> registry_ =
        std::make_shared<prometheus::Registry>();
};

/*****************************************************************************/
// prometheus metrics
extern const prometheus::Histogram::BucketBoundaries buckets;
extern const std::unique_ptr<PrometheusClient> prometheusClient;

#define DEFINE_PROMETHEUS_GAUGE_FAMILY(name, desc)                \
    prometheus::Family<prometheus::Gauge>& name##_family =        \
        prometheus::BuildGauge().Name(#name).Help(desc).Register( \
            milvus::storage::prometheusClient->GetRegistry());
#define DEFINE_PROMETHEUS_GAUGE(alias, name, labels) \
    prometheus::Gauge& alias = name##_family.Add(labels);

#define DEFINE_PROMETHEUS_COUNTER_FAMILY(name, desc)                \
    prometheus::Family<prometheus::Counter>& name##_family =        \
        prometheus::BuildCounter().Name(#name).Help(desc).Register( \
            milvus::storage::prometheusClient->GetRegistry());
#define DEFINE_PROMETHEUS_COUNTER(alias, name, labels) \
    prometheus::Counter& alias = name##_family.Add(labels);

#define DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(name, desc)                \
    prometheus::Family<prometheus::Histogram>& name##_family =        \
        prometheus::BuildHistogram().Name(#name).Help(desc).Register( \
            milvus::storage::prometheusClient->GetRegistry());
#define DEFINE_PROMETHEUS_HISTOGRAM(alias, name, labels) \
    prometheus::Histogram& alias =                       \
        name##_family.Add(labels, milvus::storage::buckets);

#define DECLARE_PROMETHEUS_GAUGE_FAMILY(name_gauge_family) \
    extern prometheus::Family<prometheus::Gauge>& name_gauge_family;
#define DECLARE_PROMETHEUS_GAUGE(name_gauge) \
    extern prometheus::Gauge& name_gauge;
#define DECLARE_PROMETHEUS_COUNTER_FAMILY(name_counter_family) \
    extern prometheus::Family<prometheus::Counter>& name_counter_family;
#define DECLARE_PROMETHEUS_COUNTER(name_counter) \
    extern prometheus::Counter& name_counter;
#define DECLARE_PROMETHEUS_HISTOGRAM_FAMILY(name_histogram_family) \
    extern prometheus::Family<prometheus::Histogram>& name_histogram_family;
#define DECLARE_PROMETHEUS_HISTOGRAM(name_histogram) \
    extern prometheus::Histogram& name_histogram;

DECLARE_PROMETHEUS_HISTOGRAM_FAMILY(internal_storage_kv_size);
DECLARE_PROMETHEUS_HISTOGRAM(internal_storage_kv_size_get);
DECLARE_PROMETHEUS_HISTOGRAM(internal_storage_kv_size_put);

DECLARE_PROMETHEUS_HISTOGRAM_FAMILY(internal_storage_request_latency);
DECLARE_PROMETHEUS_HISTOGRAM(internal_storage_request_latency_get);
DECLARE_PROMETHEUS_HISTOGRAM(internal_storage_request_latency_put);
DECLARE_PROMETHEUS_HISTOGRAM(internal_storage_request_latency_stat);
DECLARE_PROMETHEUS_HISTOGRAM(internal_storage_request_latency_list);
DECLARE_PROMETHEUS_HISTOGRAM(internal_storage_request_latency_remove);

DECLARE_PROMETHEUS_COUNTER_FAMILY(internal_storage_op_count);
DECLARE_PROMETHEUS_COUNTER(internal_storage_op_count_get_suc);
DECLARE_PROMETHEUS_COUNTER(internal_storage_op_count_get_fail);
DECLARE_PROMETHEUS_COUNTER(internal_storage_op_count_put_suc);
DECLARE_PROMETHEUS_COUNTER(internal_storage_op_count_put_fail);
DECLARE_PROMETHEUS_COUNTER(internal_storage_op_count_stat_suc);
DECLARE_PROMETHEUS_COUNTER(internal_storage_op_count_stat_fail);
DECLARE_PROMETHEUS_COUNTER(internal_storage_op_count_list_suc);
DECLARE_PROMETHEUS_COUNTER(internal_storage_op_count_list_fail);
DECLARE_PROMETHEUS_COUNTER(internal_storage_op_count_remove_suc);
DECLARE_PROMETHEUS_COUNTER(internal_storage_op_count_remove_fail);
} // namespace milvus::storage
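The DEFINE_* macros (used in prometheus_client.cpp) register a family with the shared registry and add one labeled child per metric; the matching DECLARE_* macros expose the children as extern references for call sites. A hypothetical example of wiring a new counter through this pattern; internal_storage_copy_count and copySucMap are invented names for illustration:

// In prometheus_client.cpp (hypothetical):
std::map<std::string, std::string> copySucMap = {
    {"persistent_data_op_type", "copy"}, {"status", "success"}};
DEFINE_PROMETHEUS_COUNTER_FAMILY(internal_storage_copy_count,
                                 "[cpp]count of copy operations")
DEFINE_PROMETHEUS_COUNTER(internal_storage_copy_count_suc,
                          internal_storage_copy_count,
                          copySucMap)

// In prometheus_client.h (hypothetical):
DECLARE_PROMETHEUS_COUNTER_FAMILY(internal_storage_copy_count);
DECLARE_PROMETHEUS_COUNTER(internal_storage_copy_count_suc);

// At a call site:
// internal_storage_copy_count_suc.Increment();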
@@ -15,6 +15,7 @@
// limitations under the License.

#include "storage/storage_c.h"
#include "storage/prometheus_client.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/ChunkCacheSingleton.h"

@@ -98,3 +99,13 @@ void
CleanRemoteChunkManagerSingleton() {
    milvus::storage::RemoteChunkManagerSingleton::GetInstance().Release();
}

char*
GetStorageMetrics() {
    auto str = milvus::storage::prometheusClient->GetMetrics();
    auto len = str.length();
    char* res = (char*)malloc(len + 1);
    memcpy(res, str.data(), len);
    res[len] = '\0';
    return res;
}
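GetStorageMetrics returns a malloc'd, NUL-terminated copy of the serialized registry, so ownership crosses the C boundary and the caller must free the buffer; the Go caller added later in this commit does exactly that with C.free. A minimal caller sketch under that contract:

// Minimal C/C++ caller sketch: print the metrics, then release the buffer.
#include <stdio.h>
#include <stdlib.h>
#include "storage/storage_c.h"

int
main() {
    char* metrics = GetStorageMetrics();
    printf("%s\n", metrics);  // Prometheus text exposition format
    free(metrics);            // the caller owns the malloc'd buffer
    return 0;
}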
@@ -36,6 +36,9 @@ InitChunkCacheSingleton(const char* c_dir_path, const char* read_ahead_policy);
void
CleanRemoteChunkManagerSingleton();

char*
GetStorageMetrics();

#ifdef __cplusplus
};
#endif
@@ -58,6 +58,7 @@ set(MILVUS_TEST_FILES
    test_plan_proto.cpp
    test_chunk_cache.cpp
    test_binlog_index.cpp
    test_storage.cpp
)

if ( BUILD_DISK_ANN STREQUAL "ON" )
@@ -0,0 +1,152 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include <gtest/gtest.h>

#include <iostream>
#include <random>
#include <string>
#include <vector>
#include "common/EasyAssert.h"
#include "storage/prometheus_client.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/storage_c.h"

using namespace std;
using namespace milvus;
using namespace milvus::storage;

string rootPath = "files";
string bucketName = "a-bucket";

CStorageConfig
get_azure_storage_config() {
    auto endpoint = "core.windows.net";
    auto accessKey = "devstoreaccount1";
    auto accessValue =
        "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/"
        "K1SZFPTOtr/KBHBeksoGMGw==";

    return CStorageConfig{endpoint,
                          bucketName.c_str(),
                          accessKey,
                          accessValue,
                          rootPath.c_str(),
                          "remote",
                          "azure",
                          "",
                          "error",
                          "",
                          false,
                          false,
                          false,
                          30000};
}

class StorageTest : public testing::Test {
 public:
    StorageTest() {
    }
    ~StorageTest() {
    }
    virtual void
    SetUp() {
    }
};

TEST_F(StorageTest, InitLocalChunkManagerSingleton) {
    auto status = InitLocalChunkManagerSingleton("tmp");
    EXPECT_EQ(status.error_code, Success);
}

TEST_F(StorageTest, GetLocalUsedSize) {
    int64_t size = 0;
    auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager();
    EXPECT_EQ(lcm->GetRootPath(), "/tmp/milvus/local_data/");
    string test_dir = lcm->GetRootPath() + "tmp";
    string test_file = test_dir + "/test.txt";

    auto status = GetLocalUsedSize(test_dir.c_str(), &size);
    EXPECT_EQ(status.error_code, Success);
    EXPECT_EQ(size, 0);
    lcm->CreateDir(test_dir);
    lcm->CreateFile(test_file);
    uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
    lcm->Write(test_file, data, sizeof(data));
    status = GetLocalUsedSize(test_dir.c_str(), &size);
    EXPECT_EQ(status.error_code, Success);
    EXPECT_EQ(size, 5);
    lcm->RemoveDir(test_dir);
}

TEST_F(StorageTest, InitRemoteChunkManagerSingleton) {
    CStorageConfig storageConfig = get_azure_storage_config();
    InitRemoteChunkManagerSingleton(storageConfig);
    auto rcm =
        RemoteChunkManagerSingleton::GetInstance().GetRemoteChunkManager();
    EXPECT_EQ(rcm->GetRootPath(), "/tmp/milvus/remote_data");
}

TEST_F(StorageTest, InitChunkCacheSingleton) {
}

TEST_F(StorageTest, CleanRemoteChunkManagerSingleton) {
    CleanRemoteChunkManagerSingleton();
}

vector<string>
split(const string& str,
      const string& delim) {  // store the split substrings in a vector
    vector<string> res;
    if ("" == str)
        return res;

    string strs = str + delim;
    size_t pos;
    size_t size = strs.size();

    for (size_t i = 0; i < size; ++i) {
        pos = strs.find(delim, i);
        if (pos < size) {
            string s = strs.substr(i, pos - i);
            res.push_back(s);
            i = pos + delim.size() - 1;
        }
    }
    return res;
}

TEST_F(StorageTest, GetStorageMetrics) {
    auto metricsChars = GetStorageMetrics();
    string helpPrefix = "# HELP ";
    string familyName = "";
    char* p;
    const char* delim = "\n";
    p = strtok(metricsChars, delim);
    while (p) {
        char* currentLine = p;
        p = strtok(NULL, delim);
        if (strncmp(currentLine, "# HELP ", 7) == 0) {
            familyName = "";
            continue;
        } else if (strncmp(currentLine, "# TYPE ", 7) == 0) {
            std::vector<string> res = split(currentLine, " ");
            EXPECT_EQ(4, res.size());
            familyName = res[2];
            EXPECT_EQ(true, res[3] == "counter" || res[3] == "histogram");
            continue;
        }
        EXPECT_EQ(true, familyName.length() > 0);
        EXPECT_EQ(
            0, strncmp(currentLine, familyName.c_str(), familyName.length()));
    }
}
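The GetStorageMetrics test above walks the text exposition output line by line: each "# TYPE <family> <kind>" line sets the current family name, a "# HELP" line resets it, and every sample line in between must start with the family name. For reference, the output being parsed has roughly this shape (sample values here are placeholders, not actual output):

# HELP internal_storage_op_count [cpp]count of persistent data operation
# TYPE internal_storage_op_count counter
internal_storage_op_count{persistent_data_op_type="get",status="success"} 0
internal_storage_op_count{persistent_data_op_type="get",status="fail"} 0
# HELP internal_storage_request_latency [cpp]request latency(ms) on the client side
# TYPE internal_storage_request_latency histogram
internal_storage_request_latency_bucket{persistent_data_op_type="get",le="1"} 0
internal_storage_request_latency_sum{persistent_data_op_type="get"} 0
internal_storage_request_latency_count{persistent_data_op_type="get"} 0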
@@ -4770,7 +4770,10 @@ func TestUpdateAutoBalanceConfigLoop(t *testing.T) {

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go server.updateBalanceConfigLoop(ctx)
	go func() {
		time.Sleep(1500 * time.Millisecond)
		server.updateBalanceConfigLoop(ctx)
	}()
	// old data node exists, disable auto balance
	assert.Eventually(t, func() bool {
		return !Params.DataCoordCfg.AutoBalance.GetAsBool()
@@ -23,6 +23,7 @@ package metrics

#include <stdlib.h>
#include "segcore/metrics_c.h"
#include "storage/storage_c.h"

*/
import "C"

@@ -37,6 +38,7 @@ import (
	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
	"go.uber.org/zap"
	"golang.org/x/exp/maps"

	"github.com/milvus-io/milvus/pkg/log"
)

@@ -129,9 +131,21 @@ func (r *CRegistry) Gather() (res []*dto.MetricFamily, err error) {

	out, err := parser.TextToMetricFamilies(strings.NewReader(metricsStr))
	if err != nil {
		log.Error("fail to parse prometheus metrics", zap.Error(err))
		log.Error("fail to parse knowhere prometheus metrics", zap.Error(err))
		return
	}

	cMetricsStr = C.GetStorageMetrics()
	metricsStr = C.GoString(cMetricsStr)
	C.free(unsafe.Pointer(cMetricsStr))

	out1, err := parser.TextToMetricFamilies(strings.NewReader(metricsStr))
	if err != nil {
		log.Error("fail to parse storage prometheus metrics", zap.Error(err))
		return
	}

	maps.Copy(out, out1)
	res = NormalizeMetricFamilies(out)
	return
}