mirror of https://github.com/milvus-io/milvus.git
Add benchmark for proxy and writer
Signed-off-by: shengjh <1572099106@qq.com>pull/4973/head^2
parent
5157cfe2ef
commit
d054c0aa7b
|
@ -7,6 +7,7 @@
|
|||
#include <omp.h>
|
||||
#include <numeric>
|
||||
#include <algorithm>
|
||||
#include <unistd.h>
|
||||
#include "log/Log.h"
|
||||
|
||||
namespace milvus::message_client {
|
||||
|
@ -210,9 +211,16 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
|
|||
|
||||
auto end = stdclock::now();
|
||||
auto data_size = request.ByteSize();
|
||||
LOG_SERVER_INFO_ << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, "
|
||||
<< "throughput: " << data_size / std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() * 1000 / 1024.0 / 1024
|
||||
<< "M/s";
|
||||
char buff[128];
|
||||
auto r = getcwd(buff, 128);
|
||||
auto path = std::string(buff);
|
||||
std::ofstream file(path + "/proxy2pulsar.benchmark", std::fstream::app);
|
||||
file << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, "
|
||||
<< "cost" << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() / 1000.0 << "s, "
|
||||
<< "throughput: "
|
||||
<< data_size / std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() * 1000 / 1024.0
|
||||
/ 1024
|
||||
<< "M/s" << std::endl;
|
||||
|
||||
for (auto &stat : stats) {
|
||||
if (!stat.ok()) {
|
||||
|
@ -259,7 +267,9 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
|
|||
auto end = stdclock::now();
|
||||
auto data_size = request.ByteSize();
|
||||
LOG_SERVER_INFO_ << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, "
|
||||
<< "throughput: " << data_size / std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() * 1000 / 1024.0 / 1024
|
||||
<< "throughput: "
|
||||
<< data_size / std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() * 1000
|
||||
/ 1024.0 / 1024
|
||||
<< "M/s";
|
||||
|
||||
for (auto &stat : stats) {
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include <unistd.h>
|
||||
|
||||
#ifdef ENABLE_CPU_PROFILING
|
||||
#include <gperftools/profiler.h>
|
||||
|
@ -43,15 +44,76 @@ InsertReq::Create(const ContextPtr &context, const ::milvus::grpc::InsertParam *
|
|||
|
||||
Status
|
||||
InsertReq::OnExecute() {
|
||||
#ifndef BENCHMARK
|
||||
#define BENCHMARK
|
||||
#endif
|
||||
|
||||
#ifdef BENCHMARK
|
||||
const uint64_t count_msg_num = 50000 * 10;
|
||||
const double MB = 1024 * 1024;
|
||||
using stdclock = std::chrono::high_resolution_clock;
|
||||
static uint64_t inserted_count, inserted_size = 0;
|
||||
static stdclock::time_point start, end;
|
||||
const int interval = 2;
|
||||
const int per_log_records = 10000 * 100;
|
||||
static uint64_t ready_log_records = 0;
|
||||
static int log_flag = 0;
|
||||
static bool shouldBenchmark = false;
|
||||
static std::stringstream log;
|
||||
char buff[128];
|
||||
auto r = getcwd(buff, 128);
|
||||
auto path = std::string(buff);
|
||||
std::ofstream file(path + "/proxy.benchmark", std::fstream::app);
|
||||
#endif
|
||||
|
||||
LOG_SERVER_INFO_ << LogOut("[%s][%ld] ", "insert", 0) << "Execute InsertReq.";
|
||||
auto &msg_client = MessageWrapper::GetInstance().MessageClient();
|
||||
auto segment_id = [](const std::string &collection_name,
|
||||
uint64_t channel_id,
|
||||
uint64_t timestamp) {
|
||||
return MetaWrapper::GetInstance().AskSegmentId(collection_name, channel_id, timestamp);
|
||||
return MetaWrapper::GetInstance().AskSegmentId(collection_name, channel_id, timestamp);
|
||||
};
|
||||
|
||||
#ifdef BENCHMARK
|
||||
if (inserted_count >= count_msg_num && !shouldBenchmark) {
|
||||
shouldBenchmark = true;
|
||||
start = stdclock::now();
|
||||
inserted_count = 0;
|
||||
inserted_size = 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
Status status;
|
||||
status = msg_client->SendMutMessage(*insert_param_, timestamp_, segment_id);
|
||||
|
||||
#ifdef BENCHMARK
|
||||
inserted_count += insert_param_->rows_data_size();
|
||||
inserted_size += insert_param_->ByteSize();
|
||||
if (shouldBenchmark && inserted_count > 1000) {
|
||||
end = stdclock::now();
|
||||
ready_log_records += inserted_count;
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() / 1000.0;
|
||||
if (duration > interval) {
|
||||
log << "===================>Insert: "
|
||||
<< inserted_count << "records,"
|
||||
<< " size: " << inserted_size / MB << "MB"
|
||||
<< " cost: " << duration << "s,"
|
||||
<< " throughput: "
|
||||
<< double(inserted_size) / duration / MB
|
||||
<< "M/s\n";
|
||||
auto new_flag = ready_log_records / per_log_records;
|
||||
if (new_flag != log_flag) {
|
||||
log_flag = new_flag;
|
||||
file << log.str();
|
||||
file.flush();
|
||||
log.str("");
|
||||
}
|
||||
inserted_size = 0;
|
||||
inserted_count = 0;
|
||||
start = stdclock::now();
|
||||
}
|
||||
}
|
||||
#endif
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/czs007/suvlim/writer/message_client"
|
||||
"github.com/czs007/suvlim/writer/write_node"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -46,12 +47,20 @@ func main() {
|
|||
|
||||
const Debug = true
|
||||
const MB = 1024 * 1024
|
||||
const timeInterval = time.Second * 5
|
||||
const timeInterval = time.Second * 2
|
||||
const CountMsgNum = 10000 * 10
|
||||
|
||||
if Debug {
|
||||
var shouldBenchmark = false
|
||||
var start time.Time
|
||||
var LogRecords int64
|
||||
var logFlag int64
|
||||
var logString = ""
|
||||
logFile, err := os.OpenFile("writenode.benchmark", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0777)
|
||||
defer logFile.Close()
|
||||
if err != nil {
|
||||
log.Fatalf("Prepare log file error, " + err.Error())
|
||||
}
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
|
@ -59,8 +68,10 @@ func main() {
|
|||
}
|
||||
msgLength := wn.MessageClient.PrepareBatchMsg()
|
||||
// wait until first 100,000 rows are successfully wrote
|
||||
if wn.MsgCounter.InsertCounter >= CountMsgNum {
|
||||
if wn.MsgCounter.InsertCounter >= CountMsgNum && shouldBenchmark == false {
|
||||
shouldBenchmark = true
|
||||
wn.MsgCounter.InsertCounter = 0
|
||||
wn.MsgCounter.InsertedRecordSize = 0
|
||||
start = time.Now()
|
||||
}
|
||||
if msgLength > 0 {
|
||||
|
@ -69,12 +80,20 @@ func main() {
|
|||
}
|
||||
// Test insert time
|
||||
// ignore if less than 1000 records per time interval
|
||||
if shouldBenchmark && wn.MsgCounter.InsertCounter > 1000{
|
||||
if shouldBenchmark && wn.MsgCounter.InsertCounter > 1000 {
|
||||
LogRecords += msgCounter.InsertCounter
|
||||
timeSince := time.Since(start)
|
||||
if timeSince >= timeInterval {
|
||||
speed := wn.MsgCounter.InsertedRecordSize / timeInterval.Seconds() / MB
|
||||
fmt.Println("============> Insert", wn.MsgCounter.InsertCounter,"records, cost:" , timeSince, "speed:", speed, "M/s", "<============")
|
||||
logString = fmt.Sprintln("============> Insert", wn.MsgCounter.InsertCounter, "records, cost:", timeSince, "speed:", speed, "M/s", "<============")
|
||||
newFlag := LogRecords / (10000 * 100)
|
||||
if newFlag != logFlag {
|
||||
logFlag = newFlag
|
||||
fmt.Fprintln(logFile, logString)
|
||||
logString = ""
|
||||
}
|
||||
wn.MsgCounter.InsertCounter = 0
|
||||
wn.MsgCounter.InsertedRecordSize = 0
|
||||
start = time.Now()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue