From d054c0aa7b8956811b92d6547a4365692901265c Mon Sep 17 00:00:00 2001 From: shengjh <1572099106@qq.com> Date: Sun, 27 Sep 2020 20:56:23 +0800 Subject: [PATCH] Add benchmark for proxy and writer Signed-off-by: shengjh <1572099106@qq.com> --- proxy/src/message_client/ClientV2.cpp | 18 ++++-- .../src/server/delivery/request/InsertReq.cpp | 64 ++++++++++++++++++- writer/main.go | 27 ++++++-- 3 files changed, 100 insertions(+), 9 deletions(-) diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp index b17cde0c06..f3317177c3 100644 --- a/proxy/src/message_client/ClientV2.cpp +++ b/proxy/src/message_client/ClientV2.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include "log/Log.h" namespace milvus::message_client { @@ -210,9 +211,16 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, auto end = stdclock::now(); auto data_size = request.ByteSize(); - LOG_SERVER_INFO_ << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, " - << "throughput: " << data_size / std::chrono::duration_cast(end - start).count() * 1000 / 1024.0 / 1024 - << "M/s"; + char buff[128]; + auto r = getcwd(buff, 128); + auto path = std::string(buff); + std::ofstream file(path + "/proxy2pulsar.benchmark", std::fstream::app); + file << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, " + << "cost" << std::chrono::duration_cast(end - start).count() / 1000.0 << "s, " + << "throughput: " + << data_size / std::chrono::duration_cast(end - start).count() * 1000 / 1024.0 + / 1024 + << "M/s" << std::endl; for (auto &stat : stats) { if (!stat.ok()) { @@ -259,7 +267,9 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, auto end = stdclock::now(); auto data_size = request.ByteSize(); LOG_SERVER_INFO_ << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, " - << "throughput: " << data_size / std::chrono::duration_cast(end - start).count() * 1000 / 1024.0 / 1024 + << "throughput: " + << data_size / std::chrono::duration_cast(end - start).count() * 1000 + / 1024.0 / 1024 << "M/s"; for (auto &stat : stats) { diff --git a/proxy/src/server/delivery/request/InsertReq.cpp b/proxy/src/server/delivery/request/InsertReq.cpp index 3aaebb86d2..fce0ffd5e2 100644 --- a/proxy/src/server/delivery/request/InsertReq.cpp +++ b/proxy/src/server/delivery/request/InsertReq.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #ifdef ENABLE_CPU_PROFILING #include @@ -43,15 +44,76 @@ InsertReq::Create(const ContextPtr &context, const ::milvus::grpc::InsertParam * Status InsertReq::OnExecute() { +#ifndef BENCHMARK +#define BENCHMARK +#endif + +#ifdef BENCHMARK + const uint64_t count_msg_num = 50000 * 10; + const double MB = 1024 * 1024; + using stdclock = std::chrono::high_resolution_clock; + static uint64_t inserted_count, inserted_size = 0; + static stdclock::time_point start, end; + const int interval = 2; + const int per_log_records = 10000 * 100; + static uint64_t ready_log_records = 0; + static int log_flag = 0; + static bool shouldBenchmark = false; + static std::stringstream log; + char buff[128]; + auto r = getcwd(buff, 128); + auto path = std::string(buff); + std::ofstream file(path + "/proxy.benchmark", std::fstream::app); +#endif + LOG_SERVER_INFO_ << LogOut("[%s][%ld] ", "insert", 0) << "Execute InsertReq."; auto &msg_client = MessageWrapper::GetInstance().MessageClient(); auto segment_id = [](const std::string &collection_name, uint64_t channel_id, uint64_t timestamp) { - return MetaWrapper::GetInstance().AskSegmentId(collection_name, channel_id, timestamp); + return MetaWrapper::GetInstance().AskSegmentId(collection_name, channel_id, timestamp); }; + +#ifdef BENCHMARK + if (inserted_count >= count_msg_num && !shouldBenchmark) { + shouldBenchmark = true; + start = stdclock::now(); + inserted_count = 0; + inserted_size = 0; + } +#endif + Status status; status = msg_client->SendMutMessage(*insert_param_, timestamp_, segment_id); + +#ifdef BENCHMARK + inserted_count += insert_param_->rows_data_size(); + inserted_size += insert_param_->ByteSize(); + if (shouldBenchmark && inserted_count > 1000) { + end = stdclock::now(); + ready_log_records += inserted_count; + auto duration = std::chrono::duration_cast(end - start).count() / 1000.0; + if (duration > interval) { + log << "===================>Insert: " + << inserted_count << "records," + << " size: " << inserted_size / MB << "MB" + << " cost: " << duration << "s," + << " throughput: " + << double(inserted_size) / duration / MB + << "M/s\n"; + auto new_flag = ready_log_records / per_log_records; + if (new_flag != log_flag) { + log_flag = new_flag; + file << log.str(); + file.flush(); + log.str(""); + } + inserted_size = 0; + inserted_count = 0; + start = stdclock::now(); + } + } +#endif return status; } diff --git a/writer/main.go b/writer/main.go index 878f127d42..7a85ad5af5 100644 --- a/writer/main.go +++ b/writer/main.go @@ -8,6 +8,7 @@ import ( "github.com/czs007/suvlim/writer/message_client" "github.com/czs007/suvlim/writer/write_node" "log" + "os" "strconv" "sync" "time" @@ -46,12 +47,20 @@ func main() { const Debug = true const MB = 1024 * 1024 - const timeInterval = time.Second * 5 + const timeInterval = time.Second * 2 const CountMsgNum = 10000 * 10 if Debug { var shouldBenchmark = false var start time.Time + var LogRecords int64 + var logFlag int64 + var logString = "" + logFile, err := os.OpenFile("writenode.benchmark", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0777) + defer logFile.Close() + if err != nil { + log.Fatalf("Prepare log file error, " + err.Error()) + } for { if ctx.Err() != nil { @@ -59,8 +68,10 @@ func main() { } msgLength := wn.MessageClient.PrepareBatchMsg() // wait until first 100,000 rows are successfully wrote - if wn.MsgCounter.InsertCounter >= CountMsgNum { + if wn.MsgCounter.InsertCounter >= CountMsgNum && shouldBenchmark == false { shouldBenchmark = true + wn.MsgCounter.InsertCounter = 0 + wn.MsgCounter.InsertedRecordSize = 0 start = time.Now() } if msgLength > 0 { @@ -69,12 +80,20 @@ func main() { } // Test insert time // ignore if less than 1000 records per time interval - if shouldBenchmark && wn.MsgCounter.InsertCounter > 1000{ + if shouldBenchmark && wn.MsgCounter.InsertCounter > 1000 { + LogRecords += msgCounter.InsertCounter timeSince := time.Since(start) if timeSince >= timeInterval { speed := wn.MsgCounter.InsertedRecordSize / timeInterval.Seconds() / MB - fmt.Println("============> Insert", wn.MsgCounter.InsertCounter,"records, cost:" , timeSince, "speed:", speed, "M/s", "<============") + logString = fmt.Sprintln("============> Insert", wn.MsgCounter.InsertCounter, "records, cost:", timeSince, "speed:", speed, "M/s", "<============") + newFlag := LogRecords / (10000 * 100) + if newFlag != logFlag { + logFlag = newFlag + fmt.Fprintln(logFile, logString) + logString = "" + } wn.MsgCounter.InsertCounter = 0 + wn.MsgCounter.InsertedRecordSize = 0 start = time.Now() } }