Add reader writeLog, fix insert buffer out of range

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-09-27 18:26:33 +08:00 committed by yefu.chen
parent a87b1a42bc
commit 0d2780a2eb
8 changed files with 205 additions and 63 deletions

View File

@ -17,6 +17,9 @@ import (
"encoding/json"
"fmt"
"github.com/czs007/suvlim/conf"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/czs007/suvlim/pkg/master/kv"
"github.com/czs007/suvlim/reader/message_client"
"github.com/stretchr/testify/assert"
"log"
"sort"
@ -24,9 +27,6 @@ import (
"sync/atomic"
"time"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/czs007/suvlim/pkg/master/kv"
"github.com/czs007/suvlim/reader/message_client"
//"github.com/stretchr/testify/assert"
)
@ -69,8 +69,21 @@ type QueryInfo struct {
type MsgCounter struct {
InsertCounter int64
InsertTime time.Time
DeleteCounter int64
DeleteTime time.Time
SearchCounter int64
SearchTime time.Time
}
type InsertLog struct {
MsgLength int
DurationInMilliseconds int64
InsertTime time.Time
NumSince int64
Speed float64
}
type QueryNode struct {
@ -86,6 +99,7 @@ type QueryNode struct {
insertData InsertData
kvBase *kv.EtcdKVBase
msgCounter *MsgCounter
InsertLogs []InsertLog
}
func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
@ -95,7 +109,7 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
ReadTimeSyncMin: timeSync,
ReadTimeSyncMax: timeSync,
WriteTimeSync: timeSync,
ServiceTimeSync: timeSync,
ServiceTimeSync: timeSync,
TSOTimeSync: timeSync,
}
@ -135,7 +149,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
ReadTimeSyncMin: timeSync,
ReadTimeSyncMax: timeSync,
WriteTimeSync: timeSync,
ServiceTimeSync: timeSync,
ServiceTimeSync: timeSync,
TSOTimeSync: timeSync,
}
@ -162,6 +176,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
queryNodeTimeSync: queryNodeTimeSync,
buffer: buffer,
msgCounter: &msgCounter,
InsertLogs: make([]InsertLog, 0),
}
}
@ -246,13 +261,11 @@ func (node *QueryNode) InitQueryNodeCollection() {
func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
const Debug = true
const CountMsgNum = 1000 * 1000
const CountInsertMsgBaseline = 1000 * 1000
var BaselineCounter int64 = 0
node.msgCounter.InsertTime = time.Now()
if Debug {
var printFlag = true
var startTime = true
var start time.Time
for {
var msgLen = node.PrepareBatchMsg()
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
@ -264,10 +277,9 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
continue
}
if startTime {
fmt.Println("============> Start Test <============")
startTime = false
start = time.Now()
if node.msgCounter.InsertCounter/CountInsertMsgBaseline == BaselineCounter {
node.WriteQueryLog()
BaselineCounter++
}
node.QueryNodeDataInit()
@ -279,13 +291,6 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
node.DoInsertAndDelete()
//fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
// Test insert time
if printFlag && node.msgCounter.InsertCounter >= CountMsgNum {
printFlag = false
timeSince := time.Since(start)
fmt.Println("============> Do", node.msgCounter.InsertCounter, "Insert in", timeSince, "<============")
}
}
}
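
With the Debug path above, query logs are now flushed periodically instead of printed once: whenever `InsertCounter / CountInsertMsgBaseline` catches up with `BaselineCounter`, `WriteQueryLog` runs and the baseline advances, so a flush happens roughly once per million inserted rows. A minimal sketch of that trigger with made-up batch sizes (not part of the commit):

package main

import "fmt"

func main() {
	const countInsertMsgBaseline = 1000 * 1000

	var baselineCounter int64 = 0
	var insertCounter int64 = 0

	// Simulate the loop: check before counting the next batch, flush when
	// the integer-divided counter catches up with the baseline.
	for _, batch := range []int64{300_000, 400_000, 500_000, 300_000, 600_000} {
		if insertCounter/countInsertMsgBaseline == baselineCounter {
			// In the reader this is node.WriteQueryLog().
			fmt.Println("flush with", insertCounter, "rows counted so far")
			baselineCounter++
		}
		insertCounter += batch
	}
	// Flushes fire at 0 and 1_200_000 rows here, i.e. about once per baseline crossing.
}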
@ -334,14 +339,14 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
fmt.Println("Do Search...")
//for {
//if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync {
var status = node.Search(node.messageClient.SearchMsg)
if status.ErrorCode != 0 {
fmt.Println("Search Failed")
node.PublishFailedSearchResult()
}
//break
//}
//if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync {
var status = node.Search(node.messageClient.SearchMsg)
if status.ErrorCode != 0 {
fmt.Println("Search Failed")
node.PublishFailedSearchResult()
}
//break
//}
//}
default:
}
@ -485,9 +490,9 @@ func (node *QueryNode) PreInsertAndDelete() msgPb.Status {
func (node *QueryNode) DoInsertAndDelete() msgPb.Status {
var wg sync.WaitGroup
// Do insert
for segmentID, records := range node.insertData.insertRecords {
for segmentID := range node.insertData.insertRecords {
wg.Add(1)
go node.DoInsert(segmentID, &records, &wg)
go node.DoInsert(segmentID, &wg)
}
// Do delete
@ -505,7 +510,7 @@ func (node *QueryNode) DoInsertAndDelete() msgPb.Status {
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
}
func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.WaitGroup) msgPb.Status {
func (node *QueryNode) DoInsert(segmentID int64, wg *sync.WaitGroup) msgPb.Status {
fmt.Println("Doing insert..., len = ", len(node.insertData.insertIDs[segmentID]))
var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
if err != nil {
@ -515,10 +520,12 @@ func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.Wai
ids := node.insertData.insertIDs[segmentID]
timestamps := node.insertData.insertTimestamps[segmentID]
records := node.insertData.insertRecords[segmentID]
offsets := node.insertData.insertOffset[segmentID]
node.msgCounter.InsertCounter += int64(len(ids))
err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, records)
node.QueryLog(len(ids))
err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, &records)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
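
For context on the out-of-range fix above: the old `DoInsertAndDelete` loop spawned `go node.DoInsert(segmentID, &records, &wg)` with `&records` pointing at the single `range` loop variable, so under the loop-variable semantics of 2020-era Go a goroutine could read a different segment's records than the ids and offsets it was given. The new code passes only the segment id and looks the records up inside `DoInsert`. A minimal, self-contained sketch of the hazard and the lookup-by-key fix (hypothetical data, not part of the commit):

package main

import "fmt"

func main() {
	// Hypothetical per-segment records, standing in for node.insertData.insertRecords.
	data := map[int64][][]byte{
		1: {[]byte("a")},
		2: {[]byte("b"), []byte("c")},
		3: {[]byte("d"), []byte("e"), []byte("f")},
	}

	// Hazard: &records always points at the one loop variable, so every
	// captured pointer ends up observing whichever segment the loop visited
	// last. (Go 1.22+ uses per-iteration variables, so this only reproduces
	// on the older toolchains this commit targeted.)
	captured := map[int64]*[][]byte{}
	for segmentID, records := range data {
		captured[segmentID] = &records
	}
	for segmentID, recs := range captured {
		fmt.Printf("segment %d appears to have %d records\n", segmentID, len(*recs))
	}

	// Fix mirrored by the commit: pass only the key and look the slice up
	// where it is used, so lengths always match the segment's own ids.
	for segmentID := range data {
		recs := data[segmentID]
		fmt.Printf("segment %d really has %d records\n", segmentID, len(recs))
	}
}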
@ -585,7 +592,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
// Here, we manually move searchTimestamp back by `conf.Config.Timesync.Interval` (plus a 600 ms margin) milliseconds of physical time.
// The logical part of the timestamp (the low 18 bits) is preserved unchanged.
var logicTimestamp = searchTimestamp << 46 >> 46
searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval + 600)) << 18 + logicTimestamp
searchTimestamp = (searchTimestamp>>18-uint64(conf.Config.Timesync.Interval+600))<<18 + logicTimestamp
var vector = msg.Records
// For now, only the first JSON is valid.
@ -594,7 +601,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
// 1. Timestamp check
// TODO: return or wait? Or adding graceful time
if searchTimestamp > node.queryNodeTimeSync.ServiceTimeSync {
fmt.Println("Invalid query time, timestamp = ", searchTimestamp >> 18, ", SearchTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync >> 18)
fmt.Println("Invalid query time, timestamp = ", searchTimestamp>>18, ", SearchTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync>>18)
return msgPb.Status{ErrorCode: 1}
}
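
The timestamp adjustment above assumes the usual hybrid layout: physical time in milliseconds in the high 46 bits, a logical counter in the low 18 bits. A small sketch of the same arithmetic with made-up values (the interval is a stand-in for `conf.Config.Timesync.Interval`, not read from config):

package main

import "fmt"

func main() {
	const logicalBits = 18

	// Hypothetical hybrid timestamp: physical milliseconds in the high
	// 46 bits, a logical counter in the low 18 bits.
	physicalMs := uint64(1_600_000_000_000)
	logical := uint64(7)
	ts := physicalMs<<logicalBits | logical

	// Same decomposition as the search path: keep the low 18 bits,
	// shift the rest down to get milliseconds.
	logicPart := ts << 46 >> 46
	physPart := ts >> logicalBits

	// Move the timestamp back by interval+600 ms of physical time while
	// leaving the logical part untouched.
	interval := uint64(400) // stand-in for conf.Config.Timesync.Interval
	adjusted := (physPart-(interval+600))<<logicalBits + logicPart

	fmt.Println(physPart, logicPart)                     // 1600000000000 7
	fmt.Println(adjusted>>logicalBits, adjusted<<46>>46) // 1599999999000 7
}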

View File

@ -16,6 +16,7 @@ import (
"fmt"
"github.com/czs007/suvlim/errors"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/stretchr/testify/assert"
"strconv"
"unsafe"
)
@ -143,11 +144,13 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]
var numOfRow = len(*entityIDs)
var sizeofPerRow = len((*records)[0])
var rawData = make([]byte, numOfRow*sizeofPerRow)
assert.Equal(nil, numOfRow, len(*records))
var rawData = make([]byte, numOfRow * sizeofPerRow)
var copyOffset = 0
for i := 0; i < len(*records); i++ {
copy(rawData[copyOffset:], (*records)[i])
copyOffset += len((*records)[i])
copyOffset += sizeofPerRow
}
var cOffset = C.long(offset)
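
The `copyOffset += sizeofPerRow` change above keeps the copy offset on the fixed row stride that `rawData` was sized for (`numOfRow * sizeofPerRow`). With the old `copyOffset += len((*records)[i])`, a record longer than the first row could push the offset past the end of the buffer and make `rawData[copyOffset:]` panic with a slice-bounds error. A small sketch with made-up row sizes (not part of the commit):

package main

import "fmt"

func main() {
	// Hypothetical rows: the buffer is sized from the first row, but the
	// second row is longer than expected.
	records := [][]byte{
		make([]byte, 4),
		make([]byte, 10), // oversized row
		make([]byte, 4),
	}
	sizeofPerRow := len(records[0])
	rawData := make([]byte, len(records)*sizeofPerRow) // 12 bytes

	// Old logic: offsets 0, 4, 14 -> slicing rawData[14:] on a 12-byte
	// buffer panics with "slice bounds out of range".
	// New logic: offsets stay at i*sizeofPerRow (0, 4, 8), always in bounds.
	copyOffset := 0
	for i := 0; i < len(records); i++ {
		copy(rawData[copyOffset:], records[i]) // copies at most the remaining space
		copyOffset += sizeofPerRow
	}
	fmt.Println("final offset", copyOffset, "buffer size", len(rawData))
}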

View File

@ -1,8 +1,13 @@
package reader
import (
"encoding/json"
"errors"
"fmt"
log "github.com/apache/pulsar/pulsar-client-go/logutil"
"os"
"strconv"
"time"
)
// Function `GetSegmentByEntityId` should return entityIDs, timestamps and segmentIDs
@ -68,3 +73,54 @@ func (c *Collection) GetPartitionByName(partitionName string) (partition *Partit
return nil
// TODO: remove from c.Partitions
}
func (node *QueryNode) QueryLog(length int) {
node.msgCounter.InsertCounter += int64(length)
timeNow := time.Now()
duration := timeNow.Sub(node.msgCounter.InsertTime)
speed := float64(length) / duration.Seconds()
insertLog := InsertLog{
MsgLength: length,
DurationInMilliseconds: duration.Milliseconds(),
InsertTime: timeNow,
NumSince: node.msgCounter.InsertCounter,
Speed: speed,
}
node.InsertLogs = append(node.InsertLogs, insertLog)
node.msgCounter.InsertTime = timeNow
}
func (node *QueryNode) WriteQueryLog() {
f, err := os.OpenFile("/tmp/query_node.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
// write logs
for _, insertLog := range node.InsertLogs {
insertLogJson, err := json.Marshal(&insertLog)
if err != nil {
log.Fatal(err)
}
writeString := string(insertLogJson) + "\n"
fmt.Println(writeString)
_, err2 := f.WriteString(writeString)
if err2 != nil {
log.Fatal(err2)
}
}
// reset InsertLogs buffer
node.InsertLogs = make([]InsertLog, 0)
err = f.Close()
if err != nil {
log.Fatal(err)
}
fmt.Println("write log done")
}
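
Each line that `WriteQueryLog` appends to /tmp/query_node.txt is simply the JSON encoding of one `InsertLog` entry. A standalone sketch of what such a line looks like (the struct mirrors the one added above; the values are made up):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Mirror of the InsertLog fields added by this commit.
type InsertLog struct {
	MsgLength              int
	DurationInMilliseconds int64
	InsertTime             time.Time
	NumSince               int64
	Speed                  float64
}

func main() {
	entry := InsertLog{
		MsgLength:              1024,
		DurationInMilliseconds: 15,
		InsertTime:             time.Date(2020, 9, 27, 18, 26, 33, 0, time.UTC),
		NumSince:               1_000_448,
		Speed:                  68266.7, // rows per second for this batch
	}
	line, err := json.Marshal(&entry)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(line))
	// {"MsgLength":1024,"DurationInMilliseconds":15,"InsertTime":"2020-09-27T18:26:33Z","NumSince":1000448,"Speed":68266.7}
}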

View File

@ -10,17 +10,24 @@ const int DIM = 128;
int main(int argc , char**argv) {
TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
if (!parameters.is_valid){
return 0;
}
if (parameters.collection_name_.empty()){
std::cout<< "should specify collection name!" << std::endl;
milvus_sdk::Utils::PrintHelp(argc, argv);
return 0;
}
auto client = milvus::ConnectionImpl();
milvus::ConnectParam connect_param;
connect_param.ip_address = parameters.address_.empty() ? "127.0.0.1":parameters.address_;
connect_param.port = parameters.port_.empty() ? "19530":parameters.port_ ;
client.Connect(connect_param);
milvus::Status stat;
const std::string collectin_name = "collectionTest";
const std::string collection_name = parameters.collection_name_;
// Create
milvus::FieldPtr field_ptr1 = std::make_shared<milvus::Field>();
@ -34,15 +41,24 @@ int main(int argc , char**argv) {
field_ptr2->field_type = milvus::DataType::VECTOR_FLOAT;
field_ptr2->dim = DIM;
milvus::Mapping mapping = {collectin_name, {field_ptr1, field_ptr2}};
milvus::Mapping mapping = {collection_name, {field_ptr1, field_ptr2}};
milvus::Status stat;
stat = client.CreateCollection(mapping, "extra_params");
if (!stat.ok()){
std::cout << "create collection failed!" << std::endl;
return 0;
}
std::cout << "create collection done!" << std::endl;
// Get Collection info
milvus::Mapping map;
client.GetCollectionInfo(collectin_name, map);
client.GetCollectionInfo(collection_name, map);
for (auto &f : map.fields) {
std::cout << f->field_name << ":" << int(f->field_type) << ":" << f->dim << "DIM" << std::endl;
}
return 0;
}

View File

@ -23,20 +23,31 @@
const int N = 200000;
const int DIM = 128;
const int LOOP = 100;
int ID_START = 0;
const milvus::FieldValue GetData() {
int generate_ids(std::vector<int64_t> & ids_array, int count);
int generate_ids(std::vector<int64_t>& ids_array, int count) {
for (int i = 0; i < count; i++) {
ids_array.push_back(ID_START++);
}
return 0;
}
const milvus::FieldValue GetData(int count) {
milvus::FieldValue value_map;
std::vector<int32_t> int32_data;
for (int i = 0; i < N; i++) {
int32_data.push_back(i);
for (int i = 0; i < count; i++) {
int32_data.push_back(ID_START++);
}
std::default_random_engine eng(42);
std::normal_distribution<float> dis(0, 1);
std::vector<milvus::VectorData> vector_data;
for (int i = 0; i < N; i++) {
for (int i = 0; i < count; i++) {
std::vector<float> float_data(DIM);
for(auto &x: float_data) {
x = dis(eng);
@ -52,6 +63,33 @@ const milvus::FieldValue GetData() {
return value_map;
}
bool checkSchema(){
// Get Collection info
bool ret = false;
milvus::FieldPtr field_ptr1 = std::make_shared<milvus::Field>();
milvus::FieldPtr field_ptr2 = std::make_shared<milvus::Field>();
field_ptr1->field_name = "age";
field_ptr1->field_type = milvus::DataType::INT32;
field_ptr1->dim = 1;
field_ptr2->field_name = "field_vec";
field_ptr2->field_type = milvus::DataType::VECTOR_FLOAT;
field_ptr2->dim = DIM;
std::vector<milvus::FieldPtr> fields{field_ptr1, field_ptr2};
milvus::Mapping map;
//client.GetCollectionInfo(collection_name, map);
for (auto &f : map.fields) {
///std::cout << f->field_name << ":" << int(f->field_type) << ":" << f->dim << "DIM" << std::endl;
}
return true;
}
int
main(int argc, char* argv[]) {
TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
@ -59,21 +97,38 @@ main(int argc, char* argv[]) {
return 0;
}
if (parameters.collection_name_.empty()){
std::cout<< "should specify collection name!" << std::endl;
milvus_sdk::Utils::PrintHelp(argc, argv);
return 0;
}
const std::string collection_name = parameters.collection_name_;
auto client = milvus::ConnectionImpl();
milvus::ConnectParam connect_param;
connect_param.ip_address = parameters.address_.empty() ? "127.0.0.1":parameters.address_;
connect_param.port = parameters.port_.empty() ? "19530":parameters.port_ ;
client.Connect(connect_param);
std::vector<int64_t> ids_array;
auto data = GetData();
for (int64_t i = 0; i < N; i++) {
ids_array.push_back(i);
}
milvus_sdk::TimeRecorder insert("insert");
auto status = client.Insert("collection0", "tag01", data, ids_array);
if (!status.ok()){
return -1;
int per_count = N / LOOP;
int failed_count = 0;
milvus_sdk::TimeRecorder insert_timer("insert");
for (int64_t i = 0; i < LOOP; i++) {
std::vector<int64_t> ids_array;
generate_ids(ids_array, per_count);
auto data = GetData(per_count);
insert_timer.Start();
auto status = client.Insert(collection_name, "default", data, ids_array);
if (!status.ok()){
failed_count += 1;
}
insert_timer.End();
}
if (failed_count > 0) {
std::cout <<" test done, failed_count is :" << failed_count<< std::endl;
}
insert_timer.Print(LOOP);
return 0;
}

View File

@ -21,6 +21,7 @@
const int TOP_K = 10;
const int LOOP = 1000;
const int DIM = 128;
int main(int argc , char**argv) {
TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
@ -28,6 +29,13 @@ int main(int argc , char**argv) {
return 0;
}
if (parameters.collection_name_.empty()){
std::cout<< "should specify collection name!" << std::endl;
milvus_sdk::Utils::PrintHelp(argc, argv);
return 0;
}
auto client = milvus::ConnectionImpl();
milvus::ConnectParam connect_param;
connect_param.ip_address = parameters.address_.empty() ? "127.0.0.1":parameters.address_;
@ -36,8 +44,6 @@ int main(int argc , char**argv) {
std::vector<int64_t> ids_array;
std::vector<std::string> partition_list;
partition_list.emplace_back("default");
// partition_list.emplace_back("partition-2");
// partition_list.emplace_back("partition-3");
milvus::VectorParam vectorParam;
std::vector<milvus::VectorData> vector_records;

View File

@ -472,10 +472,9 @@ Utils::PrintTopKQueryResult(milvus::TopKQueryResult& topk_query_result) {
void
Utils::HAHE(int argc){
std::cout<<"FUCK"<<std::endl;
Utils::PrintHelp(int argc, char* argv[]) {
std::string app_name = basename(argv[0]);
print_help(app_name);
}

View File

@ -94,8 +94,8 @@ class Utils {
static TestParameters
ParseTestParameters(int argc, char* argv[]);
static void
HAHE(int argc);
static
void PrintHelp(int argc, char* argv[]);
};