mirror of https://github.com/milvus-io/milvus.git
Update throughput log for proxy
Signed-off-by: shengjh <1572099106@qq.com>pull/4973/head^2
parent
56fa20c5ab
commit
6797fd0b0b
|
@ -168,7 +168,11 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
|
|||
const std::function<uint64_t(const std::string &collection_name,
|
||||
uint64_t channel_id,
|
||||
uint64_t timestamp)> &segment_id) {
|
||||
const uint64_t num_records_log = 100 * 10000;
|
||||
static uint64_t num_inserted = 0;
|
||||
static uint64_t size_inserted = 0;
|
||||
using stdclock = std::chrono::high_resolution_clock;
|
||||
static stdclock::duration time_cost;
|
||||
auto start = stdclock::now();
|
||||
// may have retry policy?
|
||||
auto row_count = request.rows_data_size();
|
||||
|
@ -210,17 +214,26 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
|
|||
}
|
||||
|
||||
auto end = stdclock::now();
|
||||
auto data_size = request.ByteSize();
|
||||
char buff[128];
|
||||
auto r = getcwd(buff, 128);
|
||||
auto path = std::string(buff);
|
||||
std::ofstream file(path + "/proxy2pulsar.benchmark", std::fstream::app);
|
||||
file << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, "
|
||||
<< "cost" << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() / 1000.0 << "s, "
|
||||
<< "throughput: "
|
||||
<< data_size / std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() * 1000 / 1024.0
|
||||
/ 1024
|
||||
<< "M/s" << std::endl;
|
||||
time_cost += (end - start);
|
||||
num_inserted += row_count;
|
||||
size_inserted += request.ByteSize();
|
||||
if (num_inserted >= num_records_log) {
|
||||
// char buff[128];
|
||||
// auto r = getcwd(buff, 128);
|
||||
auto path = std::string("/tmp");
|
||||
std::ofstream file(path + "/proxy2pulsar.benchmark", std::fstream::app);
|
||||
file << "[" << milvus::CommonUtil::TimeToString(start) << "]"
|
||||
<< " Insert " << num_inserted << " records, "
|
||||
<< "size:" << size_inserted / 1024.0 / 1024.0 << "M, "
|
||||
<< "cost" << std::chrono::duration_cast<std::chrono::milliseconds>(time_cost).count() / 1000.0 << "s, "
|
||||
<< "throughput: "
|
||||
<< double(size_inserted) / std::chrono::duration_cast<std::chrono::milliseconds>(time_cost).count() * 1000 / 1024.0
|
||||
/ 1024
|
||||
<< "M/s" << std::endl;
|
||||
time_cost = stdclock::duration(0);
|
||||
num_inserted = 0;
|
||||
size_inserted = 0;
|
||||
}
|
||||
|
||||
for (auto &stat : stats) {
|
||||
if (!stat.ok()) {
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include <utility>
|
||||
#include <vector>
|
||||
#include <unistd.h>
|
||||
#include "utils/CommonUtil.h"
|
||||
|
||||
#ifdef ENABLE_CPU_PROFILING
|
||||
#include <gperftools/profiler.h>
|
||||
|
@ -60,9 +61,9 @@ InsertReq::OnExecute() {
|
|||
static int log_flag = 0;
|
||||
static bool shouldBenchmark = false;
|
||||
static std::stringstream log;
|
||||
char buff[128];
|
||||
auto r = getcwd(buff, 128);
|
||||
auto path = std::string(buff);
|
||||
// char buff[128];
|
||||
// auto r = getcwd(buff, 128);
|
||||
auto path = std::string("/tmp");
|
||||
std::ofstream file(path + "/proxy.benchmark", std::fstream::app);
|
||||
#endif
|
||||
|
||||
|
@ -89,16 +90,17 @@ InsertReq::OnExecute() {
|
|||
#ifdef BENCHMARK
|
||||
inserted_count += insert_param_->rows_data_size();
|
||||
inserted_size += insert_param_->ByteSize();
|
||||
if (shouldBenchmark && inserted_count > 1000) {
|
||||
if (shouldBenchmark) {
|
||||
end = stdclock::now();
|
||||
ready_log_records += inserted_count;
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() / 1000.0;
|
||||
if (duration > interval) {
|
||||
log << "===================>Insert: "
|
||||
<< inserted_count << "records,"
|
||||
<< " size: " << inserted_size / MB << "MB"
|
||||
<< " cost: " << duration << "s,"
|
||||
<< " throughput: "
|
||||
log << "[" << milvus::CommonUtil::TimeToString(start) << "] "
|
||||
<< "Insert "
|
||||
<< inserted_count << " records, "
|
||||
<< "size: " << inserted_size / MB << "MB, "
|
||||
<< "cost: " << duration << "s, "
|
||||
<< "throughput: "
|
||||
<< double(inserted_size) / duration / MB
|
||||
<< "M/s\n";
|
||||
auto new_flag = ready_log_records / per_log_records;
|
||||
|
|
|
@ -175,6 +175,14 @@ CommonUtil::TimeStrToTime(const std::string& time_str, time_t& time_integer, tm&
|
|||
return true;
|
||||
}
|
||||
|
||||
std::string CommonUtil::TimeToString(std::chrono::high_resolution_clock::time_point t) {
|
||||
std::time_t tt = std::chrono::system_clock::to_time_t(t);
|
||||
|
||||
char buf[100] = {0};
|
||||
std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", std::localtime(&tt));
|
||||
return std::string(buf);
|
||||
}
|
||||
|
||||
void
|
||||
CommonUtil::ConvertTime(time_t time_integer, tm& time_struct) {
|
||||
localtime_r(&time_integer, &time_struct);
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include <time.h>
|
||||
#include <string>
|
||||
#include <chrono>
|
||||
|
||||
namespace milvus {
|
||||
|
||||
|
@ -40,6 +41,9 @@ class CommonUtil {
|
|||
TimeStrToTime(const std::string& time_str, time_t& time_integer, tm& time_struct,
|
||||
const std::string& format = "%d-%d-%d %d:%d:%d");
|
||||
|
||||
static std::string
|
||||
TimeToString(std::chrono::high_resolution_clock::time_point t);
|
||||
|
||||
static void
|
||||
ConvertTime(time_t time_integer, tm& time_struct);
|
||||
static void
|
||||
|
|
Loading…
Reference in New Issue