mirror of https://github.com/milvus-io/milvus.git
parent
7062b64fce
commit
99ef484273
|
@ -201,3 +201,12 @@ if ( NOT MILVUS_DB_PATH )
|
|||
endif ()
|
||||
|
||||
set( GPU_ENABLE "false" )
|
||||
|
||||
install(
|
||||
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/dog_segment/
|
||||
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/include
|
||||
FILES_MATCHING PATTERN "*_c.h"
|
||||
)
|
||||
|
||||
install(FILES ${CMAKE_BINARY_DIR}/src/dog_segment/libmilvus_dog_segment.so
|
||||
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/lib)
|
||||
|
|
|
@ -40,8 +40,10 @@ PreDelete(CSegmentBase c_segment, long int size);
|
|||
|
||||
int
|
||||
Search(CSegmentBase c_segment,
|
||||
void* fake_query,
|
||||
const char* query_json,
|
||||
unsigned long timestamp,
|
||||
float* query_raw_data,
|
||||
int num_of_query_raw_data,
|
||||
long int* result_ids,
|
||||
float* result_distances);
|
||||
|
||||
|
@ -50,6 +52,9 @@ Search(CSegmentBase c_segment,
|
|||
int
|
||||
Close(CSegmentBase c_segment);
|
||||
|
||||
int
|
||||
BuildIndex(CSegmentBase c_segment);
|
||||
|
||||
bool
|
||||
IsOpened(CSegmentBase c_segment);
|
||||
|
||||
|
|
|
@ -164,7 +164,9 @@ class Schema {
|
|||
const FieldMeta&
|
||||
operator[](const std::string& field_name) const {
|
||||
auto offset_iter = offsets_.find(field_name);
|
||||
assert(offset_iter != offsets_.end());
|
||||
if (offset_iter == offsets_.end()) {
|
||||
throw std::runtime_error("Cannot found field_name: " + field_name);
|
||||
}
|
||||
auto offset = offset_iter->second;
|
||||
return (*this)[offset];
|
||||
}
|
||||
|
|
|
@ -91,14 +91,29 @@ PreDelete(CSegmentBase c_segment, long int size) {
|
|||
|
||||
int
|
||||
Search(CSegmentBase c_segment,
|
||||
void* fake_query,
|
||||
const char* query_json,
|
||||
unsigned long timestamp,
|
||||
float* query_raw_data,
|
||||
int num_of_query_raw_data,
|
||||
long int* result_ids,
|
||||
float* result_distances) {
|
||||
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
|
||||
milvus::dog_segment::QueryResult query_result;
|
||||
|
||||
auto res = segment->Query(nullptr, timestamp, query_result);
|
||||
// parse query param json
|
||||
auto query_param_json_string = std::string(query_json);
|
||||
auto query_param_json = nlohmann::json::parse(query_param_json_string);
|
||||
|
||||
// construct QueryPtr
|
||||
auto query_ptr = std::make_shared<milvus::query::Query>();
|
||||
query_ptr->num_queries = query_param_json["num_queries"];
|
||||
query_ptr->topK = query_param_json["topK"];
|
||||
query_ptr->field_name = query_param_json["field_name"];
|
||||
|
||||
query_ptr->query_raw_data.resize(num_of_query_raw_data);
|
||||
memcpy(query_ptr->query_raw_data.data(), query_raw_data, num_of_query_raw_data * sizeof(float));
|
||||
|
||||
auto res = segment->Query(query_ptr, timestamp, query_result);
|
||||
|
||||
// result_ids and result_distances have been allocated memory in goLang,
|
||||
// so we don't need to malloc here.
|
||||
|
|
|
@ -40,8 +40,10 @@ PreDelete(CSegmentBase c_segment, long int size);
|
|||
|
||||
int
|
||||
Search(CSegmentBase c_segment,
|
||||
void* fake_query,
|
||||
const char* query_json,
|
||||
unsigned long timestamp,
|
||||
float* query_raw_data,
|
||||
int num_of_query_raw_data,
|
||||
long int* result_ids,
|
||||
float* result_distances);
|
||||
|
||||
|
|
|
@ -7,178 +7,182 @@
|
|||
#include "dog_segment/segment_c.h"
|
||||
|
||||
|
||||
|
||||
|
||||
TEST(CApiTest, CollectionTest) {
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
DeleteCollection(collection);
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
DeleteCollection(collection);
|
||||
}
|
||||
|
||||
|
||||
TEST(CApiTest, PartitonTest) {
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
}
|
||||
|
||||
|
||||
TEST(CApiTest, SegmentTest) {
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
}
|
||||
|
||||
|
||||
TEST(CApiTest, InsertTest) {
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
|
||||
std::vector<char> raw_data;
|
||||
std::vector<uint64_t> timestamps;
|
||||
std::vector<int64_t> uids;
|
||||
int N = 10000;
|
||||
std::default_random_engine e(67);
|
||||
for(int i = 0; i < N; ++i) {
|
||||
uids.push_back(100000 + i);
|
||||
timestamps.push_back(0);
|
||||
// append vec
|
||||
float vec[16];
|
||||
for(auto &x: vec) {
|
||||
x = e() % 2000 * 0.001 - 1.0;
|
||||
std::vector<char> raw_data;
|
||||
std::vector<uint64_t> timestamps;
|
||||
std::vector<int64_t> uids;
|
||||
int N = 10000;
|
||||
std::default_random_engine e(67);
|
||||
for (int i = 0; i < N; ++i) {
|
||||
uids.push_back(100000 + i);
|
||||
timestamps.push_back(0);
|
||||
// append vec
|
||||
float vec[16];
|
||||
for (auto &x: vec) {
|
||||
x = e() % 2000 * 0.001 - 1.0;
|
||||
}
|
||||
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
|
||||
int age = e() % 100;
|
||||
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
|
||||
}
|
||||
raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
|
||||
int age = e() % 100;
|
||||
raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
|
||||
}
|
||||
|
||||
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
|
||||
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
|
||||
|
||||
auto offset = PreInsert(segment, N);
|
||||
auto offset = PreInsert(segment, N);
|
||||
|
||||
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
|
||||
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
|
||||
|
||||
assert(res == 0);
|
||||
assert(res == 0);
|
||||
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
}
|
||||
|
||||
|
||||
TEST(CApiTest, DeleteTest) {
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
|
||||
long delete_primary_keys[] = {100000, 100001, 100002};
|
||||
unsigned long delete_timestamps[] = {0, 0, 0};
|
||||
long delete_primary_keys[] = {100000, 100001, 100002};
|
||||
unsigned long delete_timestamps[] = {0, 0, 0};
|
||||
|
||||
auto offset = PreDelete(segment, 3);
|
||||
auto offset = PreDelete(segment, 3);
|
||||
|
||||
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
|
||||
assert(del_res == 0);
|
||||
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
|
||||
assert(del_res == 0);
|
||||
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
}
|
||||
|
||||
|
||||
|
||||
TEST(CApiTest, SearchTest) {
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
|
||||
std::vector<char> raw_data;
|
||||
std::vector<uint64_t> timestamps;
|
||||
std::vector<int64_t> uids;
|
||||
int N = 10000;
|
||||
std::default_random_engine e(67);
|
||||
for(int i = 0; i < N; ++i) {
|
||||
uids.push_back(100000 + i);
|
||||
timestamps.push_back(0);
|
||||
// append vec
|
||||
float vec[16];
|
||||
for(auto &x: vec) {
|
||||
x = e() % 2000 * 0.001 - 1.0;
|
||||
std::vector<char> raw_data;
|
||||
std::vector<uint64_t> timestamps;
|
||||
std::vector<int64_t> uids;
|
||||
int N = 10000;
|
||||
std::default_random_engine e(67);
|
||||
for (int i = 0; i < N; ++i) {
|
||||
uids.push_back(100000 + i);
|
||||
timestamps.push_back(0);
|
||||
// append vec
|
||||
float vec[16];
|
||||
for (auto &x: vec) {
|
||||
x = e() % 2000 * 0.001 - 1.0;
|
||||
}
|
||||
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
|
||||
int age = e() % 100;
|
||||
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
|
||||
}
|
||||
raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
|
||||
int age = e() % 100;
|
||||
raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
|
||||
}
|
||||
|
||||
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
|
||||
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
|
||||
|
||||
auto offset = PreInsert(segment, N);
|
||||
auto offset = PreInsert(segment, N);
|
||||
|
||||
auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
|
||||
assert(ins_res == 0);
|
||||
auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
|
||||
assert(ins_res == 0);
|
||||
|
||||
long result_ids[10];
|
||||
float result_distances[10];
|
||||
auto sea_res = Search(segment, nullptr, 1, result_ids, result_distances);
|
||||
assert(sea_res == 0);
|
||||
long result_ids[10];
|
||||
float result_distances[10];
|
||||
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
auto query_json = std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})");
|
||||
std::vector<float> query_raw_data(16);
|
||||
for (int i = 0; i < 16; i++) {
|
||||
query_raw_data[i] = e() % 2000 * 0.001 - 1.0;
|
||||
}
|
||||
|
||||
auto sea_res = Search(segment, query_json.data(), 1, query_raw_data.data(), 16, result_ids, result_distances);
|
||||
assert(sea_res == 0);
|
||||
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
}
|
||||
|
||||
|
||||
TEST(CApiTest, IsOpenedTest) {
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
|
||||
auto is_opened = IsOpened(segment);
|
||||
assert(is_opened);
|
||||
auto is_opened = IsOpened(segment);
|
||||
assert(is_opened);
|
||||
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
}
|
||||
|
||||
|
||||
TEST(CApiTest, CloseTest) {
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
|
||||
auto status = Close(segment);
|
||||
assert(status == 0);
|
||||
auto status = Close(segment);
|
||||
assert(status == 0);
|
||||
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
}
|
||||
|
||||
|
||||
|
@ -190,17 +194,17 @@ auto generate_data(int N) {
|
|||
std::default_random_engine er(42);
|
||||
std::uniform_real_distribution<> distribution(0.0, 1.0);
|
||||
std::default_random_engine ei(42);
|
||||
for(int i = 0; i < N; ++i) {
|
||||
for (int i = 0; i < N; ++i) {
|
||||
uids.push_back(10 * N + i);
|
||||
timestamps.push_back(0);
|
||||
// append vec
|
||||
float vec[16];
|
||||
for(auto &x: vec) {
|
||||
for (auto &x: vec) {
|
||||
x = distribution(er);
|
||||
}
|
||||
raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
|
||||
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
|
||||
int age = ei() % 100;
|
||||
raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
|
||||
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
|
||||
}
|
||||
return std::make_tuple(raw_data, timestamps, uids);
|
||||
}
|
||||
|
@ -217,10 +221,10 @@ TEST(CApiTest, TestQuery) {
|
|||
|
||||
|
||||
int N = 1000 * 1000;
|
||||
auto [raw_data, timestamps, uids] = generate_data(N);
|
||||
auto[raw_data, timestamps, uids] = generate_data(N);
|
||||
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
|
||||
auto offset = PreInsert(segment, N);
|
||||
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
|
||||
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
|
||||
assert(res == 0);
|
||||
|
||||
auto row_count = GetRowCount(segment);
|
||||
|
@ -228,7 +232,9 @@ TEST(CApiTest, TestQuery) {
|
|||
|
||||
std::vector<long> result_ids(10);
|
||||
std::vector<float> result_distances(10);
|
||||
auto sea_res = Search(segment, nullptr, 1, result_ids.data(), result_distances.data());
|
||||
auto query_json = std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})");
|
||||
auto sea_res = Search(segment, query_json.data(), 1, (float *) raw_data.data(), 16, result_ids.data(),
|
||||
result_distances.data());
|
||||
|
||||
ASSERT_EQ(sea_res, 0);
|
||||
ASSERT_EQ(result_ids[0], 10 * N);
|
||||
|
@ -236,7 +242,7 @@ TEST(CApiTest, TestQuery) {
|
|||
|
||||
auto N_del = N / 2;
|
||||
std::vector<uint64_t> del_ts(N_del, 100);
|
||||
auto pre_off = PreDelete(segment, N_del);
|
||||
auto pre_off = PreDelete(segment, N_del);
|
||||
Delete(segment, pre_off, N_del, uids.data(), del_ts.data());
|
||||
|
||||
Close(segment);
|
||||
|
@ -245,19 +251,22 @@ TEST(CApiTest, TestQuery) {
|
|||
|
||||
std::vector<long> result_ids2(10);
|
||||
std::vector<float> result_distances2(10);
|
||||
sea_res = Search(segment, nullptr, 104, result_ids2.data(), result_distances2.data());
|
||||
|
||||
sea_res = Search(segment, query_json.data(), 104, (float *) raw_data.data(), 16, result_ids2.data(),
|
||||
result_distances2.data());
|
||||
// sea_res = Search(segment, nullptr, 104, result_ids2.data(), result_distances2.data());
|
||||
|
||||
std::cout << "case 1" << std::endl;
|
||||
for(int i = 0; i < 10; ++i) {
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
std::cout << result_ids[i] << "->" << result_distances[i] << std::endl;
|
||||
}
|
||||
std::cout << "case 2" << std::endl;
|
||||
for(int i = 0; i < 10; ++i) {
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
std::cout << result_ids2[i] << "->" << result_distances2[i] << std::endl;
|
||||
}
|
||||
|
||||
for(auto x: result_ids2) {
|
||||
ASSERT_GE(x, 10 * N + N_del);
|
||||
for (auto x: result_ids2) {
|
||||
ASSERT_GE(x, 10 * N + N_del);
|
||||
ASSERT_LT(x, 10 * N + N);
|
||||
}
|
||||
|
||||
|
@ -281,28 +290,28 @@ TEST(CApiTest, TestQuery) {
|
|||
}
|
||||
|
||||
TEST(CApiTest, GetDeletedCountTest) {
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
auto collection_name = "collection0";
|
||||
auto schema_tmp_conf = "null_schema";
|
||||
auto collection = NewCollection(collection_name, schema_tmp_conf);
|
||||
auto partition_name = "partition0";
|
||||
auto partition = NewPartition(collection, partition_name);
|
||||
auto segment = NewSegment(partition, 0);
|
||||
|
||||
long delete_primary_keys[] = {100000, 100001, 100002};
|
||||
unsigned long delete_timestamps[] = {0, 0, 0};
|
||||
long delete_primary_keys[] = {100000, 100001, 100002};
|
||||
unsigned long delete_timestamps[] = {0, 0, 0};
|
||||
|
||||
auto offset = PreDelete(segment, 3);
|
||||
auto offset = PreDelete(segment, 3);
|
||||
|
||||
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
|
||||
assert(del_res == 0);
|
||||
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
|
||||
assert(del_res == 0);
|
||||
|
||||
// TODO: assert(deleted_count == len(delete_primary_keys))
|
||||
auto deleted_count = GetDeletedCount(segment);
|
||||
assert(deleted_count == 0);
|
||||
// TODO: assert(deleted_count == len(delete_primary_keys))
|
||||
auto deleted_count = GetDeletedCount(segment);
|
||||
assert(deleted_count == 0);
|
||||
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
DeleteCollection(collection);
|
||||
DeletePartition(partition);
|
||||
DeleteSegment(segment);
|
||||
}
|
||||
|
||||
|
||||
|
@ -316,10 +325,10 @@ TEST(CApiTest, GetRowCountTest) {
|
|||
|
||||
|
||||
int N = 10000;
|
||||
auto [raw_data, timestamps, uids] = generate_data(N);
|
||||
auto[raw_data, timestamps, uids] = generate_data(N);
|
||||
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
|
||||
auto offset = PreInsert(segment, N);
|
||||
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
|
||||
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
|
||||
assert(res == 0);
|
||||
|
||||
auto row_count = GetRowCount(segment);
|
||||
|
|
|
@ -25,13 +25,15 @@ add_subdirectory( db ) # target milvus_engine
|
|||
add_subdirectory( log )
|
||||
add_subdirectory( server )
|
||||
add_subdirectory( message_client )
|
||||
add_subdirectory( meta )
|
||||
|
||||
set(link_lib
|
||||
milvus_engine
|
||||
config
|
||||
query
|
||||
query
|
||||
utils
|
||||
log
|
||||
meta
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -85,6 +85,11 @@ ConfigMgr::ConfigMgr() {
|
|||
"localhost", nullptr, nullptr)},
|
||||
{"pulsar.port", CreateIntegerConfig("pulsar.port", false, 0, 65535, &config.pulsar.port.value,
|
||||
6650, nullptr, nullptr)},
|
||||
/* master */
|
||||
{"master.address", CreateStringConfig("master.address", false, &config.master.address.value,
|
||||
"localhost", nullptr, nullptr)},
|
||||
{"master.port", CreateIntegerConfig("master.port", false, 0, 65535, &config.master.port.value,
|
||||
6000, nullptr, nullptr)},
|
||||
|
||||
|
||||
/* log */
|
||||
|
|
|
@ -76,6 +76,11 @@ struct ServerConfig {
|
|||
Integer port{6650};
|
||||
}pulsar;
|
||||
|
||||
struct Master{
|
||||
String address{"localhost"};
|
||||
Integer port{6000};
|
||||
}master;
|
||||
|
||||
|
||||
struct Engine {
|
||||
Integer build_index_threshold{4096};
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
// Generated by the gRPC C++ plugin.
|
||||
// If you make any local change, they will be lost.
|
||||
// source: etcd.proto
|
||||
|
||||
#include "etcd.pb.h"
|
||||
#include "etcd.grpc.pb.h"
|
||||
|
||||
#include <functional>
|
||||
#include <grpcpp/impl/codegen/async_stream.h>
|
||||
#include <grpcpp/impl/codegen/async_unary_call.h>
|
||||
#include <grpcpp/impl/codegen/channel_interface.h>
|
||||
#include <grpcpp/impl/codegen/client_unary_call.h>
|
||||
#include <grpcpp/impl/codegen/client_callback.h>
|
||||
#include <grpcpp/impl/codegen/method_handler_impl.h>
|
||||
#include <grpcpp/impl/codegen/rpc_service_method.h>
|
||||
#include <grpcpp/impl/codegen/server_callback.h>
|
||||
#include <grpcpp/impl/codegen/service_type.h>
|
||||
#include <grpcpp/impl/codegen/sync_stream.h>
|
||||
namespace etcdserverpb {
|
||||
|
||||
static const char* Watch_method_names[] = {
|
||||
"/etcdserverpb.Watch/Watch",
|
||||
};
|
||||
|
||||
std::unique_ptr< Watch::Stub> Watch::NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options) {
|
||||
(void)options;
|
||||
std::unique_ptr< Watch::Stub> stub(new Watch::Stub(channel));
|
||||
return stub;
|
||||
}
|
||||
|
||||
Watch::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel)
|
||||
: channel_(channel), rpcmethod_Watch_(Watch_method_names[0], ::grpc::internal::RpcMethod::BIDI_STREAMING, channel)
|
||||
{}
|
||||
|
||||
::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch::Stub::WatchRaw(::grpc::ClientContext* context) {
|
||||
return ::grpc_impl::internal::ClientReaderWriterFactory< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>::Create(channel_.get(), rpcmethod_Watch_, context);
|
||||
}
|
||||
|
||||
void Watch::Stub::experimental_async::Watch(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>* reactor) {
|
||||
::grpc_impl::internal::ClientCallbackReaderWriterFactory< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>::Create(stub_->channel_.get(), stub_->rpcmethod_Watch_, context, reactor);
|
||||
}
|
||||
|
||||
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch::Stub::AsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
|
||||
return ::grpc_impl::internal::ClientAsyncReaderWriterFactory< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>::Create(channel_.get(), cq, rpcmethod_Watch_, context, true, tag);
|
||||
}
|
||||
|
||||
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch::Stub::PrepareAsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
|
||||
return ::grpc_impl::internal::ClientAsyncReaderWriterFactory< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>::Create(channel_.get(), cq, rpcmethod_Watch_, context, false, nullptr);
|
||||
}
|
||||
|
||||
Watch::Service::Service() {
|
||||
AddMethod(new ::grpc::internal::RpcServiceMethod(
|
||||
Watch_method_names[0],
|
||||
::grpc::internal::RpcMethod::BIDI_STREAMING,
|
||||
new ::grpc::internal::BidiStreamingHandler< Watch::Service, ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>(
|
||||
std::mem_fn(&Watch::Service::Watch), this)));
|
||||
}
|
||||
|
||||
Watch::Service::~Service() {
|
||||
}
|
||||
|
||||
::grpc::Status Watch::Service::Watch(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* stream) {
|
||||
(void) context;
|
||||
(void) stream;
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
|
||||
|
||||
} // namespace etcdserverpb
|
||||
|
|
@ -0,0 +1,235 @@
|
|||
// Generated by the gRPC C++ plugin.
|
||||
// If you make any local change, they will be lost.
|
||||
// source: etcd.proto
|
||||
#ifndef GRPC_etcd_2eproto__INCLUDED
|
||||
#define GRPC_etcd_2eproto__INCLUDED
|
||||
|
||||
#include "etcd.pb.h"
|
||||
|
||||
#include <functional>
|
||||
#include <grpcpp/impl/codegen/async_generic_service.h>
|
||||
#include <grpcpp/impl/codegen/async_stream.h>
|
||||
#include <grpcpp/impl/codegen/async_unary_call.h>
|
||||
#include <grpcpp/impl/codegen/client_callback.h>
|
||||
#include <grpcpp/impl/codegen/client_context.h>
|
||||
#include <grpcpp/impl/codegen/completion_queue.h>
|
||||
#include <grpcpp/impl/codegen/method_handler_impl.h>
|
||||
#include <grpcpp/impl/codegen/proto_utils.h>
|
||||
#include <grpcpp/impl/codegen/rpc_method.h>
|
||||
#include <grpcpp/impl/codegen/server_callback.h>
|
||||
#include <grpcpp/impl/codegen/server_context.h>
|
||||
#include <grpcpp/impl/codegen/service_type.h>
|
||||
#include <grpcpp/impl/codegen/status.h>
|
||||
#include <grpcpp/impl/codegen/stub_options.h>
|
||||
#include <grpcpp/impl/codegen/sync_stream.h>
|
||||
|
||||
namespace grpc_impl {
|
||||
class CompletionQueue;
|
||||
class ServerCompletionQueue;
|
||||
class ServerContext;
|
||||
} // namespace grpc_impl
|
||||
|
||||
namespace grpc {
|
||||
namespace experimental {
|
||||
template <typename RequestT, typename ResponseT>
|
||||
class MessageAllocator;
|
||||
} // namespace experimental
|
||||
} // namespace grpc
|
||||
|
||||
namespace etcdserverpb {
|
||||
|
||||
class Watch final {
|
||||
public:
|
||||
static constexpr char const* service_full_name() {
|
||||
return "etcdserverpb.Watch";
|
||||
}
|
||||
class StubInterface {
|
||||
public:
|
||||
virtual ~StubInterface() {}
|
||||
// Watch watches for events happening or that have happened. Both input and output
|
||||
// are streams; the input stream is for creating and canceling watchers and the output
|
||||
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
|
||||
// for several watches at once. The entire event history can be watched starting from the
|
||||
// last compaction revision.
|
||||
std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> Watch(::grpc::ClientContext* context) {
|
||||
return std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(WatchRaw(context));
|
||||
}
|
||||
std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> AsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
|
||||
return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(AsyncWatchRaw(context, cq, tag));
|
||||
}
|
||||
std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> PrepareAsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
|
||||
return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(PrepareAsyncWatchRaw(context, cq));
|
||||
}
|
||||
class experimental_async_interface {
|
||||
public:
|
||||
virtual ~experimental_async_interface() {}
|
||||
// Watch watches for events happening or that have happened. Both input and output
|
||||
// are streams; the input stream is for creating and canceling watchers and the output
|
||||
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
|
||||
// for several watches at once. The entire event history can be watched starting from the
|
||||
// last compaction revision.
|
||||
virtual void Watch(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>* reactor) = 0;
|
||||
};
|
||||
virtual class experimental_async_interface* experimental_async() { return nullptr; }
|
||||
private:
|
||||
virtual ::grpc::ClientReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* WatchRaw(::grpc::ClientContext* context) = 0;
|
||||
virtual ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* AsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) = 0;
|
||||
virtual ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* PrepareAsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) = 0;
|
||||
};
|
||||
class Stub final : public StubInterface {
|
||||
public:
|
||||
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
|
||||
std::unique_ptr< ::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> Watch(::grpc::ClientContext* context) {
|
||||
return std::unique_ptr< ::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(WatchRaw(context));
|
||||
}
|
||||
std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> AsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
|
||||
return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(AsyncWatchRaw(context, cq, tag));
|
||||
}
|
||||
std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> PrepareAsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
|
||||
return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(PrepareAsyncWatchRaw(context, cq));
|
||||
}
|
||||
class experimental_async final :
|
||||
public StubInterface::experimental_async_interface {
|
||||
public:
|
||||
void Watch(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>* reactor) override;
|
||||
private:
|
||||
friend class Stub;
|
||||
explicit experimental_async(Stub* stub): stub_(stub) { }
|
||||
Stub* stub() { return stub_; }
|
||||
Stub* stub_;
|
||||
};
|
||||
class experimental_async_interface* experimental_async() override { return &async_stub_; }
|
||||
|
||||
private:
|
||||
std::shared_ptr< ::grpc::ChannelInterface> channel_;
|
||||
class experimental_async async_stub_{this};
|
||||
::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* WatchRaw(::grpc::ClientContext* context) override;
|
||||
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* AsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override;
|
||||
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* PrepareAsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override;
|
||||
const ::grpc::internal::RpcMethod rpcmethod_Watch_;
|
||||
};
|
||||
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
|
||||
|
||||
class Service : public ::grpc::Service {
|
||||
public:
|
||||
Service();
|
||||
virtual ~Service();
|
||||
// Watch watches for events happening or that have happened. Both input and output
|
||||
// are streams; the input stream is for creating and canceling watchers and the output
|
||||
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
|
||||
// for several watches at once. The entire event history can be watched starting from the
|
||||
// last compaction revision.
|
||||
virtual ::grpc::Status Watch(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* stream);
|
||||
};
|
||||
template <class BaseClass>
|
||||
class WithAsyncMethod_Watch : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
WithAsyncMethod_Watch() {
|
||||
::grpc::Service::MarkMethodAsync(0);
|
||||
}
|
||||
~WithAsyncMethod_Watch() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable synchronous version of this method
|
||||
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
void RequestWatch(::grpc::ServerContext* context, ::grpc::ServerAsyncReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* stream, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
|
||||
::grpc::Service::RequestAsyncBidiStreaming(0, context, stream, new_call_cq, notification_cq, tag);
|
||||
}
|
||||
};
|
||||
typedef WithAsyncMethod_Watch<Service > AsyncService;
|
||||
template <class BaseClass>
|
||||
class ExperimentalWithCallbackMethod_Watch : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
ExperimentalWithCallbackMethod_Watch() {
|
||||
::grpc::Service::experimental().MarkMethodCallback(0,
|
||||
new ::grpc_impl::internal::CallbackBidiHandler< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>(
|
||||
[this] { return this->Watch(); }));
|
||||
}
|
||||
~ExperimentalWithCallbackMethod_Watch() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable synchronous version of this method
|
||||
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
virtual ::grpc::experimental::ServerBidiReactor< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch() {
|
||||
return new ::grpc_impl::internal::UnimplementedBidiReactor<
|
||||
::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>;}
|
||||
};
|
||||
typedef ExperimentalWithCallbackMethod_Watch<Service > ExperimentalCallbackService;
|
||||
template <class BaseClass>
|
||||
class WithGenericMethod_Watch : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
WithGenericMethod_Watch() {
|
||||
::grpc::Service::MarkMethodGeneric(0);
|
||||
}
|
||||
~WithGenericMethod_Watch() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable synchronous version of this method
|
||||
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
};
|
||||
template <class BaseClass>
|
||||
class WithRawMethod_Watch : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
WithRawMethod_Watch() {
|
||||
::grpc::Service::MarkMethodRaw(0);
|
||||
}
|
||||
~WithRawMethod_Watch() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable synchronous version of this method
|
||||
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
void RequestWatch(::grpc::ServerContext* context, ::grpc::ServerAsyncReaderWriter< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* stream, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
|
||||
::grpc::Service::RequestAsyncBidiStreaming(0, context, stream, new_call_cq, notification_cq, tag);
|
||||
}
|
||||
};
|
||||
template <class BaseClass>
|
||||
class ExperimentalWithRawCallbackMethod_Watch : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
ExperimentalWithRawCallbackMethod_Watch() {
|
||||
::grpc::Service::experimental().MarkMethodRawCallback(0,
|
||||
new ::grpc_impl::internal::CallbackBidiHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
|
||||
[this] { return this->Watch(); }));
|
||||
}
|
||||
~ExperimentalWithRawCallbackMethod_Watch() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable synchronous version of this method
|
||||
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
virtual ::grpc::experimental::ServerBidiReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* Watch() {
|
||||
return new ::grpc_impl::internal::UnimplementedBidiReactor<
|
||||
::grpc::ByteBuffer, ::grpc::ByteBuffer>;}
|
||||
};
|
||||
typedef Service StreamedUnaryService;
|
||||
typedef Service SplitStreamedService;
|
||||
typedef Service StreamedService;
|
||||
};
|
||||
|
||||
} // namespace etcdserverpb
|
||||
|
||||
|
||||
#endif // GRPC_etcd_2eproto__INCLUDED
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,83 @@
|
|||
// Generated by the gRPC C++ plugin.
|
||||
// If you make any local change, they will be lost.
|
||||
// source: master.proto
|
||||
|
||||
#include "master.pb.h"
|
||||
#include "master.grpc.pb.h"
|
||||
|
||||
#include <functional>
|
||||
#include <grpcpp/impl/codegen/async_stream.h>
|
||||
#include <grpcpp/impl/codegen/async_unary_call.h>
|
||||
#include <grpcpp/impl/codegen/channel_interface.h>
|
||||
#include <grpcpp/impl/codegen/client_unary_call.h>
|
||||
#include <grpcpp/impl/codegen/client_callback.h>
|
||||
#include <grpcpp/impl/codegen/method_handler_impl.h>
|
||||
#include <grpcpp/impl/codegen/rpc_service_method.h>
|
||||
#include <grpcpp/impl/codegen/server_callback.h>
|
||||
#include <grpcpp/impl/codegen/service_type.h>
|
||||
#include <grpcpp/impl/codegen/sync_stream.h>
|
||||
namespace masterpb {
|
||||
|
||||
static const char* Master_method_names[] = {
|
||||
"/masterpb.Master/CreateCollection",
|
||||
};
|
||||
|
||||
std::unique_ptr< Master::Stub> Master::NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options) {
|
||||
(void)options;
|
||||
std::unique_ptr< Master::Stub> stub(new Master::Stub(channel));
|
||||
return stub;
|
||||
}
|
||||
|
||||
Master::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel)
|
||||
: channel_(channel), rpcmethod_CreateCollection_(Master_method_names[0], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
|
||||
{}
|
||||
|
||||
::grpc::Status Master::Stub::CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::milvus::grpc::Status* response) {
|
||||
return ::grpc::internal::BlockingUnaryCall(channel_.get(), rpcmethod_CreateCollection_, context, request, response);
|
||||
}
|
||||
|
||||
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)> f) {
|
||||
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, std::move(f));
|
||||
}
|
||||
|
||||
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)> f) {
|
||||
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, std::move(f));
|
||||
}
|
||||
|
||||
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
|
||||
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, reactor);
|
||||
}
|
||||
|
||||
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
|
||||
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, reactor);
|
||||
}
|
||||
|
||||
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* Master::Stub::AsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
|
||||
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::Status>::Create(channel_.get(), cq, rpcmethod_CreateCollection_, context, request, true);
|
||||
}
|
||||
|
||||
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* Master::Stub::PrepareAsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
|
||||
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::Status>::Create(channel_.get(), cq, rpcmethod_CreateCollection_, context, request, false);
|
||||
}
|
||||
|
||||
Master::Service::Service() {
|
||||
AddMethod(new ::grpc::internal::RpcServiceMethod(
|
||||
Master_method_names[0],
|
||||
::grpc::internal::RpcMethod::NORMAL_RPC,
|
||||
new ::grpc::internal::RpcMethodHandler< Master::Service, ::milvus::grpc::Mapping, ::milvus::grpc::Status>(
|
||||
std::mem_fn(&Master::Service::CreateCollection), this)));
|
||||
}
|
||||
|
||||
Master::Service::~Service() {
|
||||
}
|
||||
|
||||
::grpc::Status Master::Service::CreateCollection(::grpc::ServerContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response) {
|
||||
(void) context;
|
||||
(void) request;
|
||||
(void) response;
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
|
||||
|
||||
} // namespace masterpb
|
||||
|
|
@ -0,0 +1,252 @@
|
|||
// Generated by the gRPC C++ plugin.
|
||||
// If you make any local change, they will be lost.
|
||||
// source: master.proto
|
||||
#ifndef GRPC_master_2eproto__INCLUDED
|
||||
#define GRPC_master_2eproto__INCLUDED
|
||||
|
||||
#include "master.pb.h"
|
||||
|
||||
#include <functional>
|
||||
#include <grpcpp/impl/codegen/async_generic_service.h>
|
||||
#include <grpcpp/impl/codegen/async_stream.h>
|
||||
#include <grpcpp/impl/codegen/async_unary_call.h>
|
||||
#include <grpcpp/impl/codegen/client_callback.h>
|
||||
#include <grpcpp/impl/codegen/client_context.h>
|
||||
#include <grpcpp/impl/codegen/completion_queue.h>
|
||||
#include <grpcpp/impl/codegen/method_handler_impl.h>
|
||||
#include <grpcpp/impl/codegen/proto_utils.h>
|
||||
#include <grpcpp/impl/codegen/rpc_method.h>
|
||||
#include <grpcpp/impl/codegen/server_callback.h>
|
||||
#include <grpcpp/impl/codegen/server_context.h>
|
||||
#include <grpcpp/impl/codegen/service_type.h>
|
||||
#include <grpcpp/impl/codegen/status.h>
|
||||
#include <grpcpp/impl/codegen/stub_options.h>
|
||||
#include <grpcpp/impl/codegen/sync_stream.h>
|
||||
|
||||
namespace grpc_impl {
|
||||
class CompletionQueue;
|
||||
class ServerCompletionQueue;
|
||||
class ServerContext;
|
||||
} // namespace grpc_impl
|
||||
|
||||
namespace grpc {
|
||||
namespace experimental {
|
||||
template <typename RequestT, typename ResponseT>
|
||||
class MessageAllocator;
|
||||
} // namespace experimental
|
||||
} // namespace grpc
|
||||
|
||||
namespace masterpb {
|
||||
|
||||
class Master final {
|
||||
public:
|
||||
static constexpr char const* service_full_name() {
|
||||
return "masterpb.Master";
|
||||
}
|
||||
class StubInterface {
|
||||
public:
|
||||
virtual ~StubInterface() {}
|
||||
virtual ::grpc::Status CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::milvus::grpc::Status* response) = 0;
|
||||
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>> AsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
|
||||
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>>(AsyncCreateCollectionRaw(context, request, cq));
|
||||
}
|
||||
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>> PrepareAsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
|
||||
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>>(PrepareAsyncCreateCollectionRaw(context, request, cq));
|
||||
}
|
||||
class experimental_async_interface {
|
||||
public:
|
||||
virtual ~experimental_async_interface() {}
|
||||
virtual void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) = 0;
|
||||
virtual void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) = 0;
|
||||
virtual void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
|
||||
virtual void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
|
||||
};
|
||||
virtual class experimental_async_interface* experimental_async() { return nullptr; }
|
||||
private:
|
||||
virtual ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>* AsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) = 0;
|
||||
virtual ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>* PrepareAsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) = 0;
|
||||
};
|
||||
class Stub final : public StubInterface {
|
||||
public:
|
||||
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
|
||||
::grpc::Status CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::milvus::grpc::Status* response) override;
|
||||
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>> AsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
|
||||
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>>(AsyncCreateCollectionRaw(context, request, cq));
|
||||
}
|
||||
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>> PrepareAsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
|
||||
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>>(PrepareAsyncCreateCollectionRaw(context, request, cq));
|
||||
}
|
||||
class experimental_async final :
|
||||
public StubInterface::experimental_async_interface {
|
||||
public:
|
||||
void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) override;
|
||||
void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) override;
|
||||
void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
|
||||
void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
|
||||
private:
|
||||
friend class Stub;
|
||||
explicit experimental_async(Stub* stub): stub_(stub) { }
|
||||
Stub* stub() { return stub_; }
|
||||
Stub* stub_;
|
||||
};
|
||||
class experimental_async_interface* experimental_async() override { return &async_stub_; }
|
||||
|
||||
private:
|
||||
std::shared_ptr< ::grpc::ChannelInterface> channel_;
|
||||
class experimental_async async_stub_{this};
|
||||
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* AsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) override;
|
||||
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* PrepareAsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) override;
|
||||
const ::grpc::internal::RpcMethod rpcmethod_CreateCollection_;
|
||||
};
|
||||
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
|
||||
|
||||
class Service : public ::grpc::Service {
|
||||
public:
|
||||
Service();
|
||||
virtual ~Service();
|
||||
virtual ::grpc::Status CreateCollection(::grpc::ServerContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response);
|
||||
};
|
||||
template <class BaseClass>
|
||||
class WithAsyncMethod_CreateCollection : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
WithAsyncMethod_CreateCollection() {
|
||||
::grpc::Service::MarkMethodAsync(0);
|
||||
}
|
||||
~WithAsyncMethod_CreateCollection() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable synchronous version of this method
|
||||
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
void RequestCreateCollection(::grpc::ServerContext* context, ::milvus::grpc::Mapping* request, ::grpc::ServerAsyncResponseWriter< ::milvus::grpc::Status>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
|
||||
::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag);
|
||||
}
|
||||
};
|
||||
typedef WithAsyncMethod_CreateCollection<Service > AsyncService;
|
||||
template <class BaseClass>
|
||||
class ExperimentalWithCallbackMethod_CreateCollection : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
ExperimentalWithCallbackMethod_CreateCollection() {
|
||||
::grpc::Service::experimental().MarkMethodCallback(0,
|
||||
new ::grpc_impl::internal::CallbackUnaryHandler< ::milvus::grpc::Mapping, ::milvus::grpc::Status>(
|
||||
[this](::grpc::ServerContext* context,
|
||||
const ::milvus::grpc::Mapping* request,
|
||||
::milvus::grpc::Status* response,
|
||||
::grpc::experimental::ServerCallbackRpcController* controller) {
|
||||
return this->CreateCollection(context, request, response, controller);
|
||||
}));
|
||||
}
|
||||
void SetMessageAllocatorFor_CreateCollection(
|
||||
::grpc::experimental::MessageAllocator< ::milvus::grpc::Mapping, ::milvus::grpc::Status>* allocator) {
|
||||
static_cast<::grpc_impl::internal::CallbackUnaryHandler< ::milvus::grpc::Mapping, ::milvus::grpc::Status>*>(
|
||||
::grpc::Service::experimental().GetHandler(0))
|
||||
->SetMessageAllocator(allocator);
|
||||
}
|
||||
~ExperimentalWithCallbackMethod_CreateCollection() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable synchronous version of this method
|
||||
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
virtual void CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); }
|
||||
};
|
||||
typedef ExperimentalWithCallbackMethod_CreateCollection<Service > ExperimentalCallbackService;
|
||||
template <class BaseClass>
|
||||
class WithGenericMethod_CreateCollection : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
WithGenericMethod_CreateCollection() {
|
||||
::grpc::Service::MarkMethodGeneric(0);
|
||||
}
|
||||
~WithGenericMethod_CreateCollection() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable synchronous version of this method
|
||||
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
};
|
||||
template <class BaseClass>
|
||||
class WithRawMethod_CreateCollection : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
WithRawMethod_CreateCollection() {
|
||||
::grpc::Service::MarkMethodRaw(0);
|
||||
}
|
||||
~WithRawMethod_CreateCollection() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable synchronous version of this method
|
||||
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
void RequestCreateCollection(::grpc::ServerContext* context, ::grpc::ByteBuffer* request, ::grpc::ServerAsyncResponseWriter< ::grpc::ByteBuffer>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
|
||||
::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag);
|
||||
}
|
||||
};
|
||||
template <class BaseClass>
|
||||
class ExperimentalWithRawCallbackMethod_CreateCollection : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
ExperimentalWithRawCallbackMethod_CreateCollection() {
|
||||
::grpc::Service::experimental().MarkMethodRawCallback(0,
|
||||
new ::grpc_impl::internal::CallbackUnaryHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
|
||||
[this](::grpc::ServerContext* context,
|
||||
const ::grpc::ByteBuffer* request,
|
||||
::grpc::ByteBuffer* response,
|
||||
::grpc::experimental::ServerCallbackRpcController* controller) {
|
||||
this->CreateCollection(context, request, response, controller);
|
||||
}));
|
||||
}
|
||||
~ExperimentalWithRawCallbackMethod_CreateCollection() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable synchronous version of this method
|
||||
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
virtual void CreateCollection(::grpc::ServerContext* /*context*/, const ::grpc::ByteBuffer* /*request*/, ::grpc::ByteBuffer* /*response*/, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); }
|
||||
};
|
||||
template <class BaseClass>
|
||||
class WithStreamedUnaryMethod_CreateCollection : public BaseClass {
|
||||
private:
|
||||
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
|
||||
public:
|
||||
WithStreamedUnaryMethod_CreateCollection() {
|
||||
::grpc::Service::MarkMethodStreamed(0,
|
||||
new ::grpc::internal::StreamedUnaryHandler< ::milvus::grpc::Mapping, ::milvus::grpc::Status>(std::bind(&WithStreamedUnaryMethod_CreateCollection<BaseClass>::StreamedCreateCollection, this, std::placeholders::_1, std::placeholders::_2)));
|
||||
}
|
||||
~WithStreamedUnaryMethod_CreateCollection() override {
|
||||
BaseClassMustBeDerivedFromService(this);
|
||||
}
|
||||
// disable regular version of this method
|
||||
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
|
||||
abort();
|
||||
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
|
||||
}
|
||||
// replace default version of method with streamed unary
|
||||
virtual ::grpc::Status StreamedCreateCollection(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< ::milvus::grpc::Mapping,::milvus::grpc::Status>* server_unary_streamer) = 0;
|
||||
};
|
||||
typedef WithStreamedUnaryMethod_CreateCollection<Service > StreamedUnaryService;
|
||||
typedef Service SplitStreamedService;
|
||||
typedef WithStreamedUnaryMethod_CreateCollection<Service > StreamedService;
|
||||
};
|
||||
|
||||
} // namespace masterpb
|
||||
|
||||
|
||||
#endif // GRPC_master_2eproto__INCLUDED
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,164 @@
|
|||
syntax = "proto3";
|
||||
package etcdserverpb;
|
||||
|
||||
service Watch {
|
||||
// Watch watches for events happening or that have happened. Both input and output
|
||||
// are streams; the input stream is for creating and canceling watchers and the output
|
||||
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
|
||||
// for several watches at once. The entire event history can be watched starting from the
|
||||
// last compaction revision.
|
||||
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {
|
||||
}
|
||||
}
|
||||
|
||||
message WatchRequest {
|
||||
// request_union is a request to either create a new watcher or cancel an existing watcher.
|
||||
oneof request_union {
|
||||
WatchCreateRequest create_request = 1;
|
||||
WatchCancelRequest cancel_request = 2;
|
||||
WatchProgressRequest progress_request = 3;
|
||||
}
|
||||
}
|
||||
|
||||
message WatchCreateRequest {
|
||||
// key is the key to register for watching.
|
||||
bytes key = 1;
|
||||
|
||||
// range_end is the end of the range [key, range_end) to watch. If range_end is not given,
|
||||
// only the key argument is watched. If range_end is equal to '\0', all keys greater than
|
||||
// or equal to the key argument are watched.
|
||||
// If the range_end is one bit larger than the given key,
|
||||
// then all keys with the prefix (the given key) will be watched.
|
||||
bytes range_end = 2;
|
||||
|
||||
// start_revision is an optional revision to watch from (inclusive). No start_revision is "now".
|
||||
int64 start_revision = 3;
|
||||
|
||||
// progress_notify is set so that the etcd server will periodically send a WatchResponse with
|
||||
// no events to the new watcher if there are no recent events. It is useful when clients
|
||||
// wish to recover a disconnected watcher starting from a recent known revision.
|
||||
// The etcd server may decide how often it will send notifications based on current load.
|
||||
bool progress_notify = 4;
|
||||
|
||||
enum FilterType {
|
||||
// filter out put event.
|
||||
NOPUT = 0;
|
||||
// filter out delete event.
|
||||
NODELETE = 1;
|
||||
}
|
||||
|
||||
// filters filter the events at server side before it sends back to the watcher.
|
||||
repeated FilterType filters = 5;
|
||||
|
||||
// If prev_kv is set, created watcher gets the previous KV before the event happens.
|
||||
// If the previous KV is already compacted, nothing will be returned.
|
||||
bool prev_kv = 6;
|
||||
|
||||
// If watch_id is provided and non-zero, it will be assigned to this watcher.
|
||||
// Since creating a watcher in etcd is not a synchronous operation,
|
||||
// this can be used ensure that ordering is correct when creating multiple
|
||||
// watchers on the same stream. Creating a watcher with an ID already in
|
||||
// use on the stream will cause an error to be returned.
|
||||
int64 watch_id = 7;
|
||||
|
||||
// fragment enables splitting large revisions into multiple watch responses.
|
||||
bool fragment = 8;
|
||||
}
|
||||
|
||||
message WatchCancelRequest {
|
||||
// watch_id is the watcher id to cancel so that no more events are transmitted.
|
||||
int64 watch_id = 1;
|
||||
}
|
||||
|
||||
// Requests the a watch stream progress status be sent in the watch response stream as soon as
|
||||
// possible.
|
||||
message WatchProgressRequest {
|
||||
}
|
||||
|
||||
message WatchResponse {
|
||||
ResponseHeader header = 1;
|
||||
// watch_id is the ID of the watcher that corresponds to the response.
|
||||
int64 watch_id = 2;
|
||||
|
||||
// created is set to true if the response is for a create watch request.
|
||||
// The client should record the watch_id and expect to receive events for
|
||||
// the created watcher from the same stream.
|
||||
// All events sent to the created watcher will attach with the same watch_id.
|
||||
bool created = 3;
|
||||
|
||||
// canceled is set to true if the response is for a cancel watch request.
|
||||
// No further events will be sent to the canceled watcher.
|
||||
bool canceled = 4;
|
||||
|
||||
// compact_revision is set to the minimum index if a watcher tries to watch
|
||||
// at a compacted index.
|
||||
//
|
||||
// This happens when creating a watcher at a compacted revision or the watcher cannot
|
||||
// catch up with the progress of the key-value store.
|
||||
//
|
||||
// The client should treat the watcher as canceled and should not try to create any
|
||||
// watcher with the same start_revision again.
|
||||
int64 compact_revision = 5;
|
||||
|
||||
// cancel_reason indicates the reason for canceling the watcher.
|
||||
string cancel_reason = 6;
|
||||
|
||||
// framgment is true if large watch response was split over multiple responses.
|
||||
bool fragment = 7;
|
||||
|
||||
repeated Event events = 11;
|
||||
}
|
||||
|
||||
message ResponseHeader {
|
||||
// cluster_id is the ID of the cluster which sent the response.
|
||||
uint64 cluster_id = 1;
|
||||
// member_id is the ID of the member which sent the response.
|
||||
uint64 member_id = 2;
|
||||
// revision is the key-value store revision when the request was applied.
|
||||
// For watch progress responses, the header.revision indicates progress. All future events
|
||||
// recieved in this stream are guaranteed to have a higher revision number than the
|
||||
// header.revision number.
|
||||
int64 revision = 3;
|
||||
// raft_term is the raft term when the request was applied.
|
||||
uint64 raft_term = 4;
|
||||
}
|
||||
|
||||
|
||||
message KeyValue {
|
||||
// key is the key in bytes. An empty key is not allowed.
|
||||
bytes key = 1;
|
||||
// create_revision is the revision of last creation on this key.
|
||||
int64 create_revision = 2;
|
||||
// mod_revision is the revision of last modification on this key.
|
||||
int64 mod_revision = 3;
|
||||
// version is the version of the key. A deletion resets
|
||||
// the version to zero and any modification of the key
|
||||
// increases its version.
|
||||
int64 version = 4;
|
||||
// value is the value held by the key, in bytes.
|
||||
bytes value = 5;
|
||||
// lease is the ID of the lease that attached to key.
|
||||
// When the attached lease expires, the key will be deleted.
|
||||
// If lease is 0, then no lease is attached to the key.
|
||||
int64 lease = 6;
|
||||
}
|
||||
|
||||
message Event {
|
||||
enum EventType {
|
||||
PUT = 0;
|
||||
DELETE = 1;
|
||||
}
|
||||
// type is the kind of event. If type is a PUT, it indicates
|
||||
// new data has been stored to the key. If type is a DELETE,
|
||||
// it indicates the key was deleted.
|
||||
EventType type = 1;
|
||||
// kv holds the KeyValue for the event.
|
||||
// A PUT event contains current kv pair.
|
||||
// A PUT event with kv.Version=1 indicates the creation of a key.
|
||||
// A DELETE/EXPIRE event contains the deleted key with
|
||||
// its modification revision set to the revision of deletion.
|
||||
KeyValue kv = 2;
|
||||
|
||||
// prev_kv holds the key-value pair before the event happens.
|
||||
KeyValue prev_kv = 3;
|
||||
}
|
|
@ -155,7 +155,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, uin
|
|||
}
|
||||
}
|
||||
for (auto &stat : stats) {
|
||||
if (stat == pulsar::ResultOk) {
|
||||
if (stat != pulsar::ResultOk) {
|
||||
return Status(DB_ERROR, pulsar::strResult(stat));
|
||||
}
|
||||
}
|
||||
|
@ -181,7 +181,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
|
|||
}
|
||||
}
|
||||
for (auto &stat : stats) {
|
||||
if (stat == pulsar::ResultOk) {
|
||||
if (stat != pulsar::ResultOk) {
|
||||
return Status(DB_ERROR, pulsar::strResult(stat));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
include_directories(${PROJECT_BINARY_DIR}/thirdparty/grpc/grpc-src/third_party/protobuf/src)
|
||||
include_directories(${PROJECT_BINARY_DIR}/thirdparty/grpc/grpc-src/include)
|
||||
|
||||
add_subdirectory( etcd_watcher )
|
||||
aux_source_directory( ./master master_src)
|
||||
add_library(meta ${master_src}
|
||||
./etcd_watcher/Watcher.cpp
|
||||
${PROJECT_SOURCE_DIR}/src/grpc/etcd.pb.cc
|
||||
${PROJECT_SOURCE_DIR}/src/grpc/etcd.grpc.pb.cc
|
||||
${PROJECT_SOURCE_DIR}/src/grpc/master.pb.cc
|
||||
${PROJECT_SOURCE_DIR}/src/grpc/master.grpc.pb.cc
|
||||
)
|
|
@ -0,0 +1,14 @@
|
|||
AUX_SOURCE_DIRECTORY(. watcher_src)
|
||||
add_executable(test_watcher
|
||||
${watcher_src}
|
||||
${PROJECT_SOURCE_DIR}/src/grpc/etcd.pb.cc
|
||||
${PROJECT_SOURCE_DIR}/src/grpc/etcd.grpc.pb.cc
|
||||
)
|
||||
|
||||
target_link_libraries(
|
||||
test_watcher
|
||||
PRIVATE
|
||||
libprotobuf
|
||||
grpc++_reflection
|
||||
grpc++
|
||||
)
|
|
@ -0,0 +1,90 @@
|
|||
#include "Watcher.h"
|
||||
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
#include "grpc/etcd.grpc.pb.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace master {
|
||||
|
||||
Watcher::Watcher(const std::string &address,
|
||||
const std::string &key,
|
||||
std::function<void(etcdserverpb::WatchResponse)> callback,
|
||||
bool with_prefix) {
|
||||
auto channel = grpc::CreateChannel(address, grpc::InsecureChannelCredentials());
|
||||
stub_ = etcdserverpb::Watch::NewStub(channel);
|
||||
|
||||
call_ = std::make_unique<AsyncWatchAction>(key, with_prefix, stub_.get());
|
||||
work_thread_ = std::thread([&]() {
|
||||
call_->WaitForResponse(callback);
|
||||
});
|
||||
}
|
||||
|
||||
void Watcher::Cancel() {
|
||||
call_->CancelWatch();
|
||||
}
|
||||
|
||||
Watcher::~Watcher() {
|
||||
Cancel();
|
||||
work_thread_.join();
|
||||
}
|
||||
|
||||
AsyncWatchAction::AsyncWatchAction(const std::string &key, bool with_prefix, etcdserverpb::Watch::Stub *stub) {
|
||||
// tag `1` means to wire a rpc
|
||||
stream_ = stub->AsyncWatch(&context_, &cq_, (void *) 1);
|
||||
etcdserverpb::WatchRequest req;
|
||||
req.mutable_create_request()->set_key(key);
|
||||
if (with_prefix) {
|
||||
std::string range_end(key);
|
||||
int ascii = (int) range_end[range_end.length() - 1];
|
||||
range_end.back() = ascii + 1;
|
||||
req.mutable_create_request()->set_range_end(range_end);
|
||||
}
|
||||
void *got_tag;
|
||||
bool ok = false;
|
||||
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *) 1) {
|
||||
// tag `2` means write watch request to stream
|
||||
stream_->Write(req, (void *) 2);
|
||||
} else {
|
||||
throw std::runtime_error("failed to create a watch connection");
|
||||
}
|
||||
|
||||
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *) 2) {
|
||||
stream_->Read(&reply_, (void *) this);
|
||||
} else {
|
||||
throw std::runtime_error("failed to write WatchCreateRequest to server");
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncWatchAction::WaitForResponse(std::function<void(etcdserverpb::WatchResponse)> callback) {
|
||||
void *got_tag;
|
||||
bool ok = false;
|
||||
|
||||
while (cq_.Next(&got_tag, &ok)) {
|
||||
if (!ok) {
|
||||
break;
|
||||
}
|
||||
if (got_tag == (void *) 3) {
|
||||
cancled_.store(true);
|
||||
cq_.Shutdown();
|
||||
break;
|
||||
} else if (got_tag == (void *) this) // read tag
|
||||
{
|
||||
if (reply_.events_size()) {
|
||||
callback(reply_);
|
||||
}
|
||||
stream_->Read(&reply_, (void *) this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncWatchAction::CancelWatch() {
|
||||
if (!cancled_.load()) {
|
||||
// tag `3` mean write done
|
||||
stream_->WritesDone((void *) 3);
|
||||
cancled_.store(true);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
#pragma once
|
||||
#include "grpc/etcd.grpc.pb.h"
|
||||
#include <grpc++/grpc++.h>
|
||||
#include <thread>
|
||||
|
||||
namespace milvus {
|
||||
namespace master {
|
||||
|
||||
class AsyncWatchAction;
|
||||
|
||||
class Watcher {
|
||||
public:
|
||||
Watcher(std::string const &address,
|
||||
std::string const &key,
|
||||
std::function<void(etcdserverpb::WatchResponse)> callback,
|
||||
bool with_prefix = true);
|
||||
void Cancel();
|
||||
~Watcher();
|
||||
|
||||
private:
|
||||
std::unique_ptr<etcdserverpb::Watch::Stub> stub_;
|
||||
std::unique_ptr<AsyncWatchAction> call_;
|
||||
std::thread work_thread_;
|
||||
};
|
||||
|
||||
class AsyncWatchAction {
|
||||
public:
|
||||
AsyncWatchAction(const std::string &key, bool with_prefix, etcdserverpb::Watch::Stub* stub);
|
||||
void WaitForResponse(std::function<void(etcdserverpb::WatchResponse)> callback);
|
||||
void CancelWatch();
|
||||
private:
|
||||
// Status status;
|
||||
grpc::ClientContext context_;
|
||||
grpc::CompletionQueue cq_;
|
||||
etcdserverpb::WatchResponse reply_;
|
||||
std::unique_ptr<grpc::ClientAsyncReaderWriter<etcdserverpb::WatchRequest, etcdserverpb::WatchResponse>> stream_;
|
||||
std::atomic<bool> cancled_ = false;
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
// Steps to test this file:
|
||||
// 1. start a etcdv3 server
|
||||
// 2. run this test
|
||||
// 3. modify test key using etcdctlv3 or etcd-clientv3(Must using v3 api)
|
||||
// TODO: move this test to unittest
|
||||
|
||||
#include "Watcher.h"
|
||||
|
||||
using namespace milvus::master;
|
||||
int main() {
|
||||
try {
|
||||
Watcher watcher("127.0.0.1:2379", "SomeKey", [](etcdserverpb::WatchResponse res) {
|
||||
std::cerr << "Key1 changed!" << std::endl;
|
||||
std::cout << "Event size: " << res.events_size() << std::endl;
|
||||
for (auto &event: res.events()) {
|
||||
std::cout <<
|
||||
event.kv().key() << ":" <<
|
||||
event.kv().value() << std::endl;
|
||||
}
|
||||
}, false);
|
||||
while (true) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(60000));
|
||||
watcher.Cancel();
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (const std::exception &e) {
|
||||
std::cout << e.what();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
#include "GrpcClient.h"
|
||||
#include "grpc++/grpc++.h"
|
||||
|
||||
using grpc::ClientContext;
|
||||
|
||||
namespace milvus {
|
||||
namespace master {
|
||||
GrpcClient::GrpcClient(const std::string &addr) {
|
||||
auto channel = ::grpc::CreateChannel(addr, ::grpc::InsecureChannelCredentials());
|
||||
stub_ = masterpb::Master::NewStub(channel);
|
||||
}
|
||||
|
||||
GrpcClient::GrpcClient(std::shared_ptr<::grpc::Channel> &channel)
|
||||
: stub_(masterpb::Master::NewStub(channel)) {
|
||||
}
|
||||
|
||||
Status GrpcClient::CreateCollection(const milvus::grpc::Mapping &mapping) {
|
||||
ClientContext context;
|
||||
::milvus::grpc::Status response;
|
||||
::grpc::Status grpc_status = stub_->CreateCollection(&context, mapping, &response);
|
||||
|
||||
if (!grpc_status.ok()) {
|
||||
std::cerr << "CreateHybridCollection gRPC failed!" << std::endl;
|
||||
return Status(grpc_status.error_code(), grpc_status.error_message());
|
||||
}
|
||||
|
||||
if (response.error_code() != grpc::SUCCESS) {
|
||||
// TODO: LOG
|
||||
return Status(response.error_code(), response.reason());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
#pragma once
|
||||
#include "grpc/master.grpc.pb.h"
|
||||
#include "grpc/message.pb.h"
|
||||
#include "grpc++/grpc++.h"
|
||||
#include "utils/Status.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace master {
|
||||
class GrpcClient {
|
||||
public:
|
||||
explicit GrpcClient(const std::string& addr);
|
||||
explicit GrpcClient(std::shared_ptr<::grpc::Channel>& channel);
|
||||
~GrpcClient() = default;
|
||||
|
||||
public:
|
||||
Status
|
||||
CreateCollection(const milvus::grpc::Mapping& mapping);
|
||||
|
||||
private:
|
||||
std::unique_ptr<masterpb::Master::Stub> stub_;
|
||||
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -43,7 +43,7 @@ set( GRPC_SERVER_FILES ${GRPC_IMPL_FILES}
|
|||
|
||||
aux_source_directory( ${MILVUS_ENGINE_SRC}/server/context SERVER_CONTEXT_FILES )
|
||||
|
||||
add_library( server STATIC MessageWrapper.cpp MessageWrapper.h)
|
||||
add_library( server STATIC)
|
||||
target_sources( server
|
||||
PRIVATE ${GRPC_SERVER_FILES}
|
||||
${GRPC_SERVICE_FILES}
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
#include "MetaWrapper.h"
|
||||
#include "config/ServerConfig.h"
|
||||
namespace milvus{
|
||||
namespace server {
|
||||
|
||||
MetaWrapper& MetaWrapper::GetInstance() {
|
||||
static MetaWrapper wrapper;
|
||||
return wrapper;
|
||||
}
|
||||
|
||||
Status MetaWrapper::Init() {
|
||||
auto addr = config.master.address() + ":" + std::to_string(config.master.port());
|
||||
client_ = std::make_shared<milvus::master::GrpcClient>(addr);
|
||||
}
|
||||
|
||||
std::shared_ptr<milvus::master::GrpcClient> MetaWrapper::MetaClient() {
|
||||
return client_;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
#include "utils/Status.h"
|
||||
#include "meta/master/GrpcClient.h"
|
||||
|
||||
namespace milvus{
|
||||
namespace server{
|
||||
|
||||
class MetaWrapper {
|
||||
public:
|
||||
static MetaWrapper&
|
||||
GetInstance();
|
||||
|
||||
Status
|
||||
Init();
|
||||
|
||||
std::shared_ptr<milvus::master::GrpcClient>
|
||||
MetaClient();
|
||||
|
||||
private:
|
||||
std::shared_ptr<milvus::master::GrpcClient> client_;
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -34,6 +34,7 @@
|
|||
#include "utils/SignalHandler.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
#include "MessageWrapper.h"
|
||||
#include "MetaWrapper.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace server {
|
||||
|
@ -240,12 +241,15 @@ Server::StartService() {
|
|||
|
||||
grpc::GrpcServer::GetInstance().Start();
|
||||
|
||||
// Init pulsar message client
|
||||
stat = MessageWrapper::GetInstance().Init();
|
||||
if (!stat.ok()) {
|
||||
LOG_SERVER_ERROR_ << "Pulsar message client start service fail: " << stat.message();
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
MetaWrapper::GetInstance().Init();
|
||||
|
||||
return Status::OK();
|
||||
FAIL:
|
||||
std::cerr << "Milvus initializes fail: " << stat.message() << std::endl;
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "tracing/TextMapCarrier.h"
|
||||
#include "tracing/TracerUtil.h"
|
||||
#include "utils/Log.h"
|
||||
#include "server/MetaWrapper.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace server {
|
||||
|
@ -340,6 +341,10 @@ GrpcRequestHandler::CreateCollection(::grpc::ServerContext *context, const ::mil
|
|||
CHECK_NULLPTR_RETURN(request);
|
||||
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
|
||||
|
||||
Status status = MetaWrapper::GetInstance().MetaClient()->CreateCollection(*request);
|
||||
|
||||
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
|
||||
SET_RESPONSE(response, status, context)
|
||||
return ::grpc::Status::OK;
|
||||
}
|
||||
|
||||
|
|
|
@ -66,6 +66,10 @@ add_custom_command(TARGET generate_suvlim_pb_grpc
|
|||
COMMAND echo "${PROTOC_EXCUTABLE}"
|
||||
COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_go.sh" -p "${PROTOC_EXCUTABLE}"
|
||||
COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_cpp.sh" -p "${PROTOC_EXCUTABLE}" -g "${GRPC_CPP_PLUGIN_EXCUTABLE}"
|
||||
COMMAND ${PROTOC_EXCUTABLE} -I "${PROTO_PATH}/proto" --grpc_out "${PROTO_PATH}" --cpp_out "${PROTO_PATH}"
|
||||
--plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN_EXCUTABLE}"
|
||||
"${PROTO_PATH}/proto/etcd.proto"
|
||||
DEPENDS "${PROTO_PATH}/proto/etcd.proto"
|
||||
)
|
||||
|
||||
set_property( GLOBAL PROPERTY PROTOC_EXCUTABLE ${PROTOC_EXCUTABLE})
|
||||
|
|
|
@ -87,12 +87,12 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
|
|||
}
|
||||
|
||||
return &QueryNode{
|
||||
QueryNodeId: queryNodeId,
|
||||
Collections: nil,
|
||||
SegmentsMap: segmentsMap,
|
||||
messageClient: mc,
|
||||
queryNodeTimeSync: queryNodeTimeSync,
|
||||
buffer: buffer,
|
||||
QueryNodeId: queryNodeId,
|
||||
Collections: nil,
|
||||
SegmentsMap: segmentsMap,
|
||||
messageClient: mc,
|
||||
queryNodeTimeSync: queryNodeTimeSync,
|
||||
buffer: buffer,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,9 +145,9 @@ func (node *QueryNode) PrepareBatchMsg() []int {
|
|||
return msgLen
|
||||
}
|
||||
|
||||
func (node *QueryNode) StartMessageClient() {
|
||||
func (node *QueryNode) StartMessageClient(pulsarURL string) {
|
||||
// TODO: add consumerMsgSchema
|
||||
node.messageClient.InitClient("pulsar://192.168.2.28:6650")
|
||||
node.messageClient.InitClient(pulsarURL)
|
||||
|
||||
go node.messageClient.ReceiveMessage()
|
||||
}
|
||||
|
@ -181,7 +181,7 @@ func (node *QueryNode) RunInsertDelete() {
|
|||
if count == 0 {
|
||||
start = time.Now()
|
||||
}
|
||||
count+=msgLen[0]
|
||||
count += msgLen[0]
|
||||
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
|
||||
//fmt.Println("MessagesPreprocess Done")
|
||||
node.WriterDelete()
|
||||
|
@ -191,7 +191,7 @@ func (node *QueryNode) RunInsertDelete() {
|
|||
//fmt.Println("DoInsertAndDelete Done")
|
||||
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
|
||||
//fmt.Print("UpdateSearchTimeSync Done\n\n\n")
|
||||
if count == 100000 - 1 {
|
||||
if count == 100000-1 {
|
||||
elapsed := time.Since(start)
|
||||
fmt.Println("Query node insert 10 × 10000 time:", elapsed)
|
||||
}
|
||||
|
@ -200,19 +200,23 @@ func (node *QueryNode) RunInsertDelete() {
|
|||
|
||||
func (node *QueryNode) RunSearch() {
|
||||
for {
|
||||
//time.Sleep(2 * 1000 * time.Millisecond)
|
||||
time.Sleep(0.2 * 1000 * time.Millisecond)
|
||||
|
||||
start := time.Now()
|
||||
|
||||
if len(node.messageClient.GetSearchChan()) <= 0 {
|
||||
//fmt.Println("null Search")
|
||||
fmt.Println("null Search")
|
||||
continue
|
||||
}
|
||||
node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
|
||||
msg := <-node.messageClient.GetSearchChan()
|
||||
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
|
||||
fmt.Println("Do Search...")
|
||||
node.Search(node.messageClient.SearchMsg)
|
||||
var status = node.Search(node.messageClient.SearchMsg)
|
||||
if status.ErrorCode != 0 {
|
||||
fmt.Println("Search Failed")
|
||||
node.PublishFailedSearchResult()
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
fmt.Println("Query node search time:", elapsed)
|
||||
|
@ -427,6 +431,8 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
|
|||
|
||||
var timestamp = msg.Timestamp
|
||||
var vector = msg.Records
|
||||
// We now only the first Json is valid.
|
||||
var queryJson = msg.Json[0]
|
||||
|
||||
// 1. Timestamp check
|
||||
// TODO: return or wait? Or adding graceful time
|
||||
|
@ -437,7 +443,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
|
|||
// 2. Do search in all segments
|
||||
for _, partition := range targetCollection.Partitions {
|
||||
for _, openSegment := range partition.OpenedSegments {
|
||||
var res, err = openSegment.SegmentSearch("", timestamp, vector)
|
||||
var res, err = openSegment.SegmentSearch(queryJson, timestamp, vector)
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
return msgPb.Status{ErrorCode: 1}
|
||||
|
@ -448,7 +454,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
|
|||
}
|
||||
}
|
||||
for _, closedSegment := range partition.ClosedSegments {
|
||||
var res, err = closedSegment.SegmentSearch("", timestamp, vector)
|
||||
var res, err = closedSegment.SegmentSearch(queryJson, timestamp, vector)
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
return msgPb.Status{ErrorCode: 1}
|
||||
|
@ -468,6 +474,9 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
|
|||
Ids: make([]int64, 0),
|
||||
}
|
||||
var results = msgPb.QueryResult{
|
||||
Status: &msgPb.Status{
|
||||
ErrorCode: 0,
|
||||
},
|
||||
Entities: &entities,
|
||||
Distances: make([]float32, 0),
|
||||
QueryId: msg.Uid,
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
package reader
|
||||
|
||||
func startQueryNode() {
|
||||
func startQueryNode(pulsarURL string) {
|
||||
qn := NewQueryNode(0, 0)
|
||||
qn.InitQueryNodeCollection()
|
||||
//go qn.SegmentService()
|
||||
qn.StartMessageClient()
|
||||
qn.StartMessageClient(pulsarURL)
|
||||
|
||||
go qn.RunSearch()
|
||||
qn.RunInsertDelete()
|
||||
|
|
|
@ -5,5 +5,6 @@ import (
|
|||
)
|
||||
|
||||
func TestReader_startQueryNode(t *testing.T) {
|
||||
startQueryNode()
|
||||
pulsarURL := "pulsar://192.168.2.28:6650"
|
||||
startQueryNode(pulsarURL)
|
||||
}
|
||||
|
|
|
@ -10,8 +10,8 @@ import (
|
|||
type ResultEntityIds []int64
|
||||
|
||||
type SearchResult struct {
|
||||
ResultIds []int64
|
||||
ResultDistances []float32
|
||||
ResultIds []int64
|
||||
ResultDistances []float32
|
||||
}
|
||||
|
||||
func getResultTopicByClientId(clientId int64) string {
|
||||
|
@ -28,6 +28,20 @@ func (node *QueryNode) PublishSearchResult(results *msgPb.QueryResult, clientId
|
|||
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
|
||||
}
|
||||
|
||||
func (node *QueryNode) PublishFailedSearchResult() msgPb.Status {
|
||||
var results = msgPb.QueryResult{
|
||||
Status: &msgPb.Status{
|
||||
ErrorCode: 1,
|
||||
Reason: "Search Failed",
|
||||
},
|
||||
}
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
node.messageClient.Send(ctx, results)
|
||||
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
|
||||
}
|
||||
|
||||
func (node *QueryNode) PublicStatistic(statisticTopic string) msgPb.Status {
|
||||
// TODO: get statistic info
|
||||
// getStatisticInfo()
|
||||
|
|
|
@ -164,12 +164,14 @@ func (s *Segment) SegmentDelete(offset int64, entityIDs *[]int64, timestamps *[]
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Segment) SegmentSearch(queryString string, timestamp uint64, vectorRecord *schema.VectorRowRecord) (*SearchResult, error) {
|
||||
func (s *Segment) SegmentSearch(queryJson string, timestamp uint64, vectorRecord *schema.VectorRowRecord) (*SearchResult, error) {
|
||||
/*C.Search
|
||||
int
|
||||
Search(CSegmentBase c_segment,
|
||||
void* fake_query,
|
||||
const char* query_json,
|
||||
unsigned long timestamp,
|
||||
float* query_raw_data,
|
||||
int num_of_query_raw_data,
|
||||
long int* result_ids,
|
||||
float* result_distances);
|
||||
*/
|
||||
|
@ -179,12 +181,23 @@ func (s *Segment) SegmentSearch(queryString string, timestamp uint64, vectorReco
|
|||
resultIds := make([]int64, TopK)
|
||||
resultDistances := make([]float32, TopK)
|
||||
|
||||
var cQueryPtr = unsafe.Pointer(nil)
|
||||
var cQueryJson = C.CString(queryJson)
|
||||
var cTimestamp = C.ulong(timestamp)
|
||||
var cResultIds = (*C.long)(&resultIds[0])
|
||||
var cResultDistances = (*C.float)(&resultDistances[0])
|
||||
var cQueryRawData *C.float
|
||||
var cQueryRawDataLength C.int
|
||||
|
||||
var status = C.Search(s.SegmentPtr, cQueryPtr, cTimestamp, cResultIds, cResultDistances)
|
||||
if vectorRecord.BinaryData != nil {
|
||||
return nil, errors.New("Data of binary type is not supported yet")
|
||||
} else if len(vectorRecord.FloatData) <= 0 {
|
||||
return nil, errors.New("Null query vector data")
|
||||
} else {
|
||||
cQueryRawData = (*C.float)(&vectorRecord.FloatData[0])
|
||||
cQueryRawDataLength = (C.int)(len(vectorRecord.FloatData))
|
||||
}
|
||||
|
||||
var status = C.Search(s.SegmentPtr, cQueryJson, cTimestamp, cQueryRawData, cQueryRawDataLength, cResultIds, cResultDistances)
|
||||
|
||||
if status != 0 {
|
||||
return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status)))
|
||||
|
|
|
@ -3,6 +3,7 @@ package reader
|
|||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
schema "github.com/czs007/suvlim/pkg/master/grpc/message"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"math"
|
||||
"testing"
|
||||
|
@ -131,7 +132,15 @@ func TestSegment_SegmentSearch(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
// 6. Do search
|
||||
var searchRes, searchErr = segment.SegmentSearch("fake query string", timestamps[0], nil)
|
||||
var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
|
||||
var queryRawData = make([]float32, 0)
|
||||
for i := 0; i < 16; i ++ {
|
||||
queryRawData = append(queryRawData, float32(i))
|
||||
}
|
||||
var vectorRecord = schema.VectorRowRecord {
|
||||
FloatData: queryRawData,
|
||||
}
|
||||
var searchRes, searchErr = segment.SegmentSearch(queryJson, timestamps[0], &vectorRecord)
|
||||
assert.NoError(t, searchErr)
|
||||
fmt.Println(searchRes)
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ GRPC_INCLUDE=.:.
|
|||
rm -rf proto-cpp && mkdir -p proto-cpp
|
||||
|
||||
PB_FILES=()
|
||||
GRPC_FILES=("message.proto")
|
||||
GRPC_FILES=("message.proto" "master.proto")
|
||||
|
||||
ALL_FILES=("${PB_FILES[@]}")
|
||||
ALL_FILES+=("${GRPC_FILES[@]}")
|
||||
|
|
Loading…
Reference in New Issue