Add metrics and put starting process of prometheus and grafana into docker-compose (#4364)

* Add db_info metrics

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Remove logs from metrics endpoint

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Put starting process of prometheus and grafana into docker-compose

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Fix test_segment

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
yukun 2020-12-07 11:16:13 +08:00 committed by GitHub
parent 0367bf8246
commit d868370c32
17 changed files with 5084 additions and 124 deletions

View File

@@ -10,6 +10,7 @@ Please mark all changes in change log and use the issue from GitHub
- \#263 Add new indexes: NGT_PANNG NGT_ONNG
- \#3132 Add new indexes: RHNSW_PQ RHNSW_SQ
- \#3920 Allow an optional parameter `nbits` when creating IVF_PQ index
- \#4363 Put starting process of prometheus and grafana into docker-compose
## Improvement

View File

@@ -129,6 +129,7 @@ DBImpl::Start() {
// background build index thread
bg_index_thread_ = std::thread(&DBImpl::TimingIndexThread, this);
}
bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
return Status::OK();
}
@@ -156,10 +157,6 @@ DBImpl::Stop() {
index_req_swn_.Notify();
bg_index_thread_.join();
LOG_ENGINE_DEBUG_ << "DBImpl::Stop bg_index_thread_.join()";
}
// wait metric thread exit
if (options_.metric_enable_) {
swn_metric_.Notify();
bg_metric_thread_.join();
}
@@ -489,7 +486,10 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
idx_t op_id) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
ScopedTimer scope_timer([data_chunk, this](double latency) {
auto size = utils::GetSizeOfChunk(data_chunk);
this->insert_entities_size_gauge_.Set(size / latency);
});
if (data_chunk == nullptr) {
return Status(DB_ERROR, "Null pointer");
}
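For orientation, here is a small worked example of the value that ends up in this gauge; the chunk size and latency are made-up numbers, not taken from this commit:

// Illustrative values only: a 4 MiB chunk inserted in 2000 microseconds.
double size = 4.0 * 1024 * 1024;     // chunk size (presumably bytes), as utils::GetSizeOfChunk would return
double latency = 2000.0;             // microseconds, as reported by ScopedTimer (see Prometheus.h below)
double throughput = size / latency;  // ~2097 bytes per microsecond, i.e. roughly 2 GB/s
// insert_entities_size_gauge_.Set(throughput);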
@@ -668,7 +668,13 @@ DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_pt
}
auto vector_param = query_ptr->vectors.begin()->second;
auto nq = vector_param->nq;
ScopedTimer scoped_timer([nq, this](double latency) {
for (int64_t i = 0; i < nq; ++i) {
this->query_count_summary_.Observe(latency / nq);
this->query_response_summary_.Observe(latency);
}
});
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, query_ptr->collection_id));
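To make the two summaries concrete, this is what the lambda added above records for a single request; the numbers are illustrative only:

// Illustrative values only: a request with nq = 4 query vectors that completes in 8000 microseconds.
int64_t nq = 4;
double latency = 8000.0;
for (int64_t i = 0; i < nq; ++i) {
    query_count_summary_.Observe(latency / nq);  // four samples of 2000 (per-vector latency)
    query_response_summary_.Observe(latency);    // four samples of 8000 (whole-request latency)
}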
@@ -708,23 +714,6 @@ DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_pt
rc.RecordSection("get entities");
}
// step 5: filter entities by field names
// std::vector<engine::AttrsData> filter_attrs;
// for (auto attr : result.attrs_) {
// AttrsData attrs_data;
// attrs_data.attr_type_ = attr.attr_type_;
// attrs_data.attr_count_ = attr.attr_count_;
// attrs_data.id_array_ = attr.id_array_;
// for (auto& name : field_names) {
// if (attr.attr_data_.find(name) != attr.attr_data_.end()) {
// attrs_data.attr_data_.insert(std::make_pair(name, attr.attr_data_.at(name)));
// }
// }
// filter_attrs.emplace_back(attrs_data);
// }
// tracer.Context()->GetTraceContext()->GetSpan()->Finish();
return Status::OK();
}
@@ -1117,6 +1106,25 @@ DBImpl::BackgroundMerge(std::set<int64_t> collection_ids, bool force_merge_all)
}
}
void
DBImpl::BackgroundMetricThread() {
SetThreadName("metric_thread");
while (true) {
if (!ServiceAvailable()) {
LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
break;
}
size_t data_size = 0;
auto status = GetDataSize(data_size);
data_size_gauge_.Set(data_size);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Server get data size failed";
}
swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
}
}
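The loop blocks on swn_metric_, whose implementation is not part of this diff. As a rough idea of how such a wait/notify helper can work, here is a minimal condition-variable sketch; the type name and method signatures are assumptions, not the actual Milvus code:

#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical sketch only, not the real swn_metric_ type.
struct SimpleWaitNotify {
    bool notified_ = false;
    std::mutex mutex_;
    std::condition_variable cv_;

    // Sleep for up to `duration`, returning early if Notify() is called (e.g. from DBImpl::Stop()).
    template <typename Rep, typename Period>
    void Wait_For(const std::chrono::duration<Rep, Period>& duration) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait_for(lock, duration, [this] { return notified_; });
        notified_ = false;
    }

    void Notify() {
        std::lock_guard<std::mutex> lock(mutex_);
        notified_ = true;
        cv_.notify_all();
    }
};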
void
DBImpl::WaitMergeFileFinish() {
// LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";

View File

@@ -23,7 +23,7 @@
#include "db/DB.h"
#include "db/SegmentTaskTracker.h"
#include "metrics/Prometheus.h"
#include "utils/ThreadPool.h"
#include "value/config/ConfigMgr.h"
@@ -155,6 +155,9 @@ class DBImpl : public DB, public ConfigObserver {
void
BackgroundMerge(std::set<int64_t> collection_ids, bool force_merge_all);
void
BackgroundMetricThread();
void
WaitMergeFileFinish();
@@ -207,6 +210,17 @@ class DBImpl : public DB, public ConfigObserver {
int64_t live_build_num_ = 0;
std::mutex live_build_count_mutex_;
// Metrics
PROMETHEUS_GAUGE(db_info_family_, data_size_gauge_, "milvus_data_size", "milvus data size");
PROMETHEUS_GAUGE(insert_entities_family_, insert_entities_size_gauge_,
"milvus_insert_entities_throughput_per_microsecond", "insert entities throughput per microsecond");
PROMETHEUS_SUMMARY(query_count_family_, query_count_summary_, "milvus_query_count_summary", "query count summary");
PROMETHEUS_SUMMARY(query_response_family_, query_response_summary_, "milvus_query_response_summary",
"query response summary");
}; // SSDBImpl
using DBImplPtr = std::shared_ptr<DBImpl>;

View File

@@ -24,6 +24,27 @@
#include <prometheus/registry.h>
#include <prometheus/text_serializer.h>
using Quantiles = std::vector<prometheus::detail::CKMSQuantiles::Quantile>;
#define PROMETHEUS_GAUGE(name_family, name_gauge, name, description) \
prometheus::Family<prometheus::Gauge>& name_family = \
prometheus::BuildGauge().Name(name).Help(description).Register(prometheus.registry()); \
prometheus::Gauge& name_gauge = name_family.Add({});
#define PROMETHEUS_COUNT(name_family, name_count, name, description) \
prometheus::Family<prometheus::Counter>& name_family = \
prometheus::BuildCounter().Name(name).Help(description).Register(prometheus.registry()); \
prometheus::Counter& name_count = name_family.Add({});
#define PROMETHEUS_SUMMARY(name_family, name_summary, name, description) \
prometheus::Family<prometheus::Summary>& name_family = \
prometheus::BuildSummary().Name(name).Help(description).Register(prometheus.registry()); \
prometheus::Summary& name_summary = name_family.Add({}, Quantiles{{0.95, 0.00}, {0.9, 0.05}, {0.8, 0.1}});
#define PROMETHEUS_HISTOGRAM(name_family, name, description) \
prometheus::Family<prometheus::Histogram>& name_family = \
prometheus::BuildHistogram().Name(name).Help(description).Register(prometheus.registry());
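As a concrete reading of these macros, the PROMETHEUS_GAUGE line added to DBImpl.h expands to roughly the following pair of member declarations (written out by hand, not actual preprocessor output):

// PROMETHEUS_GAUGE(db_info_family_, data_size_gauge_, "milvus_data_size", "milvus data size");
// becomes, approximately:
prometheus::Family<prometheus::Gauge>& db_info_family_ =
    prometheus::BuildGauge().Name("milvus_data_size").Help("milvus data size").Register(prometheus.registry());
prometheus::Gauge& data_size_gauge_ = db_info_family_.Add({});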
namespace milvus {
class ScopedTimer {
@@ -34,7 +55,7 @@ class ScopedTimer {
~ScopedTimer() {
auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(end - start_).count();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start_).count();
callback_(duration);
}
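Only the destructor is visible in this hunk. For completeness, a minimal self-contained sketch of a ScopedTimer along these lines could look as follows; the constructor and member names are assumed, only the destructor logic comes from the diff:

#include <chrono>
#include <functional>

class ScopedTimer {
 public:
    explicit ScopedTimer(std::function<void(double)> callback)
        : callback_(std::move(callback)), start_(std::chrono::system_clock::now()) {
    }

    ~ScopedTimer() {
        auto end = std::chrono::system_clock::now();
        // After this change the callback receives the elapsed time in microseconds, not seconds.
        auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start_).count();
        callback_(static_cast<double>(duration));
    }

 private:
    std::function<void(double)> callback_;
    std::chrono::system_clock::time_point start_;
};

It is used as in DBImpl::Insert and DBImpl::Query above: construct it at the top of a scope with a lambda, and the lambda fires with the measured latency when the scope exits.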

View File

@@ -45,6 +45,8 @@ SystemInfoCollector::collector_function() {
SystemInfo::CpuUtilizationRatio(base_cpu_, base_sys_cpu_, base_user_cpu_);
while (running_) {
/* collect metrics */
keeping_alive_counter_.Increment(1);
// cpu_utilization_ratio range: 0~25600%
cpu_utilization_ratio_.Set(cpu_utilization_ratio());

View File

@@ -78,6 +78,12 @@ class SystemInfoCollector {
using Family = prometheus::Family<T>;
using Gauge = prometheus::Gauge;
prometheus::Family<prometheus::Counter>& keeping_alive_ = prometheus::BuildCounter()
.Name("milvus_uptime")
.Help("total seconds of the serve alive")
.Register(prometheus.registry());
prometheus::Counter& keeping_alive_counter_ = keeping_alive_.Add({});
/* cpu_utilization_ratio */
Family<Gauge>& cpu_utilization_ratio_family_ = prometheus::BuildGauge()
.Name("milvus_cpu_utilization_ratio")
@@ -85,11 +91,11 @@
.Register(prometheus.registry());
Gauge& cpu_utilization_ratio_ = cpu_utilization_ratio_family_.Add({});
Family<Gauge>& cpu_tempearature_family_ = prometheus::BuildGauge()
.Name("milvus_cpu_temperature_celsius")
.Help("cpu_temperature")
.Register(prometheus.registry());
Gauge& cpu_temperature_ = cpu_tempearature_family_.Add({});
Family<Gauge>& cpu_temperature_family_ = prometheus::BuildGauge()
.Name("milvus_cpu_temperature_celsius")
.Help("cpu_temperature")
.Register(prometheus.registry());
Gauge& cpu_temperature_ = cpu_temperature_family_.Add({});
Family<Gauge>& mem_usage_family_ =
prometheus::BuildGauge().Name("milvus_mem_usage").Help("mem_usage").Register(prometheus.registry());

View File

@@ -808,6 +808,7 @@ GrpcRequestHandler::DropCollection(::grpc::ServerContext* context, const ::milvu
GrpcRequestHandler::CreateIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request,
::milvus::grpc::Status* response) {
CHECK_NULLPTR_RETURN(request)
ScopedTimer scoped_timer([this](double latency) { this->operation_create_index_histogram_.Observe(latency); });
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
milvus::json json_params;
@@ -931,48 +932,6 @@ GrpcRequestHandler::GetEntityIDs(::grpc::ServerContext* context, const ::milvus:
return ::grpc::Status::OK;
}
//::grpc::Status
// GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc::SearchParam* request,
// ::milvus::grpc::QueryResult* response) {
// CHECK_NULLPTR_RETURN(request);
// LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
//
// // step 1: copy vector data
// engine::VectorsData vectors;
// CopyRowRecords(request->query_record_array(), google::protobuf::RepeatedField<google::protobuf::int64>(),
// vectors);
//
// // step 2: partition tags
// std::vector<std::string> partitions;
// std::copy(request->partition_tag_array().begin(), request->partition_tag_array().end(),
// std::back_inserter(partitions));
//
// // step 3: parse extra parameters
// milvus::json json_params;
// for (int i = 0; i < request->extra_params_size(); i++) {
// const ::milvus::grpc::KeyValuePair& extra = request->extra_params(i);
// if (extra.key() == EXTRA_PARAM_KEY) {
// json_params = json::parse(extra.value());
// }
// }
//
// // step 4: search vectors
// std::vector<std::string> file_ids;
// TopKQueryResult result;
// fiu_do_on("GrpcRequestHandler.Search.not_empty_file_ids", file_ids.emplace_back("test_file_id"));
//
// Status status = req_handler_.Search(GetContext(context), request->collection_name(), vectors, request->topk(),
// json_params, partitions, file_ids, result);
//
// // step 5: construct and return result
// ConstructResults(result, response);
//
// LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
// SET_RESPONSE(response->mutable_status(), status, context);
//
// return ::grpc::Status::OK;
//}
::grpc::Status
GrpcRequestHandler::SearchInSegment(::grpc::ServerContext* context, const ::milvus::grpc::SearchInSegmentParam* request,
::milvus::grpc::QueryResult* response) {
@@ -1696,6 +1655,7 @@ GrpcRequestHandler::DeserializeDslToBoolQuery(
CopyRowRecords(vector_param.row_record().records(),
google::protobuf::RepeatedField<google::protobuf::int64>(), vector_data);
vector_query->query_vector.vector_count = vector_data.vector_count_;
vector_query->nq = vector_data.vector_count_;
vector_query->query_vector.binary_data.swap(vector_data.binary_data_);
vector_query->query_vector.float_data.swap(vector_data.float_data_);
@@ -1718,7 +1678,7 @@
GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc::SearchParam* request,
::milvus::grpc::QueryResult* response) {
CHECK_NULLPTR_RETURN(request);
ScopedTimer scoped_timer([this](double lantency) { this->operation_search_histogram_.Observe(lantency); });
ScopedTimer scoped_timer([this](double latency) { this->operation_search_histogram_.Observe(latency); });
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
Status status;

View File

@@ -354,21 +354,16 @@ class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service,
int64_t max_concurrent_insert_request_size = 0;
/* prometheus */
prometheus::Family<prometheus::Counter>& rpc_requests_total_ = prometheus::BuildCounter()
.Name("milvus_rpc_requests_total")
.Help("the number of rpc requests")
.Register(prometheus.registry());
PROMETHEUS_COUNT(rpc_requests_total_, rpc_requests_total_counter_, "milvus_rpc_requests_total",
"the number of rpc requests");
prometheus::Counter& rpc_requests_total_counter_ = rpc_requests_total_.Add({});
prometheus::Family<prometheus::Histogram>& operation_lantency_second_family_ =
prometheus::BuildHistogram()
.Name("milvus_operation_lantency_seconds")
.Help("operation_lantency_seconds")
.Register(prometheus.registry());
prometheus::Histogram& operation_insert_histogram_ = operation_lantency_second_family_.Add(
PROMETHEUS_HISTOGRAM(operation_latency_second_family_, "milvus_operation_latency_seconds",
"operation_latency_seconds");
prometheus::Histogram& operation_insert_histogram_ = operation_latency_second_family_.Add(
{{"operation", "insert"}}, prometheus::Histogram::BucketBoundaries{0.001, 0.01, 0.1, 1});
prometheus::Histogram& operation_search_histogram_ = operation_lantency_second_family_.Add(
prometheus::Histogram& operation_create_index_histogram_ = operation_latency_second_family_.Add(
{{"operation", "create_index"}}, prometheus::Histogram::BucketBoundaries{1, 10, 100, 1000});
prometheus::Histogram& operation_search_histogram_ = operation_latency_second_family_.Add(
{{"operation", "search"}}, prometheus::Histogram::BucketBoundaries{0.001, 0.01, 0.1, 1});
};
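For orientation, the histogram macro plus the labelled Add() calls above can be read roughly as the following hand-written expansion (approximate, not preprocessor output), with each Add() creating one labelled child under the same family:

// PROMETHEUS_HISTOGRAM(operation_latency_second_family_, "milvus_operation_latency_seconds",
//                      "operation_latency_seconds");
// becomes, approximately:
prometheus::Family<prometheus::Histogram>& operation_latency_second_family_ =
    prometheus::BuildHistogram()
        .Name("milvus_operation_latency_seconds")
        .Help("operation_latency_seconds")
        .Register(prometheus.registry());

// The /metrics endpoint then exposes one set of series per label value, e.g.
// milvus_operation_latency_seconds_bucket{operation="insert",le="0.001"} ...,
// milvus_operation_latency_seconds_bucket{operation="search",le="0.1"} ...,
// plus the matching _sum and _count series for each operation.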

View File

@@ -805,9 +805,6 @@ class WebController : public oatpp::web::server::api::ApiController {
ADD_DEFAULT_CORS(Metrics)
ENDPOINT("GET", "/metrics", Metrics) {
TimeRecorder tr(std::string(WEB_LOG_PREFIX) + R"(GET /metrics/)");
tr.RecordSection("Received request.");
WebRequestHandler handler = WebRequestHandler();
OString result = "";
auto status_dto = handler.GetMetrics(result);
@@ -820,9 +817,6 @@ class WebController : public oatpp::web::server::api::ApiController {
response = createDtoResponse(Status::CODE_400, status_dto);
}
tr.ElapseFromBegin("Done. Status: code = " + std::to_string(*(status_dto->code)) +
", reason = " + status_dto->message->std_str() + ". Total cost");
return response;
}

View File

@@ -659,6 +659,7 @@ WebRequestHandler::ProcessLeafQueryJson(const nlohmann::json& json, milvus::quer
auto& values = vector_param_it.value()["query"];
vector_query->query_vector.vector_count = values.size();
vector_query->nq = values.size();
for (auto& vector_records : values) {
if (field_type_.find(vector_name) != field_type_.end()) {
if (field_type_.at(vector_name) == engine::DataType::VECTOR_FLOAT) {

docker/config (new file, 3 additions)
View File

@@ -0,0 +1,3 @@
GF_SECURITY_ADMIN_USER=admin
GF_SECURITY_ADMIN_PASSWORD=changeme
GF_USERS_ALLOW_SIGN_UP=false

View File

@@ -4,13 +4,19 @@ networks:
monitor:
driver: bridge
volumes:
prometheus_data: {}
grafana_data: {}
services:
prometheus:
image: prom/prometheus:v2.11.1
image: prom/prometheus:v2.17.1
container_name: prometheus
hostname: prometheus
restart: always
volumes:
- ./prometheus:/etc/prometheus
- prometheus_data:/prometheus
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- ./server_down.yml:/etc/prometheus/node_down.yml
ports:
@@ -30,29 +36,41 @@ services:
networks:
- monitor
nodeexporter:
image: prom/node-exporter:v0.18.1
container_name: nodeexporter
volumes:
- /proc:/host/proc:ro
- /sys:/host/sys:ro
- /:/rootfs:ro
command:
- '--path.procfs=/host/proc'
- '--path.rootfs=/rootfs'
- '--path.sysfs=/host/sys'
- '--collector.filesystem.ignored-mount-points=^/(sys|proc|dev|host|etc)($$|/)'
restart: unless-stopped
expose:
- 9100
networks:
- monitor
labels:
org.label-schema.group: "monitoring"
grafana:
image: grafana/grafana
image: grafana/grafana:6.7.2
container_name: grafana
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
environment:
- GF_SECURITY_ADMIN_USER=${ADMIN_USER}
- GF_SECURITY_ADMIN_PASSWORD=${ADMIN_PASSWORD}
- GF_USERS_ALLOW_SIGN_UP=false
restart: unless-stopped
expose:
- 3000
hostname: grafana
restart: always
ports:
- "3000:3000"
networks:
- monitor
milvus_server:
runtime: nvidia
image: milvusdb/milvus:latest
restart: always
links:
- prometheus
environment:
WEB_APP: host.docker.internal
volumes:
- ../core/conf/milvus.yaml:/var/lib/milvus/conf/milvus.yaml
- ../core/conf/log_config.conf:/var/lib/milvus/conf/log_config.conf
ports:
- "8080:8080"
- "19530:19530"
networks:
- monitor

File diff suppressed because it is too large

View File

@@ -0,0 +1,11 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
orgId: 1
url: http://prometheus:9090
basicAuth: false
isDefault: true
editable: true

View File

@@ -1,7 +1,7 @@
# my global config
global:
scrape_interval: 15s # Set the scrape interval to every 1 seconds. Default is every 1 minute.
evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
scrape_interval: 2s # Set the scrape interval to every 1 seconds. Default is every 1 minute.
evaluation_interval: 2s # Evaluate rules every 15 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).
# Alertmanager configuration
@@ -19,20 +19,21 @@ rule_files:
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ['prometheus:9090']
- targets: ['localhost:9090']
# scrape metrics of server
- job_name: 'nodeexporter'
scrape_interval: 5s
static_configs:
- targets: ['nodeexporter:9100']
# Allows ephemeral and batch jobs to expose their metrics to Prometheus
- job_name: 'milvus_server'
scrape_interval: 1s
honor_labels: true
static_configs:
- targets: ['milvus_server:8080']
- targets: ['172.16.50.14:19121']
# under development
- job_name: 'pushgateway'
honor_labels: true
static_configs:
- targets: ['pushgateway:9091']
- targets: ['localhost:9091']
