Add throughput log in writer

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-09-28 11:34:51 +08:00 committed by yefu.chen
parent eb1d7d5cc4
commit 56fa20c5ab
8 changed files with 194 additions and 102 deletions

View File

@ -164,8 +164,11 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
msgCounter := MsgCounter{
InsertCounter: 0,
InsertTime: time.Now(),
DeleteCounter: 0,
DeleteTime: time.Now(),
SearchCounter: 0,
SearchTime: time.Now(),
}
return &QueryNode{
@ -263,7 +266,6 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
const Debug = true
const CountInsertMsgBaseline = 1000 * 1000
var BaselineCounter int64 = 0
node.msgCounter.InsertTime = time.Now()
if Debug {
for {

View File

@ -93,7 +93,7 @@ func (node *QueryNode) QueryLog(length int) {
}
func (node *QueryNode) WriteQueryLog() {
f, err := os.OpenFile("/tmp/query_node.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
f, err := os.OpenFile("/tmp/query_node_insert.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}

View File

@ -25,16 +25,12 @@ struct TestParameters {
std::string port_;
std::string collection_name_;
// collection parameters, only works when collection_name_ is empty
int64_t index_type_ = (int64_t)milvus::IndexType::IVFSQ8; // sq8
int64_t nlist_ = 16384;
int64_t metric_type_ = (int64_t)milvus::MetricType::L2; // L2
int64_t id_start_ = -1;
int64_t id_count_ = 0;
int64_t loop_ = 0;
// query parameters
int64_t query_count_ = 1000;
int64_t nq_ = 1;
int64_t topk_ = 10;
int64_t nprobe_ = 16;
bool is_valid = true;
};

View File

@ -21,9 +21,10 @@
#include "utils/TimeRecorder.h"
#include <random>
const int N = 6000000;
const int DIM = 128;
const int LOOP = 2000;
int N = 6000000;
int DIM = 128;
int LOOP = 2000;
int ID_START = 0;
std::default_random_engine eng(42);
@ -115,6 +116,7 @@ bool check_schema(const milvus::Mapping & map){
return true;
}
int
main(int argc, char* argv[]) {
TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
@ -128,6 +130,33 @@ main(int argc, char* argv[]) {
return 0;
}
if (parameters.id_start_ < 0){
std::cout<< "id_start should >= 0 !" << std::endl;
milvus_sdk::Utils::PrintHelp(argc, argv);
return 0;
}
if (parameters.id_count_ <= 0){
std::cout<< "id_count should > 0 !" << std::endl;
milvus_sdk::Utils::PrintHelp(argc, argv);
return 0;
}
if (parameters.loop_ <= 0){
std::cout<< "loop should > 0 !" << std::endl;
milvus_sdk::Utils::PrintHelp(argc, argv);
return 0;
}
N = parameters.id_count_;
ID_START = parameters.id_start_;
LOOP = parameters.loop_;
std::cout<<"N: " << N << std::endl;
std::cout<<"ID_START: " << ID_START << std::endl;
std::cout<<"LOOP: " << LOOP << std::endl;
const std::string collection_name = parameters.collection_name_;
auto client = milvus::ConnectionImpl();
@ -149,18 +178,26 @@ main(int argc, char* argv[]) {
int per_count = N / LOOP;
int failed_count = 0;
std::cout<<"PER_COUNT: " << per_count << std::endl;
milvus_sdk::TimeRecorder insert_timer("insert");
for (int64_t i = 0; i < LOOP; i++) {
std::vector<int64_t> ids_array;
generate_ids(ids_array, per_count);
auto data = GetData(per_count);
for (int64_t i = 0, j=0; j < N;) {
i=j;
j += per_count;
if( j > N ) j = N;
std::vector<int64_t> ids_array;
generate_ids(ids_array, j - i);
auto data = GetData(j - i);
insert_timer.Start();
auto status = client.Insert(collection_name, "default", data, ids_array);
if (!status.ok()){
failed_count += 1;
}
insert_timer.End();
}
if (failed_count > 0) {
std::cout <<" test done, failed_count is :" << failed_count<< std::endl;
}

View File

@ -19,7 +19,9 @@
const int TOP_K = 10;
const int LOOP = 1000;
const int DIM = 128;
std::default_random_engine eng(42);
const milvus::VectorParam

View File

@ -38,17 +38,11 @@ print_help(const std::string& app_name) {
printf(" -t --collection_name target collection name, specify this will ignore collection parameters, "
"default empty\n");
printf(" -h --help Print help information\n");
printf(" -i --index "
"Collection index type(1=IDMAP, 2=IVFLAT, 3=IVFSQ8, 5=IVFSQ8H), default:3\n");
printf(" -l --nlist Collection index nlist, default:16384\n");
printf(" -m --metric "
"Collection metric type(1=L2, 2=IP, 3=HAMMING, 4=JACCARD, 5=TANIMOTO, 6=SUBSTRUCTURE, 7=SUPERSTRUCTURE), "
"default:1\n");
printf(" -q --query_count Query total count, default:1000\n");
printf(" -n --nq nq of each query, default:1\n");
printf(" -i --id_start "
"id_start, default:-1\n");
printf(" -c --count id count, default:0\n");
printf(" -l --loop loop, default:0\n");
printf(" -k --topk topk of each query, default:10\n");
printf(" -b --nprobe nprobe of each query, default:16\n");
printf(" -v --print_result Print query result, default:false\n");
printf("\n");
}
@ -482,13 +476,10 @@ static struct option long_options[] = {{"server", optional_argument, nullptr, 's
{"port", optional_argument, nullptr, 'p'},
{"help", no_argument, nullptr, 'h'},
{"collection_name", no_argument, nullptr, 't'},
{"index", optional_argument, nullptr, 'i'},
{"nlist", optional_argument, nullptr, 'l'},
{"metric", optional_argument, nullptr, 'm'},
{"query_count", optional_argument, nullptr, 'q'},
{"nq", optional_argument, nullptr, 'n'},
{"id_start", optional_argument, nullptr, 'i'},
{"count", optional_argument, nullptr, 'c'},
{"loop", optional_argument, nullptr, 'l'},
{"topk", optional_argument, nullptr, 'k'},
{"nprobe", optional_argument, nullptr, 'b'},
{nullptr, 0, nullptr, 0}};
int option_index = 0;
@ -496,7 +487,7 @@ static struct option long_options[] = {{"server", optional_argument, nullptr, 's
TestParameters parameters;
int value;
while ((value = getopt_long(argc, argv, "s:p:t:i:f:l:m:d:r:c:q:n:k:b:vh", long_options, &option_index)) != -1) {
while ((value = getopt_long(argc, argv, "s:p:t:i:l:c:k:h", long_options, &option_index)) != -1) {
switch (value) {
case 's': {
char* address_ptr = strdup(optarg);
@ -518,46 +509,29 @@ static struct option long_options[] = {{"server", optional_argument, nullptr, 's
}
case 'i': {
char* ptr = strdup(optarg);
parameters.index_type_ = atol(ptr);
parameters.id_start_ = atol(ptr);
free(ptr);
break;
}
case 'c': {
char* ptr = strdup(optarg);
parameters.id_count_ = atol(ptr);
free(ptr);
break;
}
case 'l': {
char* ptr = strdup(optarg);
parameters.nlist_ = atol(ptr);
free(ptr);
break;
}
case 'm': {
char* ptr = strdup(optarg);
parameters.metric_type_ = atol(ptr);
free(ptr);
break;
}
case 'q': {
char* ptr = strdup(optarg);
parameters.query_count_ = atol(ptr);
free(ptr);
break;
}
case 'n': {
char* ptr = strdup(optarg);
parameters.nq_ = atol(ptr);
parameters.loop_ = atol(ptr);
free(ptr);
break;
}
case 'k': {
char* ptr = strdup(optarg);
parameters.topk_ = atol(ptr);
free(ptr);
break;
}
case 'b': {
char* ptr = strdup(optarg);
parameters.nprobe_ = atol(ptr);
free(ptr);
break;
}
case 'h':
default:
print_help(app_name);

View File

@ -8,7 +8,6 @@ import (
"github.com/czs007/suvlim/writer/message_client"
"github.com/czs007/suvlim/writer/write_node"
"log"
"os"
"strconv"
"time"
)
@ -49,16 +48,19 @@ func main() {
const CountMsgNum = 10000 * 10
if Debug {
var shouldBenchmark = false
var start time.Time
var LogRecords int64
var logFlag int64
var logString = ""
logFile, err := os.OpenFile("writenode.benchmark", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0777)
defer logFile.Close()
if err != nil {
log.Fatalf("Prepare log file error, " + err.Error())
}
//var shouldBenchmark = false
//var start time.Time
//var LogRecords int64
//var logFlag int64
//var logString = ""
//logFile, err := os.OpenFile("writenode.benchmark", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0777)
//defer logFile.Close()
//if err != nil {
// log.Fatalf("Prepare log file error, " + err.Error())
//}
const CountInsertMsgBaseline = 1000 * 1000
var BaselineCounter int64 = 0
for {
if ctx.Err() != nil {
@ -66,35 +68,42 @@ func main() {
}
msgLength := wn.MessageClient.PrepareBatchMsg()
// wait until first 100,000 rows are successfully wrote
if wn.MsgCounter.InsertCounter >= CountMsgNum && shouldBenchmark == false {
shouldBenchmark = true
wn.MsgCounter.InsertCounter = 0
wn.MsgCounter.InsertedRecordSize = 0
start = time.Now()
}
//if wn.MsgCounter.InsertCounter >= CountMsgNum && shouldBenchmark == false {
// shouldBenchmark = true
// wn.MsgCounter.InsertCounter = 0
// wn.MsgCounter.InsertedRecordSize = 0
// start = time.Now()
//}
if msgLength > 0 {
wn.DoWriteNode(ctx)
fmt.Println("write node do a batch message, storage len: ", msgLength)
}
if wn.MsgCounter.InsertCounter/CountInsertMsgBaseline == BaselineCounter {
wn.WriteWriterLog()
BaselineCounter++
}
// Test insert time
// ignore if less than 1000 records per time interval
if shouldBenchmark && wn.MsgCounter.InsertCounter > 1000 {
LogRecords += msgCounter.InsertCounter
timeSince := time.Since(start)
if timeSince >= timeInterval {
speed := wn.MsgCounter.InsertedRecordSize / timeInterval.Seconds() / MB
logString = fmt.Sprintln("============> Insert", wn.MsgCounter.InsertCounter, "records, cost:", timeSince, "speed:", speed, "M/s", "<============")
newFlag := LogRecords / (10000 * 100)
if newFlag != logFlag {
logFlag = newFlag
fmt.Fprintln(logFile, logString)
logString = ""
}
wn.MsgCounter.InsertCounter = 0
wn.MsgCounter.InsertedRecordSize = 0
start = time.Now()
}
}
//if shouldBenchmark && wn.MsgCounter.InsertCounter > 1000 {
// LogRecords += msgCounter.InsertCounter
// timeSince := time.Since(start)
// if timeSince >= timeInterval {
// speed := wn.MsgCounter.InsertedRecordSize / timeInterval.Seconds() / MB
// logString = fmt.Sprintln("============> Insert", wn.MsgCounter.InsertCounter, "records, cost:", timeSince, "speed:", speed, "M/s", "<============")
// newFlag := LogRecords / (10000 * 100)
// if newFlag != logFlag {
// logFlag = newFlag
// fmt.Fprintln(logFile, logString)
// logString = ""
// }
// wn.MsgCounter.InsertCounter = 0
// wn.MsgCounter.InsertedRecordSize = 0
// start = time.Now()
// }
//}
}
}

View File

@ -2,14 +2,18 @@ package write_node
import (
"context"
"encoding/json"
"fmt"
"github.com/czs007/suvlim/conf"
msgpb "github.com/czs007/suvlim/pkg/master/grpc/message"
storage "github.com/czs007/suvlim/storage/pkg"
"github.com/czs007/suvlim/storage/pkg/types"
"github.com/czs007/suvlim/writer/message_client"
"log"
"os"
"strconv"
"sync"
"time"
)
type SegmentIdInfo struct {
@ -20,8 +24,19 @@ type SegmentIdInfo struct {
type MsgCounter struct {
InsertCounter int64
InsertedRecordSize float64
InsertTime time.Time
// InsertedRecordSize float64
DeleteCounter int64
DeleteTime time.Time
}
// InsertLog is a single throughput sample recorded for one insert batch.
// Samples are buffered in WriteNode.InsertLogs and later flushed to disk
// as JSON lines by WriteWriterLog.
type InsertLog struct {
// MsgLength is the number of records in this batch.
MsgLength int
// DurationInMilliseconds is the elapsed time since the previous sample.
DurationInMilliseconds int64
// InsertTime is the wall-clock time at which this sample was taken.
InsertTime time.Time
// NumSince is the cumulative insert count at sampling time.
NumSince int64
// Speed is the batch throughput in records per second.
Speed float64
}
type WriteNode struct {
@ -29,6 +44,7 @@ type WriteNode struct {
MessageClient *message_client.MessageClient
TimeSync uint64
MsgCounter *MsgCounter
InsertLogs []InsertLog
}
func (wn *WriteNode) Close() {
@ -44,8 +60,10 @@ func NewWriteNode(ctx context.Context,
msgCounter := MsgCounter{
InsertCounter: 0,
InsertTime: time.Now(),
DeleteCounter: 0,
InsertedRecordSize: 0,
DeleteTime: time.Now(),
// InsertedRecordSize: 0,
}
return &WriteNode{
@ -53,6 +71,7 @@ func NewWriteNode(ctx context.Context,
MessageClient: &mc,
TimeSync: timeSync,
MsgCounter: &msgCounter,
InsertLogs: make([]InsertLog, 0),
}, err
}
@ -73,11 +92,13 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*msgpb.InsertOr
timeStamp = append(timeStamp, uint64(data[i].Timestamp))
}
wn.MsgCounter.InsertCounter += int64(len(timeStamp))
if len(timeStamp) > 0 {
// assume each record is same size
wn.MsgCounter.InsertedRecordSize += float64(len(timeStamp) * len(data[0].RowsData.Blob))
}
wn.WriterLog(len(timeStamp))
//wn.MsgCounter.InsertCounter += int64(len(timeStamp))
//if len(timeStamp) > 0 {
// // assume each record is same size
// wn.MsgCounter.InsertedRecordSize += float64(len(timeStamp) * len(data[0].RowsData.Blob))
//}
error := (*wn.KvStore).PutRows(ctx, prefixKeys, binaryData, suffixKeys, timeStamp)
if error != nil {
fmt.Println("Can't insert data!")
@ -134,14 +155,14 @@ func (wn *WriteNode) DoWriteNode(ctx context.Context) {
numInsertData := len(wn.MessageClient.InsertMsg)
numGoRoute := conf.Config.Writer.Parallelism
batchSize := numInsertData / numGoRoute
if numInsertData % numGoRoute != 0 {
if numInsertData%numGoRoute != 0 {
batchSize += 1
}
start := 0
end := 0
wg := sync.WaitGroup{}
for end < numInsertData {
if end + batchSize >= numInsertData {
if end+batchSize >= numInsertData {
end = numInsertData
} else {
end = end + batchSize
@ -154,3 +175,54 @@ func (wn *WriteNode) DoWriteNode(ctx context.Context) {
wn.DeleteBatchData(ctx, wn.MessageClient.DeleteMsg)
wn.UpdateTimeSync(wn.MessageClient.TimeSync())
}
// WriterLog records one throughput sample for an insert batch of `length`
// records: it bumps the cumulative insert counter, measures the time since
// the previous sample, derives a records-per-second speed, and appends the
// resulting InsertLog to the in-memory buffer (flushed by WriteWriterLog).
// NOTE(review): if called twice with no measurable elapsed time,
// duration.Seconds() can be 0 and speed becomes +Inf — confirm callers
// tolerate that.
func (wn *WriteNode) WriterLog(length int) {
wn.MsgCounter.InsertCounter += int64(length)
timeNow := time.Now()
// Elapsed time since the previous sample; InsertTime is advanced below
// so the next call measures from this point.
duration := timeNow.Sub(wn.MsgCounter.InsertTime)
speed := float64(length) / duration.Seconds()
insertLog := InsertLog{
MsgLength: length,
DurationInMilliseconds: duration.Milliseconds(),
InsertTime: timeNow,
NumSince: wn.MsgCounter.InsertCounter,
Speed: speed,
}
wn.InsertLogs = append(wn.InsertLogs, insertLog)
// Reset the reference point for the next sample.
wn.MsgCounter.InsertTime = timeNow
}
// WriteWriterLog flushes all buffered InsertLog samples to
// /tmp/write_node_insert.txt as one JSON object per line, appending to the
// file (creating it with mode 0644 if absent), then clears the buffer.
// Any open, marshal, write, or close failure is fatal: log.Fatal terminates
// the process.
func (wn *WriteNode) WriteWriterLog() {
f, err := os.OpenFile("/tmp/write_node_insert.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
// write logs
for _, insertLog := range wn.InsertLogs {
insertLogJson, err := json.Marshal(&insertLog)
if err != nil {
log.Fatal(err)
}
// One JSON document per line; also echoed to stdout for live monitoring.
writeString := string(insertLogJson) + "\n"
fmt.Println(writeString)
_, err2 := f.WriteString(writeString)
if err2 != nil {
log.Fatal(err2)
}
}
// reset InsertLogs buffer
wn.InsertLogs = make([]InsertLog, 0)
err = f.Close()
if err != nil {
log.Fatal(err)
}
fmt.Println("write log done")
}