From faca23344df10261ce305549d134969c596a9149 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Thu, 17 Sep 2020 17:43:53 +0800
Subject: [PATCH] Use opened segment in statistic info

Signed-off-by: bigsheeper
---
 core/include/segment_c.h           |  27 +-
 core/src/dog_segment/segment_c.cpp |  56 +++-
 core/src/dog_segment/segment_c.h   |  27 +-
 core/unittest/test_c_api.cpp       | 437 +++++++++++++++--------------
 reader/query_node.go               |   3 +
 reader/segment.go                  |  42 ++-
 reader/segment_service.go          |  30 +-
 7 files changed, 355 insertions(+), 267 deletions(-)

diff --git a/core/include/segment_c.h b/core/include/segment_c.h
index a9dbb05fd9..1f29087876 100644
--- a/core/include/segment_c.h
+++ b/core/include/segment_c.h
@@ -7,6 +7,12 @@ extern "C" {

 typedef void* CSegmentBase;

+typedef struct CQueryInfo {
+    long int num_queries;
+    int topK;
+    const char* field_name;
+} CQueryInfo;
+
 CSegmentBase
 NewSegment(CPartition partition, unsigned long segment_id);

@@ -38,14 +44,23 @@ Delete(CSegmentBase c_segment, long int
 PreDelete(CSegmentBase c_segment, long int size);

+//int
+//Search(CSegmentBase c_segment,
+//       const char* query_json,
+//       unsigned long timestamp,
+//       float* query_raw_data,
+//       int num_of_query_raw_data,
+//       long int* result_ids,
+//       float* result_distances);
+
 int
 Search(CSegmentBase c_segment,
-       const char* query_json,
-       unsigned long timestamp,
-       float* query_raw_data,
-       int num_of_query_raw_data,
-       long int* result_ids,
-       float* result_distances);
+       CQueryInfo c_query_info,
+       unsigned long timestamp,
+       float* query_raw_data,
+       int num_of_query_raw_data,
+       long int* result_ids,
+       float* result_distances);

 //////////////////////////////////////////////////////////////////
diff --git a/core/src/dog_segment/segment_c.cpp b/core/src/dog_segment/segment_c.cpp
index 5ee49378ed..0400835124 100644
--- a/core/src/dog_segment/segment_c.cpp
+++ b/core/src/dog_segment/segment_c.cpp
@@ -89,26 +89,56 @@ PreDelete(CSegmentBase c_segment, long int size) {
 }

+//int
+//Search(CSegmentBase c_segment,
+//       const char* query_json,
+//       unsigned long timestamp,
+//       float* query_raw_data,
+//       int num_of_query_raw_data,
+//       long int* result_ids,
+//       float* result_distances) {
+//    auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
+//    milvus::dog_segment::QueryResult query_result;
+//
+//    // parse query param json
+//    auto query_param_json_string = std::string(query_json);
+//    auto query_param_json = nlohmann::json::parse(query_param_json_string);
+//
+//    // construct QueryPtr
+//    auto query_ptr = std::make_shared<milvus::query::Query>();
+//    query_ptr->num_queries = query_param_json["num_queries"];
+//    query_ptr->topK = query_param_json["topK"];
+//    query_ptr->field_name = query_param_json["field_name"];
+//
+//    query_ptr->query_raw_data.resize(num_of_query_raw_data);
+//    memcpy(query_ptr->query_raw_data.data(), query_raw_data, num_of_query_raw_data * sizeof(float));
+//
+//    auto res = segment->Query(query_ptr, timestamp, query_result);
+//
+//    // result_ids and result_distances have been allocated memory in goLang,
+//    // so we don't need to malloc here.
+//    memcpy(result_ids, query_result.result_ids_.data(), query_result.row_num_ * sizeof(long int));
+//    memcpy(result_distances, query_result.result_distances_.data(), query_result.row_num_ * sizeof(float));
+//
+//    return res.code();
+//}
+
 int
 Search(CSegmentBase c_segment,
-       const char* query_json,
-       unsigned long timestamp,
-       float* query_raw_data,
-       int num_of_query_raw_data,
-       long int* result_ids,
-       float* result_distances) {
+       CQueryInfo c_query_info,
+       unsigned long timestamp,
+       float* query_raw_data,
+       int num_of_query_raw_data,
+       long int* result_ids,
+       float* result_distances) {
     auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
     milvus::dog_segment::QueryResult query_result;

-    // parse query param json
-    auto query_param_json_string = std::string(query_json);
-    auto query_param_json = nlohmann::json::parse(query_param_json_string);
-
     // construct QueryPtr
     auto query_ptr = std::make_shared<milvus::query::Query>();
-    query_ptr->num_queries = query_param_json["num_queries"];
-    query_ptr->topK = query_param_json["topK"];
-    query_ptr->field_name = query_param_json["field_name"];
+    query_ptr->num_queries = c_query_info.num_queries;
+    query_ptr->topK = c_query_info.topK;
+    query_ptr->field_name = c_query_info.field_name;

     query_ptr->query_raw_data.resize(num_of_query_raw_data);
     memcpy(query_ptr->query_raw_data.data(), query_raw_data, num_of_query_raw_data * sizeof(float));
diff --git a/core/src/dog_segment/segment_c.h b/core/src/dog_segment/segment_c.h
index a9dbb05fd9..1f29087876 100644
--- a/core/src/dog_segment/segment_c.h
+++ b/core/src/dog_segment/segment_c.h
@@ -7,6 +7,12 @@ extern "C" {

 typedef void* CSegmentBase;

+typedef struct CQueryInfo {
+    long int num_queries;
+    int topK;
+    const char* field_name;
+} CQueryInfo;
+
 CSegmentBase
 NewSegment(CPartition partition, unsigned long segment_id);

@@ -38,14 +44,23 @@ Delete(CSegmentBase c_segment, long int
 PreDelete(CSegmentBase c_segment, long int size);

+//int
+//Search(CSegmentBase c_segment,
+//       const char* query_json,
+//       unsigned long timestamp,
+//       float* query_raw_data,
+//       int num_of_query_raw_data,
+//       long int* result_ids,
+//       float* result_distances);
+
 int
 Search(CSegmentBase c_segment,
-       const char* query_json,
-       unsigned long timestamp,
-       float* query_raw_data,
-       int num_of_query_raw_data,
-       long int* result_ids,
-       float* result_distances);
+       CQueryInfo c_query_info,
+       unsigned long timestamp,
+       float* query_raw_data,
+       int num_of_query_raw_data,
+       long int* result_ids,
+       float* result_distances);

 //////////////////////////////////////////////////////////////////
diff --git a/core/unittest/test_c_api.cpp b/core/unittest/test_c_api.cpp
index af1df71a8d..6aaf05e494 100644
--- a/core/unittest/test_c_api.cpp
+++ b/core/unittest/test_c_api.cpp
@@ -8,147 +8,149 @@

 TEST(CApiTest, CollectionTest) {
-  auto collection_name = "collection0";
-  auto schema_tmp_conf = "null_schema";
-  auto collection = NewCollection(collection_name, schema_tmp_conf);
-  DeleteCollection(collection);
+    auto collection_name = "collection0";
+    auto schema_tmp_conf = "null_schema";
+    auto collection = NewCollection(collection_name, schema_tmp_conf);
+    DeleteCollection(collection);
 }


 TEST(CApiTest, PartitonTest) {
-  auto collection_name = "collection0";
-  auto schema_tmp_conf = "null_schema";
-  auto collection = NewCollection(collection_name, schema_tmp_conf);
-  auto partition_name = "partition0";
-  auto partition = NewPartition(collection, partition_name);
-  DeleteCollection(collection);
-  DeletePartition(partition);
+    auto collection_name = "collection0";
+    auto schema_tmp_conf = "null_schema";
+    auto collection = NewCollection(collection_name, schema_tmp_conf);
+    auto partition_name = "partition0";
+    auto partition = NewPartition(collection, partition_name);
+    DeleteCollection(collection);
+    DeletePartition(partition);
 }


 TEST(CApiTest, SegmentTest) {
-  auto collection_name = "collection0";
-  auto schema_tmp_conf = "null_schema";
-  auto collection = NewCollection(collection_name, schema_tmp_conf);
-  auto partition_name = "partition0";
-  auto partition = NewPartition(collection, partition_name);
-  auto segment = NewSegment(partition, 0);
-  DeleteCollection(collection);
-  DeletePartition(partition);
-  DeleteSegment(segment);
+    auto collection_name = "collection0";
+    auto schema_tmp_conf = "null_schema";
+    auto collection = NewCollection(collection_name, schema_tmp_conf);
+    auto partition_name = "partition0";
+    auto partition = NewPartition(collection, partition_name);
+    auto segment = NewSegment(partition, 0);
+    DeleteCollection(collection);
+    DeletePartition(partition);
+    DeleteSegment(segment);
 }


 TEST(CApiTest, InsertTest) {
-  auto collection_name = "collection0";
-  auto schema_tmp_conf = "null_schema";
-  auto collection = NewCollection(collection_name, schema_tmp_conf);
-  auto partition_name = "partition0";
-  auto partition = NewPartition(collection, partition_name);
-  auto segment = NewSegment(partition, 0);
+    auto collection_name = "collection0";
+    auto schema_tmp_conf = "null_schema";
+    auto collection = NewCollection(collection_name, schema_tmp_conf);
+    auto partition_name = "partition0";
+    auto partition = NewPartition(collection, partition_name);
+    auto segment = NewSegment(partition, 0);

-  std::vector<char> raw_data;
-  std::vector<uint64_t> timestamps;
-  std::vector<int64_t> uids;
-  int N = 10000;
-  std::default_random_engine e(67);
-  for (int i = 0; i < N; ++i) {
-    uids.push_back(100000 + i);
-    timestamps.push_back(0);
-    // append vec
-    float vec[16];
-    for (auto &x: vec) {
-      x = e() % 2000 * 0.001 - 1.0;
-    }
-    raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
-    int age = e() % 100;
-    raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
-  }
+    std::vector<char> raw_data;
+    std::vector<uint64_t> timestamps;
+    std::vector<int64_t> uids;
+    int N = 10000;
+    std::default_random_engine e(67);
+    for (int i = 0; i < N; ++i) {
+        uids.push_back(100000 + i);
+        timestamps.push_back(0);
+        // append vec
+        float vec[16];
+        for (auto &x: vec) {
+            x = e() % 2000 * 0.001 - 1.0;
+        }
+        raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
+        int age = e() % 100;
+        raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
+    }

-  auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
+    auto line_sizeof = (sizeof(int) + sizeof(float) * 16);

-  auto offset = PreInsert(segment, N);
+    auto offset = PreInsert(segment, N);

-  auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
+    auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);

-  assert(res == 0);
+    assert(res == 0);

-  DeleteCollection(collection);
-  DeletePartition(partition);
-  DeleteSegment(segment);
+    DeleteCollection(collection);
+    DeletePartition(partition);
+    DeleteSegment(segment);
 }


 TEST(CApiTest, DeleteTest) {
-  auto collection_name = "collection0";
-  auto schema_tmp_conf = "null_schema";
-  auto collection = NewCollection(collection_name, schema_tmp_conf);
-  auto partition_name = "partition0";
-  auto partition = NewPartition(collection, partition_name);
-  auto segment = NewSegment(partition, 0);
+    auto collection_name = "collection0";
+    auto schema_tmp_conf = "null_schema";
+    auto collection = NewCollection(collection_name, schema_tmp_conf);
+    auto partition_name = "partition0";
+    auto partition = NewPartition(collection, partition_name);
+    auto segment = NewSegment(partition, 0);

-  long delete_primary_keys[] = {100000, 100001, 100002};
-  unsigned long delete_timestamps[] = {0, 0, 0};
+    long delete_primary_keys[] = {100000, 100001, 100002};
+    unsigned long delete_timestamps[] = {0, 0, 0};

-  auto offset = PreDelete(segment, 3);
+    auto offset = PreDelete(segment, 3);

-  auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
-  assert(del_res == 0);
+    auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
+    assert(del_res == 0);

-  DeleteCollection(collection);
-  DeletePartition(partition);
-  DeleteSegment(segment);
+    DeleteCollection(collection);
+    DeletePartition(partition);
+    DeleteSegment(segment);
 }


 TEST(CApiTest, SearchTest) {
-  auto collection_name = "collection0";
-  auto schema_tmp_conf = "null_schema";
-  auto collection = NewCollection(collection_name, schema_tmp_conf);
-  auto partition_name = "partition0";
-  auto partition = NewPartition(collection, partition_name);
-  auto segment = NewSegment(partition, 0);
+    auto collection_name = "collection0";
+    auto schema_tmp_conf = "null_schema";
+    auto collection = NewCollection(collection_name, schema_tmp_conf);
+    auto partition_name = "partition0";
+    auto partition = NewPartition(collection, partition_name);
+    auto segment = NewSegment(partition, 0);

-  std::vector<char> raw_data;
-  std::vector<uint64_t> timestamps;
-  std::vector<int64_t> uids;
-  int N = 10000;
-  std::default_random_engine e(67);
-  for (int i = 0; i < N; ++i) {
-    uids.push_back(100000 + i);
-    timestamps.push_back(0);
-    // append vec
-    float vec[16];
-    for (auto &x: vec) {
-      x = e() % 2000 * 0.001 - 1.0;
-    }
-    raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
-    int age = e() % 100;
-    raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
-  }
+    std::vector<char> raw_data;
+    std::vector<uint64_t> timestamps;
+    std::vector<int64_t> uids;
+    int N = 10000;
+    std::default_random_engine e(67);
+    for (int i = 0; i < N; ++i) {
+        uids.push_back(100000 + i);
+        timestamps.push_back(0);
+        // append vec
+        float vec[16];
+        for (auto &x: vec) {
+            x = e() % 2000 * 0.001 - 1.0;
+        }
+        raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
+        int age = e() % 100;
+        raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
+    }

-  auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
+    auto line_sizeof = (sizeof(int) + sizeof(float) * 16);

-  auto offset = PreInsert(segment, N);
+    auto offset = PreInsert(segment, N);

-  auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
-  assert(ins_res == 0);
+    auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
+    assert(ins_res == 0);

-  long result_ids[10];
-  float result_distances[10];
+    long result_ids[10];
+    float result_distances[10];

-  auto query_json = std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})");
-  std::vector<float> query_raw_data(16);
-  for (int i = 0; i < 16; i++) {
-    query_raw_data[i] = e() % 2000 * 0.001 - 1.0;
-  }
std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})"); + std::vector query_raw_data(16); + for (int i = 0; i < 16; i++) { + query_raw_data[i] = e() % 2000 * 0.001 - 1.0; + } - auto sea_res = Search(segment, query_json.data(), 1, query_raw_data.data(), 16, result_ids, result_distances); - assert(sea_res == 0); + CQueryInfo queryInfo{1, 10, "fakevec"}; - DeleteCollection(collection); - DeletePartition(partition); - DeleteSegment(segment); + auto sea_res = Search(segment, queryInfo, 1, query_raw_data.data(), 16, result_ids, result_distances); + assert(sea_res == 0); + + DeleteCollection(collection); + DeletePartition(partition); + DeleteSegment(segment); } @@ -195,13 +197,14 @@ TEST(CApiTest, SearchSimpleTest) { long result_ids[10]; float result_distances[10]; - auto query_json = std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})"); std::vector query_raw_data(DIM); for (int i = 0; i < DIM; i++) { query_raw_data[i] = i; } - auto sea_res = Search(segment, query_json.data(), 1, query_raw_data.data(), DIM, result_ids, result_distances); + CQueryInfo queryInfo{1, 10, "fakevec"}; + + auto sea_res = Search(segment, queryInfo, 1, query_raw_data.data(), DIM, result_ids, result_distances); assert(sea_res == 0); DeleteCollection(collection); @@ -211,122 +214,122 @@ TEST(CApiTest, SearchSimpleTest) { TEST(CApiTest, IsOpenedTest) { - auto collection_name = "collection0"; - auto schema_tmp_conf = "null_schema"; - auto collection = NewCollection(collection_name, schema_tmp_conf); - auto partition_name = "partition0"; - auto partition = NewPartition(collection, partition_name); - auto segment = NewSegment(partition, 0); + auto collection_name = "collection0"; + auto schema_tmp_conf = "null_schema"; + auto collection = NewCollection(collection_name, schema_tmp_conf); + auto partition_name = "partition0"; + auto partition = NewPartition(collection, partition_name); + auto segment = NewSegment(partition, 0); - auto is_opened = IsOpened(segment); - assert(is_opened); + auto is_opened = IsOpened(segment); + assert(is_opened); - DeleteCollection(collection); - DeletePartition(partition); - DeleteSegment(segment); + DeleteCollection(collection); + DeletePartition(partition); + DeleteSegment(segment); } TEST(CApiTest, CloseTest) { - auto collection_name = "collection0"; - auto schema_tmp_conf = "null_schema"; - auto collection = NewCollection(collection_name, schema_tmp_conf); - auto partition_name = "partition0"; - auto partition = NewPartition(collection, partition_name); - auto segment = NewSegment(partition, 0); + auto collection_name = "collection0"; + auto schema_tmp_conf = "null_schema"; + auto collection = NewCollection(collection_name, schema_tmp_conf); + auto partition_name = "partition0"; + auto partition = NewPartition(collection, partition_name); + auto segment = NewSegment(partition, 0); - auto status = Close(segment); - assert(status == 0); + auto status = Close(segment); + assert(status == 0); - DeleteCollection(collection); - DeletePartition(partition); - DeleteSegment(segment); + DeleteCollection(collection); + DeletePartition(partition); + DeleteSegment(segment); } namespace { -auto generate_data(int N) { - std::vector raw_data; - std::vector timestamps; - std::vector uids; - std::default_random_engine er(42); - std::uniform_real_distribution<> distribution(0.0, 1.0); - std::default_random_engine ei(42); - for (int i = 0; i < N; ++i) { + auto generate_data(int N) { + std::vector raw_data; + std::vector timestamps; + std::vector uids; + std::default_random_engine 
+        std::default_random_engine er(42);
+        std::uniform_real_distribution<> distribution(0.0, 1.0);
+        std::default_random_engine ei(42);
+        for (int i = 0; i < N; ++i) {
         uids.push_back(10 * N + i);
         timestamps.push_back(0);
         // append vec
         float vec[16];
         for (auto &x: vec) {
-            x = distribution(er);
+                x = distribution(er);
         }
         raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
         int age = ei() % 100;
         raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
+        }
+        return std::make_tuple(raw_data, timestamps, uids);
     }
-    return std::make_tuple(raw_data, timestamps, uids);
-}
 }


 TEST(CApiTest, TestSearchWithIndex) {
-  auto collection_name = "collection0";
-  auto schema_tmp_conf = "null_schema";
-  auto collection = NewCollection(collection_name, schema_tmp_conf);
-  auto partition_name = "partition0";
-  auto partition = NewPartition(collection, partition_name);
-  auto segment = NewSegment(partition, 0);
+    auto collection_name = "collection0";
+    auto schema_tmp_conf = "null_schema";
+    auto collection = NewCollection(collection_name, schema_tmp_conf);
+    auto partition_name = "partition0";
+    auto partition = NewPartition(collection, partition_name);
+    auto segment = NewSegment(partition, 0);

-  int N = 1000 * 1000;
-  auto[raw_data, timestamps, uids] = generate_data(N);
-  auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
-  auto offset = PreInsert(segment, N);
-  auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
-  assert(res == 0);
+    int N = 1000 * 1000;
+    auto[raw_data, timestamps, uids] = generate_data(N);
+    auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
+    auto offset = PreInsert(segment, N);
+    auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
+    assert(res == 0);

-  auto row_count = GetRowCount(segment);
-  assert(row_count == N);
+    auto row_count = GetRowCount(segment);
+    assert(row_count == N);

-  std::vector<long> result_ids(10);
-  std::vector<float> result_distances(10);
-  auto query_json = std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})");
-  auto sea_res = Search(segment, query_json.data(), 1, (float *) raw_data.data(), 16, result_ids.data(),
-                        result_distances.data());
+    std::vector<long> result_ids(10);
+    std::vector<float> result_distances(10);
+    CQueryInfo queryInfo{1, 10, "fakevec"};
+    auto sea_res = Search(segment, queryInfo, 1, (float *) raw_data.data(), 16, result_ids.data(),
+                          result_distances.data());

-  ASSERT_EQ(sea_res, 0);
-  ASSERT_EQ(result_ids[0], 10 * N);
-  ASSERT_EQ(result_distances[0], 0);
+    ASSERT_EQ(sea_res, 0);
+    ASSERT_EQ(result_ids[0], 10 * N);
+    ASSERT_EQ(result_distances[0], 0);

-  auto N_del = N / 2;
-  std::vector<uint64_t> del_ts(N_del, 100);
-  auto pre_off = PreDelete(segment, N_del);
-  Delete(segment, pre_off, N_del, uids.data(), del_ts.data());
+    auto N_del = N / 2;
+    std::vector<uint64_t> del_ts(N_del, 100);
+    auto pre_off = PreDelete(segment, N_del);
+    Delete(segment, pre_off, N_del, uids.data(), del_ts.data());

-  Close(segment);
-  BuildIndex(segment);
+    Close(segment);
+    BuildIndex(segment);

-  std::vector<long> result_ids2(10);
-  std::vector<float> result_distances2(10);
+    std::vector<long> result_ids2(10);
+    std::vector<float> result_distances2(10);

-  sea_res = Search(segment, query_json.data(), 104, (float *) raw_data.data(), 16, result_ids2.data(),
-                   result_distances2.data());
+    sea_res = Search(segment, queryInfo, 104, (float *) raw_data.data(), 16, result_ids2.data(),
+                     result_distances2.data());
 //    sea_res = Search(segment, nullptr, 104, result_ids2.data(), result_distances2.data());

-  std::cout << "case 1" << std::endl;
-  for (int i = 0; i < 10; ++i) {
-    std::cout << result_ids[i] << "->" << result_distances[i] << std::endl;
-  }
-  std::cout << "case 2" << std::endl;
-  for (int i = 0; i < 10; ++i) {
-    std::cout << result_ids2[i] << "->" << result_distances2[i] << std::endl;
-  }
+    std::cout << "case 1" << std::endl;
+    for (int i = 0; i < 10; ++i) {
+        std::cout << result_ids[i] << "->" << result_distances[i] << std::endl;
+    }
+    std::cout << "case 2" << std::endl;
+    for (int i = 0; i < 10; ++i) {
+        std::cout << result_ids2[i] << "->" << result_distances2[i] << std::endl;
+    }

-  for (auto x: result_ids2) {
-    ASSERT_GE(x, 10 * N + N_del);
-    ASSERT_LT(x, 10 * N + N);
-  }
+    for (auto x: result_ids2) {
+        ASSERT_GE(x, 10 * N + N_del);
+        ASSERT_LT(x, 10 * N + N);
+    }

 //    auto iter = 0;
 //    for(int i = 0; i < result_ids.size(); ++i) {
 //    }

-  DeleteCollection(collection);
-  DeletePartition(partition);
-  DeleteSegment(segment);
+    DeleteCollection(collection);
+    DeletePartition(partition);
+    DeleteSegment(segment);
 }


 TEST(CApiTest, GetDeletedCountTest) {
-  auto collection_name = "collection0";
-  auto schema_tmp_conf = "null_schema";
-  auto collection = NewCollection(collection_name, schema_tmp_conf);
-  auto partition_name = "partition0";
-  auto partition = NewPartition(collection, partition_name);
-  auto segment = NewSegment(partition, 0);
+    auto collection_name = "collection0";
+    auto schema_tmp_conf = "null_schema";
+    auto collection = NewCollection(collection_name, schema_tmp_conf);
+    auto partition_name = "partition0";
+    auto partition = NewPartition(collection, partition_name);
+    auto segment = NewSegment(partition, 0);

-  long delete_primary_keys[] = {100000, 100001, 100002};
-  unsigned long delete_timestamps[] = {0, 0, 0};
+    long delete_primary_keys[] = {100000, 100001, 100002};
+    unsigned long delete_timestamps[] = {0, 0, 0};

-  auto offset = PreDelete(segment, 3);
+    auto offset = PreDelete(segment, 3);

-  auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
-  assert(del_res == 0);
+    auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
+    assert(del_res == 0);

-  // TODO: assert(deleted_count == len(delete_primary_keys))
-  auto deleted_count = GetDeletedCount(segment);
-  assert(deleted_count == 0);
+    // TODO: assert(deleted_count == len(delete_primary_keys))
+    auto deleted_count = GetDeletedCount(segment);
+    assert(deleted_count == 0);

-  DeleteCollection(collection);
-  DeletePartition(partition);
-  DeleteSegment(segment);
+    DeleteCollection(collection);
+    DeletePartition(partition);
+    DeleteSegment(segment);
 }


 TEST(CApiTest, GetRowCountTest) {
-  auto collection_name = "collection0";
-  auto schema_tmp_conf = "null_schema";
-  auto collection = NewCollection(collection_name, schema_tmp_conf);
-  auto partition_name = "partition0";
-  auto partition = NewPartition(collection, partition_name);
-  auto segment = NewSegment(partition, 0);
+    auto collection_name = "collection0";
+    auto schema_tmp_conf = "null_schema";
+    auto collection = NewCollection(collection_name, schema_tmp_conf);
+    auto partition_name = "partition0";
+    auto partition = NewPartition(collection, partition_name);
+    auto segment = NewSegment(partition, 0);

-  int N = 10000;
-  auto[raw_data, timestamps, uids] = generate_data(N);
-  auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
-  auto offset = PreInsert(segment, N);
-  auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
-  assert(res == 0);
+    int N = 10000;
+    auto[raw_data, timestamps, uids] = generate_data(N);
+    auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
+    auto offset = PreInsert(segment, N);
+    auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
+    assert(res == 0);

-  auto row_count = GetRowCount(segment);
-  assert(row_count == N);
+    auto row_count = GetRowCount(segment);
+    assert(row_count == N);

-  DeleteCollection(collection);
-  DeletePartition(partition);
-  DeleteSegment(segment);
+    DeleteCollection(collection);
+    DeletePartition(partition);
+    DeleteSegment(segment);
 }
\ No newline at end of file
diff --git a/reader/query_node.go b/reader/query_node.go
index eeee0eac22..3f761d7e94 100644
--- a/reader/query_node.go
+++ b/reader/query_node.go
@@ -359,6 +359,9 @@ func (node *QueryNode) PreInsertAndDelete() msgPb.Status {

 	// 3. Do PreDelete
 	for segmentID := range node.deleteData.deleteIDs {
+		if segmentID < 0 {
+			continue
+		}
 		var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
 		if err != nil {
 			fmt.Println(err.Error())
diff --git a/reader/segment.go b/reader/segment.go
index 02d252b265..0aac20ae56 100644
--- a/reader/segment.go
+++ b/reader/segment.go
@@ -13,6 +13,7 @@ package reader
 */
 import "C"
 import (
+	"encoding/json"
 	"fmt"
 	"github.com/czs007/suvlim/errors"
 	msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
@@ -176,20 +177,37 @@ func (s *Segment) SegmentSearch(queryJson string, timestamp uint64, vectorRecord
 	/*C.Search
 	int
 	Search(CSegmentBase c_segment,
-	       const char* query_json,
-	       unsigned long timestamp,
-	       float* query_raw_data,
-	       int num_of_query_raw_data,
-	       long int* result_ids,
-	       float* result_distances);
+	       CQueryInfo c_query_info,
+	       unsigned long timestamp,
+	       float* query_raw_data,
+	       int num_of_query_raw_data,
+	       long int* result_ids,
+	       float* result_distances);
 	*/
-	// TODO: get top-k's k from queryString
-	const TopK = 10
+	type QueryInfo struct {
+		NumQueries int64  `json:"num_queries"`
+		TopK       int    `json:"topK"`
+		FieldName  string `json:"field_name"`
+	}

-	resultIds := make([]int64, TopK)
-	resultDistances := make([]float32, TopK)
+	type CQueryInfo C.CQueryInfo
+
+	var query QueryInfo
+	var err = json.Unmarshal([]byte(queryJson), &query)
+	if err != nil {
+		return nil, err
+	}
+	fmt.Println(query)
+
+	cQuery := C.CQueryInfo{
+		num_queries: C.long(query.NumQueries),
+		topK:        C.int(query.TopK),
+		field_name:  C.CString(query.FieldName),
+	}
+
+	resultIds := make([]int64, query.TopK)
+	resultDistances := make([]float32, query.TopK)

-	var cQueryJson = C.CString(queryJson)
 	var cTimestamp = C.ulong(timestamp)
 	var cResultIds = (*C.long)(&resultIds[0])
 	var cResultDistances = (*C.float)(&resultDistances[0])
@@ -205,7 +223,7 @@ func (s *Segment) SegmentSearch(queryJson string, timestamp uint64, vectorRecord
 		cQueryRawDataLength = (C.int)(len(vectorRecord.FloatData))
 	}

-	var status = C.Search(s.SegmentPtr, cQueryJson, cTimestamp, cQueryRawData, cQueryRawDataLength, cResultIds, cResultDistances)
+	var status = C.Search(s.SegmentPtr, cQuery, cTimestamp, cQueryRawData, cQueryRawDataLength, cResultIds, cResultDistances)

 	if status != 0 {
 		return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status)))
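A note on the cgo call path above (an illustration, not part of the patch): C.CString copies query.FieldName into C-heap memory that Go's garbage collector never reclaims, so SegmentSearch as written leaks one small allocation per search. Below is a minimal sketch of the same CQueryInfo construction paired with an explicit release; the helper name buildCQueryInfo and the <stdlib.h> include in the cgo preamble are assumptions made for this sketch.

package reader

/*
#include <stdlib.h>      // assumed include, so that C.free is available
#include "segment_c.h"   // declares CQueryInfo and Search
*/
import "C"
import "unsafe"

// buildCQueryInfo mirrors the struct literal used in SegmentSearch and
// returns a release func that frees the C copy of the field name.
func buildCQueryInfo(numQueries int64, topK int, fieldName string) (C.CQueryInfo, func()) {
	cFieldName := C.CString(fieldName) // malloc'd on the C heap
	info := C.CQueryInfo{
		num_queries: C.long(numQueries),
		topK:        C.int(topK),
		field_name:  cFieldName,
	}
	return info, func() { C.free(unsafe.Pointer(cFieldName)) }
}

In SegmentSearch this would read cQuery, release := buildCQueryInfo(query.NumQueries, query.TopK, query.FieldName) followed by defer release() before the C.Search call.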
diff --git a/reader/segment_service.go b/reader/segment_service.go
index 9680377f50..22cb8a4117 100644
--- a/reader/segment_service.go
+++ b/reader/segment_service.go
@@ -36,27 +36,31 @@ func (node *QueryNode) SegmentsManagement() {
 }

 func (node *QueryNode) SegmentManagementService() {
+	sleepMillisecondTime := 200
+	fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms")
 	for {
-		sleepMillisecondTime := 200
 		time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
 		node.SegmentsManagement()
-		fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms")
 	}
 }

 func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) {
 	var statisticData = make([]masterPb.SegmentStat, 0)

-	for segmentID, segment := range node.SegmentsMap {
-		currentMemSize := segment.GetMemSize()
-		memIncreaseRate := float32(currentMemSize-segment.LastMemSize) / (float32(sleepMillisecondTime) / 1000)
-		stat := masterPb.SegmentStat{
-			// TODO: set master pb's segment id type from uint64 to int64
-			SegmentId:  uint64(segmentID),
-			MemorySize: currentMemSize,
-			MemoryRate: memIncreaseRate,
+	for _, collection := range node.Collections {
+		for _, partition := range collection.Partitions {
+			for _, openedSegment := range partition.OpenedSegments {
+				currentMemSize := openedSegment.GetMemSize()
+				memIncreaseRate := float32((int64(currentMemSize))-(int64(openedSegment.LastMemSize))) / (float32(sleepMillisecondTime) / 1000)
+				stat := masterPb.SegmentStat{
+					// TODO: set master pb's segment id type from uint64 to int64
+					SegmentId:  uint64(openedSegment.SegmentId),
+					MemorySize: currentMemSize,
+					MemoryRate: memIncreaseRate,
+				}
+				statisticData = append(statisticData, stat)
+			}
 		}
-		statisticData = append(statisticData, stat)
 	}

 	var status = node.PublicStatistic(&statisticData)
@@ -66,10 +70,10 @@ func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) {
 }

 func (node *QueryNode) SegmentStatisticService() {
+	sleepMillisecondTime := 1000
+	fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
 	for {
-		sleepMillisecondTime := 1000
 		time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
 		node.SegmentStatistic(sleepMillisecondTime)
-		fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
 	}
 }
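A closing note on the int64 casts in the new memIncreaseRate expression (an illustration, not part of the patch): if GetMemSize and LastMemSize are unsigned, as the uint64-style casts in the hunk suggest, subtracting them directly wraps around whenever an opened segment shrinks between samples, and the reported rate becomes an enormous positive number instead of a negative one. A standalone sketch of the pitfall the patch guards against:

package main

import "fmt"

func main() {
	// A segment that shrank between two samples taken 200 ms apart.
	var current, last uint64 = 1000, 1200

	// Unsigned subtraction wraps to ~1.8e19 before the float conversion.
	wrong := float32(current-last) / (float32(200) / 1000)

	// The patch's form: convert to int64 first, so the difference stays negative.
	right := float32(int64(current)-int64(last)) / (float32(200) / 1000)

	fmt.Println(wrong) // ~9.2e19, a nonsense rate
	fmt.Println(right) // -1000, bytes per second
}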