Add time sync for query node and write node

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/4973/head^2
xige-16 2020-09-15 17:41:05 +08:00 committed by yefu.chen
parent 71950d44a8
commit 917dc677af
57 changed files with 10873 additions and 653 deletions

View File

@ -60,7 +60,7 @@ func init() {
func load_config() {
//var config ServerConfig
filename := "conf/config.yaml"
filename := "../conf/config.yaml"
source, err := ioutil.ReadFile(filename)
if err != nil {
panic(err)

View File

@ -201,3 +201,12 @@ if ( NOT MILVUS_DB_PATH )
endif ()
set( GPU_ENABLE "false" )
install(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/dog_segment/
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/include
FILES_MATCHING PATTERN "*_c.h"
)
install(FILES ${CMAKE_BINARY_DIR}/src/dog_segment/libmilvus_dog_segment.so
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/lib)

View File

@ -40,8 +40,10 @@ PreDelete(CSegmentBase c_segment, long int size);
int
Search(CSegmentBase c_segment,
void* fake_query,
const char* query_json,
unsigned long timestamp,
float* query_raw_data,
int num_of_query_raw_data,
long int* result_ids,
float* result_distances);
@ -50,6 +52,9 @@ Search(CSegmentBase c_segment,
int
Close(CSegmentBase c_segment);
int
BuildIndex(CSegmentBase c_segment);
bool
IsOpened(CSegmentBase c_segment);

View File

@ -0,0 +1,59 @@
#pragma once
#include "AckResponder.h"
#include "SegmentDefs.h"
#include "knowhere//index/vector_index/IndexIVF.h"
#include <memory>
namespace milvus::dog_segment {
struct DeletedRecord {
struct TmpBitmap {
// Just for query
int64_t del_barrier = 0;
faiss::ConcurrentBitsetPtr bitmap_ptr;
std::shared_ptr<TmpBitmap> clone(int64_t capacity);
};
DeletedRecord() : lru_(std::make_shared<TmpBitmap>()) {
lru_->bitmap_ptr = std::make_shared<faiss::ConcurrentBitset>(0);
}
auto get_lru_entry() {
std::shared_lock lck(shared_mutex_);
return lru_;
}
void insert_lru_entry(std::shared_ptr<TmpBitmap> new_entry, bool force = false) {
std::lock_guard lck(shared_mutex_);
if (new_entry->del_barrier <= lru_->del_barrier) {
if (!force || new_entry->bitmap_ptr->capacity() <= lru_->bitmap_ptr->capacity()) {
// DO NOTHING
return;
}
}
lru_ = std::move(new_entry);
}
public:
std::atomic<int64_t> reserved = 0;
AckResponder ack_responder_;
ConcurrentVector<Timestamp, true> timestamps_;
ConcurrentVector<idx_t, true> uids_;
private:
std::shared_ptr<TmpBitmap> lru_;
std::shared_mutex shared_mutex_;
};
auto DeletedRecord::TmpBitmap::clone(int64_t capacity) -> std::shared_ptr<TmpBitmap> {
auto res = std::make_shared<TmpBitmap>();
res->del_barrier = this->del_barrier;
res->bitmap_ptr = std::make_shared<faiss::ConcurrentBitset>(capacity);
auto u8size = this->bitmap_ptr->u8size();
memcpy(res->bitmap_ptr->mutable_data(), res->bitmap_ptr->data(), u8size);
return res;
}
}

View File

@ -164,7 +164,9 @@ class Schema {
const FieldMeta&
operator[](const std::string& field_name) const {
auto offset_iter = offsets_.find(field_name);
assert(offset_iter != offsets_.end());
if (offset_iter == offsets_.end()) {
throw std::runtime_error("Cannot found field_name: " + field_name);
}
auto offset = offset_iter->second;
return (*this)[offset];
}
@ -180,5 +182,6 @@ class Schema {
};
using SchemaPtr = std::shared_ptr<Schema>;
using idx_t = int64_t;
} // namespace milvus::dog_segment

View File

@ -17,7 +17,6 @@ TestABI() {
std::unique_ptr<SegmentBase>
CreateSegment(SchemaPtr schema, IndexMetaPtr remote_index_meta) {
if (remote_index_meta == nullptr) {
auto index_meta = std::make_shared<IndexMeta>(schema);
auto dim = schema->operator[]("fakevec").get_dim();
@ -65,14 +64,20 @@ SegmentNaive::PreDelete(int64_t size) {
}
auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp,
int64_t insert_barrier) -> std::shared_ptr<DeletedRecord::TmpBitmap> {
int64_t insert_barrier, bool force) -> std::shared_ptr<DeletedRecord::TmpBitmap> {
auto old = deleted_record_.get_lru_entry();
if (old->del_barrier == del_barrier) {
return old;
}
auto current = std::make_shared<DeletedRecord::TmpBitmap>(*old);
auto &vec = current->bitmap;
if(!force || old->bitmap_ptr->capacity() == insert_barrier) {
if (old->del_barrier == del_barrier) {
return old;
}
}
auto current = old->clone(insert_barrier);
current->del_barrier = del_barrier;
auto bitmap = current->bitmap_ptr;
if (del_barrier < old->del_barrier) {
for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) {
// get uid in delete logs
@ -84,7 +89,7 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
for (auto iter = iter_b; iter != iter_e; ++iter) {
auto offset = iter->second;
if (record_.timestamps_[offset] < query_timestamp) {
assert(offset < vec.size());
assert(offset < insert_barrier);
the_offset = std::max(the_offset, offset);
}
}
@ -93,11 +98,10 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
continue;
}
// otherwise, clear the flag
vec[the_offset] = false;
bitmap->clear(the_offset);
}
return current;
} else {
vec.resize(insert_barrier);
for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) {
// get uid in delete logs
auto uid = deleted_record_.uids_[del_index];
@ -110,11 +114,11 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
if (offset >= insert_barrier) {
continue;
}
if (offset >= vec.size()) {
if (offset >= insert_barrier) {
continue;
}
if (record_.timestamps_[offset] < query_timestamp) {
assert(offset < vec.size());
assert(offset < insert_barrier);
the_offset = std::max(the_offset, offset);
}
}
@ -125,7 +129,7 @@ auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_times
}
// otherwise, set the flag
vec[the_offset] = true;
bitmap->set(the_offset);
}
this->deleted_record_.insert_lru_entry(current);
}
@ -250,15 +254,6 @@ SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t *uids_r
// return Status::OK();
}
// TODO: remove mock
Status
SegmentNaive::QueryImpl(const query::QueryPtr &query, Timestamp timestamp, QueryResult &result) {
// assert(query);
throw std::runtime_error("unimplemnted");
}
template<typename RecordType>
int64_t get_barrier(const RecordType &record, Timestamp timestamp) {
auto &vec = record.timestamps_;
@ -275,11 +270,70 @@ int64_t get_barrier(const RecordType &record, Timestamp timestamp) {
return beg;
}
Status
SegmentNaive::QueryImpl(query::QueryPtr query_info, Timestamp timestamp, QueryResult &result) {
auto ins_barrier = get_barrier(record_, timestamp);
auto del_barrier = get_barrier(deleted_record_, timestamp);
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier, true);
assert(bitmap_holder);
assert(bitmap_holder->bitmap_ptr->capacity() == ins_barrier);
auto field_offset = schema_->get_offset(query_info->field_name);
auto &field = schema_->operator[](query_info->field_name);
assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto bitmap = bitmap_holder->bitmap_ptr;
auto topK = query_info->topK;
auto num_queries = query_info->num_queries;
auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_[0]);
auto index_entry = index_meta_->lookup_by_field(query_info->field_name);
auto conf = index_entry.config;
conf[milvus::knowhere::meta::TOPK] = query_info->topK;
{
auto count = 0;
for(int i = 0; i < bitmap->capacity(); ++i) {
if(bitmap->test(i)) {
++count;
}
}
std::cout << "fuck " << count << std::endl;
}
auto indexing = std::static_pointer_cast<knowhere::VecIndex>(indexings_[index_entry.index_name]);
indexing->SetBlacklist(bitmap);
auto ds = knowhere::GenDataset(query_info->num_queries, dim, query_info->query_raw_data.data());
auto final = indexing->Query(ds, conf);
auto ids = final->Get<idx_t*>(knowhere::meta::IDS);
auto distances = final->Get<float*>(knowhere::meta::DISTANCE);
auto total_num = num_queries * topK;
result.result_ids_.resize(total_num);
result.result_distances_.resize(total_num);
result.row_num_ = total_num;
result.num_queries_ = num_queries;
result.topK_ = topK;
std::copy_n(ids, total_num, result.result_ids_.data());
std::copy_n(distances, total_num, result.result_distances_.data());
for(auto& id: result.result_ids_) {
id = record_.uids_[id];
}
return Status::OK();
}
Status
SegmentNaive::Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult &result) {
// TODO: enable delete
// TODO: enable index
// TODO: remove mock
if (query_info == nullptr) {
query_info = std::make_shared<query::Query>();
query_info->field_name = "fakevec";
@ -295,27 +349,22 @@ SegmentNaive::Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult
}
}
if(index_ready_) {
return QueryImpl(query_info, timestamp, result);
}
auto ins_barrier = get_barrier(record_, timestamp);
auto del_barrier = get_barrier(deleted_record_, timestamp);
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
assert(bitmap_holder);
auto &field = schema_->operator[](query_info->field_name);
assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto bitmap = bitmap_holder->bitmap_ptr;
auto topK = query_info->topK;
auto num_queries = query_info->num_queries;
auto barrier = get_barrier(record_, timestamp);
auto del_barrier = get_barrier(deleted_record_, timestamp);
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, barrier);
if (!bitmap_holder) {
throw std::runtime_error("fuck");
}
auto bitmap = &bitmap_holder->bitmap;
if (topK > barrier) {
topK = barrier;
}
auto get_L2_distance = [dim](const float *a, const float *b) {
float L2_distance = 0;
for (auto i = 0; i < dim; ++i) {
@ -325,11 +374,12 @@ SegmentNaive::Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult
return L2_distance;
};
std::vector<std::priority_queue<std::pair<float, int>>> records(num_queries);
// TODO: optimize
std::vector<std::priority_queue<std::pair<float, int>>> records(num_queries);
auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_[0]);
for (int64_t i = 0; i < barrier; ++i) {
if (i < bitmap->size() && bitmap->at(i)) {
for (int64_t i = 0; i < ins_barrier; ++i) {
if (i < bitmap->capacity() && bitmap->test(i)) {
continue;
}
auto element = vec_ptr->get_element(i);
@ -406,17 +456,16 @@ knowhere::IndexPtr SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry &entry
std::vector<knowhere::DatasetPtr> datasets;
for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) {
auto &uids_chunk = uids.get_chunk(chunk_id);
auto &entities_chunk = entities->get_chunk(chunk_id);
auto entities_chunk = entities->get_chunk(chunk_id).data();
int64_t count = chunk_id == uids.chunk_size() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk
: DefaultElementPerChunk;
datasets.push_back(knowhere::GenDatasetWithIds(count, dim, entities_chunk.data(), uids_chunk.data()));
datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk));
}
for (auto &ds: datasets) {
indexing->Train(ds, entry.config);
}
for (auto &ds: datasets) {
indexing->Add(ds, entry.config);
indexing->AddWithoutIds(ds, entry.config);
}
return indexing;
}
@ -435,6 +484,7 @@ SegmentNaive::BuildIndex() {
throw std::runtime_error("unimplemented");
}
}
index_ready_ = true;
return Status::OK();
}

View File

@ -1,4 +1,5 @@
#pragma once
#include <tbb/concurrent_priority_queue.h>
#include <tbb/concurrent_unordered_map.h>
#include <tbb/concurrent_vector.h>
@ -12,18 +13,19 @@
// #include "knowhere/index/structured_index/StructuredIndex.h"
#include "query/GeneralQuery.h"
#include "utils/Status.h"
using idx_t = int64_t;
#include "dog_segment/DeletedRecord.h"
namespace milvus::dog_segment {
struct ColumnBasedDataChunk {
std::vector<std::vector<float>> entity_vecs;
static ColumnBasedDataChunk
from(const DogDataChunk& source, const Schema& schema) {
from(const DogDataChunk &source, const Schema &schema) {
ColumnBasedDataChunk dest;
auto count = source.count;
auto raw_data = reinterpret_cast<const char*>(source.raw_data);
auto raw_data = reinterpret_cast<const char *>(source.raw_data);
auto align = source.sizeof_per_row;
for (auto& field : schema) {
for (auto &field : schema) {
auto len = field.get_sizeof();
assert(len % sizeof(float) == 0);
std::vector<float> new_col(len * count / sizeof(float));
@ -39,7 +41,7 @@ struct ColumnBasedDataChunk {
};
class SegmentNaive : public SegmentBase {
public:
public:
virtual ~SegmentNaive() = default;
// SegmentBase(std::shared_ptr<FieldsInfo> collection);
@ -49,17 +51,18 @@ class SegmentNaive : public SegmentBase {
// TODO: originally, id should be put into data_chunk
// TODO: Is it ok to put them the other side?
Status
Insert(int64_t reserverd_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) override;
Insert(int64_t reserverd_offset, int64_t size, const int64_t *primary_keys, const Timestamp *timestamps,
const DogDataChunk &values) override;
int64_t PreDelete(int64_t size) override;
// TODO: add id into delete log, possibly bitmap
Status
Delete(int64_t reserverd_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps) override;
Delete(int64_t reserverd_offset, int64_t size, const int64_t *primary_keys, const Timestamp *timestamps) override;
// query contains metadata of
Status
Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult& results) override;
Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult &results) override;
// stop receive insert requests
// will move data to immutable vector or something
@ -83,7 +86,7 @@ class SegmentNaive : public SegmentBase {
}
Status
LoadRawData(std::string_view field_name, const char* blob, int64_t blob_size) override {
LoadRawData(std::string_view field_name, const char *blob, int64_t blob_size) override {
// TODO: NO-OP
return Status::OK();
}
@ -93,10 +96,12 @@ public:
get_row_count() const override {
return record_.ack_responder_.GetAck();
}
SegmentState
get_state() const override {
return state_.load(std::memory_order_relaxed);
}
ssize_t
get_deleted_count() const override {
return 0;
@ -105,15 +110,17 @@ public:
public:
friend std::unique_ptr<SegmentBase>
CreateSegment(SchemaPtr schema, IndexMetaPtr index_meta);
explicit SegmentNaive(SchemaPtr schema, IndexMetaPtr index_meta)
: schema_(schema), index_meta_(index_meta), record_(*schema) {
}
private:
private:
struct MutableRecord {
ConcurrentVector<uint64_t> uids_;
tbb::concurrent_vector<Timestamp> timestamps_;
std::vector<tbb::concurrent_vector<float>> entity_vecs_;
MutableRecord(int entity_size) : entity_vecs_(entity_size) {
}
};
@ -124,58 +131,34 @@ public:
ConcurrentVector<Timestamp, true> timestamps_;
ConcurrentVector<idx_t, true> uids_;
std::vector<std::shared_ptr<VectorBase>> entity_vec_;
Record(const Schema& schema);
Record(const Schema &schema);
template<typename Type>
auto get_vec_entity(int offset) {
return std::static_pointer_cast<ConcurrentVector<Type>>(entity_vec_[offset]);
}
};
tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_;
struct DeletedRecord {
std::atomic<int64_t> reserved = 0;
AckResponder ack_responder_;
ConcurrentVector<Timestamp, true> timestamps_;
ConcurrentVector<idx_t, true> uids_;
struct TmpBitmap {
// Just for query
int64_t del_barrier = 0;
std::vector<bool> bitmap;
};
std::shared_ptr<TmpBitmap> lru_;
std::shared_mutex shared_mutex_;
DeletedRecord(): lru_(std::make_shared<TmpBitmap>()) {}
auto get_lru_entry() {
std::shared_lock lck(shared_mutex_);
return lru_;
}
void insert_lru_entry(std::shared_ptr<TmpBitmap> new_entry) {
std::lock_guard lck(shared_mutex_);
if(new_entry->del_barrier <= lru_->del_barrier) {
// DO NOTHING
return;
}
lru_ = std::move(new_entry);
}
};
std::shared_ptr<DeletedRecord::TmpBitmap> get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier);
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force = false);
Status
QueryImpl(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results);
QueryImpl(query::QueryPtr query, Timestamp timestamp, QueryResult &results);
template<typename Type>
knowhere::IndexPtr BuildVecIndexImpl(const IndexMeta::Entry& entry);
knowhere::IndexPtr BuildVecIndexImpl(const IndexMeta::Entry &entry);
private:
private:
SchemaPtr schema_;
std::atomic<SegmentState> state_ = SegmentState::Open;
Record record_;
DeletedRecord deleted_record_;
std::atomic<bool> index_ready_ = false;
IndexMetaPtr index_meta_;
std::unordered_map<std::string, knowhere::IndexPtr> indexings_; // index_name => indexing
tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_;
};
} // namespace milvus::dog_segment

View File

@ -91,14 +91,29 @@ PreDelete(CSegmentBase c_segment, long int size) {
int
Search(CSegmentBase c_segment,
void* fake_query,
const char* query_json,
unsigned long timestamp,
float* query_raw_data,
int num_of_query_raw_data,
long int* result_ids,
float* result_distances) {
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
milvus::dog_segment::QueryResult query_result;
auto res = segment->Query(nullptr, timestamp, query_result);
// parse query param json
auto query_param_json_string = std::string(query_json);
auto query_param_json = nlohmann::json::parse(query_param_json_string);
// construct QueryPtr
auto query_ptr = std::make_shared<milvus::query::Query>();
query_ptr->num_queries = query_param_json["num_queries"];
query_ptr->topK = query_param_json["topK"];
query_ptr->field_name = query_param_json["field_name"];
query_ptr->query_raw_data.resize(num_of_query_raw_data);
memcpy(query_ptr->query_raw_data.data(), query_raw_data, num_of_query_raw_data * sizeof(float));
auto res = segment->Query(query_ptr, timestamp, query_result);
// result_ids and result_distances have been allocated memory in goLang,
// so we don't need to malloc here.

View File

@ -40,8 +40,10 @@ PreDelete(CSegmentBase c_segment, long int size);
int
Search(CSegmentBase c_segment,
void* fake_query,
const char* query_json,
unsigned long timestamp,
float* query_raw_data,
int num_of_query_raw_data,
long int* result_ids,
float* result_distances);

View File

@ -108,7 +108,7 @@ class VecIndex : public Index {
size_t
BlacklistSize() {
if (bitset_) {
return bitset_->size() * sizeof(uint8_t);
return bitset_->u8size() * sizeof(uint8_t);
} else {
return 0;
}

View File

@ -197,7 +197,7 @@ ConcurrentBitset::capacity() {
}
size_t
ConcurrentBitset::size() {
ConcurrentBitset::u8size() {
return ((capacity_ + 8 - 1) >> 3);
}

View File

@ -63,15 +63,15 @@ class ConcurrentBitset {
size_t
capacity();
size_t
size();
const uint8_t*
data();
uint8_t*
mutable_data();
size_t
u8size();
private:
size_t capacity_;
std::vector<std::atomic<uint8_t>> bitset_;

View File

@ -7,178 +7,182 @@
#include "dog_segment/segment_c.h"
TEST(CApiTest, CollectionTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
DeleteCollection(collection);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
DeleteCollection(collection);
}
TEST(CApiTest, PartitonTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
DeleteCollection(collection);
DeletePartition(partition);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
DeleteCollection(collection);
DeletePartition(partition);
}
TEST(CApiTest, SegmentTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, InsertTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
int N = 10000;
std::default_random_engine e(67);
for(int i = 0; i < N; ++i) {
uids.push_back(100000 + i);
timestamps.push_back(0);
// append vec
float vec[16];
for(auto &x: vec) {
x = e() % 2000 * 0.001 - 1.0;
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
int N = 10000;
std::default_random_engine e(67);
for (int i = 0; i < N; ++i) {
uids.push_back(100000 + i);
timestamps.push_back(0);
// append vec
float vec[16];
for (auto &x: vec) {
x = e() % 2000 * 0.001 - 1.0;
}
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
int age = e() % 100;
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
}
raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
int age = e() % 100;
raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
}
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto offset = PreInsert(segment, N);
auto offset = PreInsert(segment, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
assert(res == 0);
assert(res == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, DeleteTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
long delete_primary_keys[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
long delete_primary_keys[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
auto offset = PreDelete(segment, 3);
auto offset = PreDelete(segment, 3);
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
assert(del_res == 0);
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
assert(del_res == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, SearchTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
int N = 10000;
std::default_random_engine e(67);
for(int i = 0; i < N; ++i) {
uids.push_back(100000 + i);
timestamps.push_back(0);
// append vec
float vec[16];
for(auto &x: vec) {
x = e() % 2000 * 0.001 - 1.0;
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
int N = 10000;
std::default_random_engine e(67);
for (int i = 0; i < N; ++i) {
uids.push_back(100000 + i);
timestamps.push_back(0);
// append vec
float vec[16];
for (auto &x: vec) {
x = e() % 2000 * 0.001 - 1.0;
}
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
int age = e() % 100;
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
}
raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
int age = e() % 100;
raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
}
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto offset = PreInsert(segment, N);
auto offset = PreInsert(segment, N);
auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
assert(ins_res == 0);
auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
assert(ins_res == 0);
long result_ids[10];
float result_distances[10];
auto sea_res = Search(segment, nullptr, 1, result_ids, result_distances);
assert(sea_res == 0);
long result_ids[10];
float result_distances[10];
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
auto query_json = std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})");
std::vector<float> query_raw_data(16);
for (int i = 0; i < 16; i++) {
query_raw_data[i] = e() % 2000 * 0.001 - 1.0;
}
auto sea_res = Search(segment, query_json.data(), 1, query_raw_data.data(), 16, result_ids, result_distances);
assert(sea_res == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, IsOpenedTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto is_opened = IsOpened(segment);
assert(is_opened);
auto is_opened = IsOpened(segment);
assert(is_opened);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, CloseTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto status = Close(segment);
assert(status == 0);
auto status = Close(segment);
assert(status == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
@ -190,17 +194,17 @@ auto generate_data(int N) {
std::default_random_engine er(42);
std::uniform_real_distribution<> distribution(0.0, 1.0);
std::default_random_engine ei(42);
for(int i = 0; i < N; ++i) {
for (int i = 0; i < N; ++i) {
uids.push_back(10 * N + i);
timestamps.push_back(0);
// append vec
float vec[16];
for(auto &x: vec) {
for (auto &x: vec) {
x = distribution(er);
}
raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
int age = ei() % 100;
raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
}
return std::make_tuple(raw_data, timestamps, uids);
}
@ -217,10 +221,10 @@ TEST(CApiTest, TestQuery) {
int N = 1000 * 1000;
auto [raw_data, timestamps, uids] = generate_data(N);
auto[raw_data, timestamps, uids] = generate_data(N);
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto offset = PreInsert(segment, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
assert(res == 0);
auto row_count = GetRowCount(segment);
@ -228,15 +232,18 @@ TEST(CApiTest, TestQuery) {
std::vector<long> result_ids(10);
std::vector<float> result_distances(10);
auto sea_res = Search(segment, nullptr, 1, result_ids.data(), result_distances.data());
auto query_json = std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})");
auto sea_res = Search(segment, query_json.data(), 1, (float *) raw_data.data(), 16, result_ids.data(),
result_distances.data());
ASSERT_EQ(sea_res, 0);
ASSERT_EQ(result_ids[0], 10 * N);
ASSERT_EQ(result_distances[0], 0);
std::vector<uint64_t> del_ts(N/2, 100);
auto pre_off = PreDelete(segment, N / 2);
Delete(segment, pre_off, N / 2, uids.data(), del_ts.data());
auto N_del = N / 2;
std::vector<uint64_t> del_ts(N_del, 100);
auto pre_off = PreDelete(segment, N_del);
Delete(segment, pre_off, N_del, uids.data(), del_ts.data());
Close(segment);
BuildIndex(segment);
@ -244,25 +251,37 @@ TEST(CApiTest, TestQuery) {
std::vector<long> result_ids2(10);
std::vector<float> result_distances2(10);
sea_res = Search(segment, nullptr, 104, result_ids2.data(), result_distances2.data());
for(auto x: result_ids2) {
ASSERT_GE(x, 10 * N + N / 2);
sea_res = Search(segment, query_json.data(), 104, (float *) raw_data.data(), 16, result_ids2.data(),
result_distances2.data());
// sea_res = Search(segment, nullptr, 104, result_ids2.data(), result_distances2.data());
std::cout << "case 1" << std::endl;
for (int i = 0; i < 10; ++i) {
std::cout << result_ids[i] << "->" << result_distances[i] << std::endl;
}
std::cout << "case 2" << std::endl;
for (int i = 0; i < 10; ++i) {
std::cout << result_ids2[i] << "->" << result_distances2[i] << std::endl;
}
for (auto x: result_ids2) {
ASSERT_GE(x, 10 * N + N_del);
ASSERT_LT(x, 10 * N + N);
}
auto iter = 0;
for(int i = 0; i < result_ids.size(); ++i) {
auto uid = result_ids[i];
auto dis = result_distances[i];
if(uid >= 10 * N + N / 2) {
auto uid2 = result_ids2[iter];
auto dis2 = result_distances2[iter];
ASSERT_EQ(uid, uid2);
ASSERT_EQ(dis, dis2);
++iter;
}
}
// auto iter = 0;
// for(int i = 0; i < result_ids.size(); ++i) {
// auto uid = result_ids[i];
// auto dis = result_distances[i];
// if(uid >= 10 * N + N_del) {
// auto uid2 = result_ids2[iter];
// auto dis2 = result_distances2[iter];
// ASSERT_EQ(uid, uid2);
// ASSERT_EQ(dis, dis2);
// ++iter;
// }
// }
DeleteCollection(collection);
@ -271,28 +290,28 @@ TEST(CApiTest, TestQuery) {
}
TEST(CApiTest, GetDeletedCountTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
long delete_primary_keys[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
long delete_primary_keys[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
auto offset = PreDelete(segment, 3);
auto offset = PreDelete(segment, 3);
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
assert(del_res == 0);
auto del_res = Delete(segment, offset, 3, delete_primary_keys, delete_timestamps);
assert(del_res == 0);
// TODO: assert(deleted_count == len(delete_primary_keys))
auto deleted_count = GetDeletedCount(segment);
assert(deleted_count == 0);
// TODO: assert(deleted_count == len(delete_primary_keys))
auto deleted_count = GetDeletedCount(segment);
assert(deleted_count == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
@ -306,10 +325,10 @@ TEST(CApiTest, GetRowCountTest) {
int N = 10000;
auto [raw_data, timestamps, uids] = generate_data(N);
auto[raw_data, timestamps, uids] = generate_data(N);
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto offset = PreInsert(segment, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int)line_sizeof, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
assert(res == 0);
auto row_count = GetRowCount(segment);

View File

@ -26,29 +26,30 @@ using std::vector;
using namespace milvus;
namespace {
template<int DIM>
auto generate_data(int N) {
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
std::default_random_engine er(42);
std::uniform_real_distribution<> distribution(0.0, 1.0);
std::default_random_engine ei(42);
for (int i = 0; i < N; ++i) {
uids.push_back(10 * N + i);
timestamps.push_back(0);
// append vec
float vec[DIM];
for (auto &x: vec) {
x = distribution(er);
}
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
template<int DIM>
auto generate_data(int N) {
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
std::default_random_engine er(42);
std::uniform_real_distribution<> distribution(0.0, 1.0);
std::default_random_engine ei(42);
for (int i = 0; i < N; ++i) {
uids.push_back(10 * N + i);
timestamps.push_back(0);
// append vec
float vec[DIM];
for (auto &x: vec) {
x = distribution(er);
}
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
// int age = ei() % 100;
// raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
}
return std::make_tuple(raw_data, timestamps, uids);
}
return std::make_tuple(raw_data, timestamps, uids);
}
}
TEST(TestIndex, Naive) {
constexpr int N = 100000;
@ -69,25 +70,59 @@ TEST(TestIndex, Naive) {
{milvus::knowhere::meta::DEVICEID, 0},
};
auto ds = knowhere::GenDatasetWithIds(N / 2, DIM, raw_data.data(), uids.data());
auto ds2 = knowhere::GenDatasetWithIds(N / 2, DIM, raw_data.data() + sizeof(float[DIM]) * N / 2, uids.data() + N / 2);
// auto ds = knowhere::GenDataset(N, DIM, raw_data.data());
// auto ds2 = knowhere::GenDatasetWithIds(N / 2, DIM, raw_data.data() + sizeof(float[DIM]) * N / 2, uids.data() + N / 2);
// NOTE: you must train first and then add
index->Train(ds, conf);
index->Train(ds2, conf);
index->Add(ds, conf);
index->Add(ds2, conf);
// index->Train(ds, conf);
// index->Train(ds2, conf);
// index->AddWithoutIds(ds, conf);
// index->Add(ds2, conf);
auto query_ds = knowhere::GenDataset(1, DIM, raw_data.data());
auto final = index->Query(query_ds, conf);
auto mmm = final->data();
cout << endl;
for(auto [k, v]: mmm) {
cout << k << endl;
std::vector<knowhere::DatasetPtr> datasets;
std::vector<std::vector<float>> ftrashs;
for (int beg = 0; beg < N; beg += N) {
auto end = beg + N;
if (end > N) {
end = N;
}
std::vector<float> ft(raw_data.data() + DIM * beg, raw_data.data() + DIM * end);
auto ds = knowhere::GenDataset(end - beg, DIM, ft.data());
datasets.push_back(ds);
ftrashs.push_back(std::move(ft));
// // NOTE: you must train first and then add
// index->Train(ds, conf);
// index->Add(ds, conf);
}
auto ids = final->Get<idx_t*>(knowhere::meta::IDS);
auto distances = final->Get<float*>(knowhere::meta::DISTANCE);
for(int i = 0; i < TOPK; ++i) {
for (auto &ds: datasets) {
index->Train(ds, conf);
}
for (auto &ds: datasets) {
index->AddWithoutIds(ds, conf);
}
auto bitmap = std::make_shared<faiss::ConcurrentBitset>(N);
// exclude the first
for (int i = 0; i < N / 2; ++i) {
bitmap->set(i);
}
index->SetBlacklist(bitmap);
auto query_ds = knowhere::GenDataset(1, DIM, raw_data.data());
auto final = index->Query(query_ds, conf);
auto ids = final->Get<idx_t *>(knowhere::meta::IDS);
auto distances = final->Get<float *>(knowhere::meta::DISTANCE);
for (int i = 0; i < TOPK; ++i) {
if (ids[i] < N / 2) {
cout << "WRONG: ";
}
cout << ids[i] << "->" << distances[i] << endl;
}
int i = 1+1;
int i = 1 + 1;
}

View File

@ -25,13 +25,15 @@ add_subdirectory( db ) # target milvus_engine
add_subdirectory( log )
add_subdirectory( server )
add_subdirectory( message_client )
add_subdirectory( meta )
set(link_lib
milvus_engine
config
query
query
utils
log
meta
)

View File

@ -85,6 +85,11 @@ ConfigMgr::ConfigMgr() {
"localhost", nullptr, nullptr)},
{"pulsar.port", CreateIntegerConfig("pulsar.port", false, 0, 65535, &config.pulsar.port.value,
6650, nullptr, nullptr)},
/* master */
{"master.address", CreateStringConfig("master.address", false, &config.master.address.value,
"localhost", nullptr, nullptr)},
{"master.port", CreateIntegerConfig("master.port", false, 0, 65535, &config.master.port.value,
6000, nullptr, nullptr)},
/* log */

View File

@ -76,6 +76,11 @@ struct ServerConfig {
Integer port{6650};
}pulsar;
struct Master{
String address{"localhost"};
Integer port{6000};
}master;
struct Engine {
Integer build_index_threshold{4096};

View File

@ -0,0 +1,70 @@
// Generated by the gRPC C++ plugin.
// If you make any local change, they will be lost.
// source: etcd.proto
#include "etcd.pb.h"
#include "etcd.grpc.pb.h"
#include <functional>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/client_unary_call.h>
#include <grpcpp/impl/codegen/client_callback.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include <grpcpp/impl/codegen/rpc_service_method.h>
#include <grpcpp/impl/codegen/server_callback.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/impl/codegen/sync_stream.h>
namespace etcdserverpb {
static const char* Watch_method_names[] = {
"/etcdserverpb.Watch/Watch",
};
std::unique_ptr< Watch::Stub> Watch::NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options) {
(void)options;
std::unique_ptr< Watch::Stub> stub(new Watch::Stub(channel));
return stub;
}
Watch::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel)
: channel_(channel), rpcmethod_Watch_(Watch_method_names[0], ::grpc::internal::RpcMethod::BIDI_STREAMING, channel)
{}
::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch::Stub::WatchRaw(::grpc::ClientContext* context) {
return ::grpc_impl::internal::ClientReaderWriterFactory< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>::Create(channel_.get(), rpcmethod_Watch_, context);
}
void Watch::Stub::experimental_async::Watch(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>* reactor) {
::grpc_impl::internal::ClientCallbackReaderWriterFactory< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>::Create(stub_->channel_.get(), stub_->rpcmethod_Watch_, context, reactor);
}
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch::Stub::AsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
return ::grpc_impl::internal::ClientAsyncReaderWriterFactory< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>::Create(channel_.get(), cq, rpcmethod_Watch_, context, true, tag);
}
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch::Stub::PrepareAsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncReaderWriterFactory< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>::Create(channel_.get(), cq, rpcmethod_Watch_, context, false, nullptr);
}
Watch::Service::Service() {
AddMethod(new ::grpc::internal::RpcServiceMethod(
Watch_method_names[0],
::grpc::internal::RpcMethod::BIDI_STREAMING,
new ::grpc::internal::BidiStreamingHandler< Watch::Service, ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>(
std::mem_fn(&Watch::Service::Watch), this)));
}
Watch::Service::~Service() {
}
::grpc::Status Watch::Service::Watch(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* stream) {
(void) context;
(void) stream;
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
} // namespace etcdserverpb

View File

@ -0,0 +1,235 @@
// Generated by the gRPC C++ plugin.
// If you make any local change, they will be lost.
// source: etcd.proto
#ifndef GRPC_etcd_2eproto__INCLUDED
#define GRPC_etcd_2eproto__INCLUDED
#include "etcd.pb.h"
#include <functional>
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/client_callback.h>
#include <grpcpp/impl/codegen/client_context.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/server_callback.h>
#include <grpcpp/impl/codegen/server_context.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/stub_options.h>
#include <grpcpp/impl/codegen/sync_stream.h>
namespace grpc_impl {
class CompletionQueue;
class ServerCompletionQueue;
class ServerContext;
} // namespace grpc_impl
namespace grpc {
namespace experimental {
template <typename RequestT, typename ResponseT>
class MessageAllocator;
} // namespace experimental
} // namespace grpc
namespace etcdserverpb {
class Watch final {
public:
static constexpr char const* service_full_name() {
return "etcdserverpb.Watch";
}
class StubInterface {
public:
virtual ~StubInterface() {}
// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
// for several watches at once. The entire event history can be watched starting from the
// last compaction revision.
std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> Watch(::grpc::ClientContext* context) {
return std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(WatchRaw(context));
}
std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> AsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(AsyncWatchRaw(context, cq, tag));
}
std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> PrepareAsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(PrepareAsyncWatchRaw(context, cq));
}
class experimental_async_interface {
public:
virtual ~experimental_async_interface() {}
// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
// for several watches at once. The entire event history can be watched starting from the
// last compaction revision.
virtual void Watch(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>* reactor) = 0;
};
virtual class experimental_async_interface* experimental_async() { return nullptr; }
private:
virtual ::grpc::ClientReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* WatchRaw(::grpc::ClientContext* context) = 0;
virtual ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* AsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) = 0;
virtual ::grpc::ClientAsyncReaderWriterInterface< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* PrepareAsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) = 0;
};
class Stub final : public StubInterface {
public:
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
std::unique_ptr< ::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> Watch(::grpc::ClientContext* context) {
return std::unique_ptr< ::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(WatchRaw(context));
}
std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> AsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(AsyncWatchRaw(context, cq, tag));
}
std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>> PrepareAsyncWatch(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>>(PrepareAsyncWatchRaw(context, cq));
}
class experimental_async final :
public StubInterface::experimental_async_interface {
public:
void Watch(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::etcdserverpb::WatchRequest,::etcdserverpb::WatchResponse>* reactor) override;
private:
friend class Stub;
explicit experimental_async(Stub* stub): stub_(stub) { }
Stub* stub() { return stub_; }
Stub* stub_;
};
class experimental_async_interface* experimental_async() override { return &async_stub_; }
private:
std::shared_ptr< ::grpc::ChannelInterface> channel_;
class experimental_async async_stub_{this};
::grpc::ClientReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* WatchRaw(::grpc::ClientContext* context) override;
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* AsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override;
::grpc::ClientAsyncReaderWriter< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* PrepareAsyncWatchRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override;
const ::grpc::internal::RpcMethod rpcmethod_Watch_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
class Service : public ::grpc::Service {
public:
Service();
virtual ~Service();
// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
// for several watches at once. The entire event history can be watched starting from the
// last compaction revision.
virtual ::grpc::Status Watch(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* stream);
};
template <class BaseClass>
class WithAsyncMethod_Watch : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithAsyncMethod_Watch() {
::grpc::Service::MarkMethodAsync(0);
}
~WithAsyncMethod_Watch() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
void RequestWatch(::grpc::ServerContext* context, ::grpc::ServerAsyncReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* stream, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
::grpc::Service::RequestAsyncBidiStreaming(0, context, stream, new_call_cq, notification_cq, tag);
}
};
typedef WithAsyncMethod_Watch<Service > AsyncService;
template <class BaseClass>
class ExperimentalWithCallbackMethod_Watch : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
ExperimentalWithCallbackMethod_Watch() {
::grpc::Service::experimental().MarkMethodCallback(0,
new ::grpc_impl::internal::CallbackBidiHandler< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>(
[this] { return this->Watch(); }));
}
~ExperimentalWithCallbackMethod_Watch() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
virtual ::grpc::experimental::ServerBidiReactor< ::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>* Watch() {
return new ::grpc_impl::internal::UnimplementedBidiReactor<
::etcdserverpb::WatchRequest, ::etcdserverpb::WatchResponse>;}
};
typedef ExperimentalWithCallbackMethod_Watch<Service > ExperimentalCallbackService;
template <class BaseClass>
class WithGenericMethod_Watch : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithGenericMethod_Watch() {
::grpc::Service::MarkMethodGeneric(0);
}
~WithGenericMethod_Watch() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
};
template <class BaseClass>
class WithRawMethod_Watch : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithRawMethod_Watch() {
::grpc::Service::MarkMethodRaw(0);
}
~WithRawMethod_Watch() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
void RequestWatch(::grpc::ServerContext* context, ::grpc::ServerAsyncReaderWriter< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* stream, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
::grpc::Service::RequestAsyncBidiStreaming(0, context, stream, new_call_cq, notification_cq, tag);
}
};
template <class BaseClass>
class ExperimentalWithRawCallbackMethod_Watch : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
ExperimentalWithRawCallbackMethod_Watch() {
::grpc::Service::experimental().MarkMethodRawCallback(0,
new ::grpc_impl::internal::CallbackBidiHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
[this] { return this->Watch(); }));
}
~ExperimentalWithRawCallbackMethod_Watch() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Watch(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::etcdserverpb::WatchResponse, ::etcdserverpb::WatchRequest>* /*stream*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
virtual ::grpc::experimental::ServerBidiReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* Watch() {
return new ::grpc_impl::internal::UnimplementedBidiReactor<
::grpc::ByteBuffer, ::grpc::ByteBuffer>;}
};
typedef Service StreamedUnaryService;
typedef Service SplitStreamedService;
typedef Service StreamedService;
};
} // namespace etcdserverpb
#endif // GRPC_etcd_2eproto__INCLUDED

3737
proxy/src/grpc/etcd.pb.cc Normal file

File diff suppressed because it is too large Load Diff

2465
proxy/src/grpc/etcd.pb.h Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,83 @@
// Generated by the gRPC C++ plugin.
// If you make any local change, they will be lost.
// source: master.proto
#include "master.pb.h"
#include "master.grpc.pb.h"
#include <functional>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/client_unary_call.h>
#include <grpcpp/impl/codegen/client_callback.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include <grpcpp/impl/codegen/rpc_service_method.h>
#include <grpcpp/impl/codegen/server_callback.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/impl/codegen/sync_stream.h>
namespace masterpb {
static const char* Master_method_names[] = {
"/masterpb.Master/CreateCollection",
};
std::unique_ptr< Master::Stub> Master::NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options) {
(void)options;
std::unique_ptr< Master::Stub> stub(new Master::Stub(channel));
return stub;
}
Master::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel)
: channel_(channel), rpcmethod_CreateCollection_(Master_method_names[0], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
{}
::grpc::Status Master::Stub::CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::milvus::grpc::Status* response) {
return ::grpc::internal::BlockingUnaryCall(channel_.get(), rpcmethod_CreateCollection_, context, request, response);
}
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, std::move(f));
}
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, std::move(f));
}
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, reactor);
}
void Master::Stub::experimental_async::CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_CreateCollection_, context, request, response, reactor);
}
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* Master::Stub::AsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::Status>::Create(channel_.get(), cq, rpcmethod_CreateCollection_, context, request, true);
}
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* Master::Stub::PrepareAsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::Status>::Create(channel_.get(), cq, rpcmethod_CreateCollection_, context, request, false);
}
Master::Service::Service() {
AddMethod(new ::grpc::internal::RpcServiceMethod(
Master_method_names[0],
::grpc::internal::RpcMethod::NORMAL_RPC,
new ::grpc::internal::RpcMethodHandler< Master::Service, ::milvus::grpc::Mapping, ::milvus::grpc::Status>(
std::mem_fn(&Master::Service::CreateCollection), this)));
}
Master::Service::~Service() {
}
::grpc::Status Master::Service::CreateCollection(::grpc::ServerContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response) {
(void) context;
(void) request;
(void) response;
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
} // namespace masterpb

View File

@ -0,0 +1,252 @@
// Generated by the gRPC C++ plugin.
// If you make any local change, they will be lost.
// source: master.proto
#ifndef GRPC_master_2eproto__INCLUDED
#define GRPC_master_2eproto__INCLUDED
#include "master.pb.h"
#include <functional>
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/client_callback.h>
#include <grpcpp/impl/codegen/client_context.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/server_callback.h>
#include <grpcpp/impl/codegen/server_context.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/stub_options.h>
#include <grpcpp/impl/codegen/sync_stream.h>
namespace grpc_impl {
class CompletionQueue;
class ServerCompletionQueue;
class ServerContext;
} // namespace grpc_impl
namespace grpc {
namespace experimental {
template <typename RequestT, typename ResponseT>
class MessageAllocator;
} // namespace experimental
} // namespace grpc
namespace masterpb {
class Master final {
public:
static constexpr char const* service_full_name() {
return "masterpb.Master";
}
class StubInterface {
public:
virtual ~StubInterface() {}
virtual ::grpc::Status CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::milvus::grpc::Status* response) = 0;
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>> AsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>>(AsyncCreateCollectionRaw(context, request, cq));
}
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>> PrepareAsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>>(PrepareAsyncCreateCollectionRaw(context, request, cq));
}
class experimental_async_interface {
public:
virtual ~experimental_async_interface() {}
virtual void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) = 0;
virtual void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) = 0;
virtual void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
virtual void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
};
virtual class experimental_async_interface* experimental_async() { return nullptr; }
private:
virtual ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>* AsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientAsyncResponseReaderInterface< ::milvus::grpc::Status>* PrepareAsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) = 0;
};
class Stub final : public StubInterface {
public:
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
::grpc::Status CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::milvus::grpc::Status* response) override;
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>> AsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>>(AsyncCreateCollectionRaw(context, request, cq));
}
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>> PrepareAsyncCreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>>(PrepareAsyncCreateCollectionRaw(context, request, cq));
}
class experimental_async final :
public StubInterface::experimental_async_interface {
public:
void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) override;
void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, std::function<void(::grpc::Status)>) override;
void CreateCollection(::grpc::ClientContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
void CreateCollection(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::Status* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
private:
friend class Stub;
explicit experimental_async(Stub* stub): stub_(stub) { }
Stub* stub() { return stub_; }
Stub* stub_;
};
class experimental_async_interface* experimental_async() override { return &async_stub_; }
private:
std::shared_ptr< ::grpc::ChannelInterface> channel_;
class experimental_async async_stub_{this};
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* AsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientAsyncResponseReader< ::milvus::grpc::Status>* PrepareAsyncCreateCollectionRaw(::grpc::ClientContext* context, const ::milvus::grpc::Mapping& request, ::grpc::CompletionQueue* cq) override;
const ::grpc::internal::RpcMethod rpcmethod_CreateCollection_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
class Service : public ::grpc::Service {
public:
Service();
virtual ~Service();
virtual ::grpc::Status CreateCollection(::grpc::ServerContext* context, const ::milvus::grpc::Mapping* request, ::milvus::grpc::Status* response);
};
template <class BaseClass>
class WithAsyncMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithAsyncMethod_CreateCollection() {
::grpc::Service::MarkMethodAsync(0);
}
~WithAsyncMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
void RequestCreateCollection(::grpc::ServerContext* context, ::milvus::grpc::Mapping* request, ::grpc::ServerAsyncResponseWriter< ::milvus::grpc::Status>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag);
}
};
typedef WithAsyncMethod_CreateCollection<Service > AsyncService;
template <class BaseClass>
class ExperimentalWithCallbackMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
ExperimentalWithCallbackMethod_CreateCollection() {
::grpc::Service::experimental().MarkMethodCallback(0,
new ::grpc_impl::internal::CallbackUnaryHandler< ::milvus::grpc::Mapping, ::milvus::grpc::Status>(
[this](::grpc::ServerContext* context,
const ::milvus::grpc::Mapping* request,
::milvus::grpc::Status* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
return this->CreateCollection(context, request, response, controller);
}));
}
void SetMessageAllocatorFor_CreateCollection(
::grpc::experimental::MessageAllocator< ::milvus::grpc::Mapping, ::milvus::grpc::Status>* allocator) {
static_cast<::grpc_impl::internal::CallbackUnaryHandler< ::milvus::grpc::Mapping, ::milvus::grpc::Status>*>(
::grpc::Service::experimental().GetHandler(0))
->SetMessageAllocator(allocator);
}
~ExperimentalWithCallbackMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
virtual void CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); }
};
typedef ExperimentalWithCallbackMethod_CreateCollection<Service > ExperimentalCallbackService;
template <class BaseClass>
class WithGenericMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithGenericMethod_CreateCollection() {
::grpc::Service::MarkMethodGeneric(0);
}
~WithGenericMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
};
template <class BaseClass>
class WithRawMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithRawMethod_CreateCollection() {
::grpc::Service::MarkMethodRaw(0);
}
~WithRawMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
void RequestCreateCollection(::grpc::ServerContext* context, ::grpc::ByteBuffer* request, ::grpc::ServerAsyncResponseWriter< ::grpc::ByteBuffer>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag);
}
};
template <class BaseClass>
class ExperimentalWithRawCallbackMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
ExperimentalWithRawCallbackMethod_CreateCollection() {
::grpc::Service::experimental().MarkMethodRawCallback(0,
new ::grpc_impl::internal::CallbackUnaryHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
[this](::grpc::ServerContext* context,
const ::grpc::ByteBuffer* request,
::grpc::ByteBuffer* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
this->CreateCollection(context, request, response, controller);
}));
}
~ExperimentalWithRawCallbackMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
virtual void CreateCollection(::grpc::ServerContext* /*context*/, const ::grpc::ByteBuffer* /*request*/, ::grpc::ByteBuffer* /*response*/, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); }
};
template <class BaseClass>
class WithStreamedUnaryMethod_CreateCollection : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithStreamedUnaryMethod_CreateCollection() {
::grpc::Service::MarkMethodStreamed(0,
new ::grpc::internal::StreamedUnaryHandler< ::milvus::grpc::Mapping, ::milvus::grpc::Status>(std::bind(&WithStreamedUnaryMethod_CreateCollection<BaseClass>::StreamedCreateCollection, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithStreamedUnaryMethod_CreateCollection() override {
BaseClassMustBeDerivedFromService(this);
}
// disable regular version of this method
::grpc::Status CreateCollection(::grpc::ServerContext* /*context*/, const ::milvus::grpc::Mapping* /*request*/, ::milvus::grpc::Status* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
// replace default version of method with streamed unary
virtual ::grpc::Status StreamedCreateCollection(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< ::milvus::grpc::Mapping,::milvus::grpc::Status>* server_unary_streamer) = 0;
};
typedef WithStreamedUnaryMethod_CreateCollection<Service > StreamedUnaryService;
typedef Service SplitStreamedService;
typedef WithStreamedUnaryMethod_CreateCollection<Service > StreamedService;
};
} // namespace masterpb
#endif // GRPC_master_2eproto__INCLUDED

1590
proxy/src/grpc/master.pb.cc Normal file

File diff suppressed because it is too large Load Diff

1024
proxy/src/grpc/master.pb.h Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,164 @@
syntax = "proto3";
package etcdserverpb;
service Watch {
// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
// for several watches at once. The entire event history can be watched starting from the
// last compaction revision.
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {
}
}
message WatchRequest {
// request_union is a request to either create a new watcher or cancel an existing watcher.
oneof request_union {
WatchCreateRequest create_request = 1;
WatchCancelRequest cancel_request = 2;
WatchProgressRequest progress_request = 3;
}
}
message WatchCreateRequest {
// key is the key to register for watching.
bytes key = 1;
// range_end is the end of the range [key, range_end) to watch. If range_end is not given,
// only the key argument is watched. If range_end is equal to '\0', all keys greater than
// or equal to the key argument are watched.
// If the range_end is one bit larger than the given key,
// then all keys with the prefix (the given key) will be watched.
bytes range_end = 2;
// start_revision is an optional revision to watch from (inclusive). No start_revision is "now".
int64 start_revision = 3;
// progress_notify is set so that the etcd server will periodically send a WatchResponse with
// no events to the new watcher if there are no recent events. It is useful when clients
// wish to recover a disconnected watcher starting from a recent known revision.
// The etcd server may decide how often it will send notifications based on current load.
bool progress_notify = 4;
enum FilterType {
// filter out put event.
NOPUT = 0;
// filter out delete event.
NODELETE = 1;
}
// filters filter the events at server side before it sends back to the watcher.
repeated FilterType filters = 5;
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;
// If watch_id is provided and non-zero, it will be assigned to this watcher.
// Since creating a watcher in etcd is not a synchronous operation,
// this can be used ensure that ordering is correct when creating multiple
// watchers on the same stream. Creating a watcher with an ID already in
// use on the stream will cause an error to be returned.
int64 watch_id = 7;
// fragment enables splitting large revisions into multiple watch responses.
bool fragment = 8;
}
message WatchCancelRequest {
// watch_id is the watcher id to cancel so that no more events are transmitted.
int64 watch_id = 1;
}
// Requests the a watch stream progress status be sent in the watch response stream as soon as
// possible.
message WatchProgressRequest {
}
message WatchResponse {
ResponseHeader header = 1;
// watch_id is the ID of the watcher that corresponds to the response.
int64 watch_id = 2;
// created is set to true if the response is for a create watch request.
// The client should record the watch_id and expect to receive events for
// the created watcher from the same stream.
// All events sent to the created watcher will attach with the same watch_id.
bool created = 3;
// canceled is set to true if the response is for a cancel watch request.
// No further events will be sent to the canceled watcher.
bool canceled = 4;
// compact_revision is set to the minimum index if a watcher tries to watch
// at a compacted index.
//
// This happens when creating a watcher at a compacted revision or the watcher cannot
// catch up with the progress of the key-value store.
//
// The client should treat the watcher as canceled and should not try to create any
// watcher with the same start_revision again.
int64 compact_revision = 5;
// cancel_reason indicates the reason for canceling the watcher.
string cancel_reason = 6;
// framgment is true if large watch response was split over multiple responses.
bool fragment = 7;
repeated Event events = 11;
}
message ResponseHeader {
// cluster_id is the ID of the cluster which sent the response.
uint64 cluster_id = 1;
// member_id is the ID of the member which sent the response.
uint64 member_id = 2;
// revision is the key-value store revision when the request was applied.
// For watch progress responses, the header.revision indicates progress. All future events
// recieved in this stream are guaranteed to have a higher revision number than the
// header.revision number.
int64 revision = 3;
// raft_term is the raft term when the request was applied.
uint64 raft_term = 4;
}
message KeyValue {
// key is the key in bytes. An empty key is not allowed.
bytes key = 1;
// create_revision is the revision of last creation on this key.
int64 create_revision = 2;
// mod_revision is the revision of last modification on this key.
int64 mod_revision = 3;
// version is the version of the key. A deletion resets
// the version to zero and any modification of the key
// increases its version.
int64 version = 4;
// value is the value held by the key, in bytes.
bytes value = 5;
// lease is the ID of the lease that attached to key.
// When the attached lease expires, the key will be deleted.
// If lease is 0, then no lease is attached to the key.
int64 lease = 6;
}
message Event {
enum EventType {
PUT = 0;
DELETE = 1;
}
// type is the kind of event. If type is a PUT, it indicates
// new data has been stored to the key. If type is a DELETE,
// it indicates the key was deleted.
EventType type = 1;
// kv holds the KeyValue for the event.
// A PUT event contains current kv pair.
// A PUT event with kv.Version=1 indicates the creation of a key.
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
KeyValue kv = 2;
// prev_kv holds the key-value pair before the event happens.
KeyValue prev_kv = 3;
}

View File

@ -175,7 +175,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, uin
}
}
for (auto &stat : stats) {
if (stat == pulsar::ResultOk) {
if (stat != pulsar::ResultOk) {
return Status(DB_ERROR, pulsar::strResult(stat));
}
}
@ -201,7 +201,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
}
}
for (auto &stat : stats) {
if (stat == pulsar::ResultOk) {
if (stat != pulsar::ResultOk) {
return Status(DB_ERROR, pulsar::strResult(stat));
}
}

View File

@ -30,6 +30,7 @@ Result MsgConsumer::receive(milvus::grpc::QueryResult &res) {
if (result == pulsar::ResultOk) {
res.ParseFromString(msg.getDataAsString());
}
consumer_.acknowledge(msg);
return result;
}
@ -39,6 +40,7 @@ Result MsgConsumer::receive(milvus::grpc::Entities &res) {
if (result == pulsar::ResultOk) {
res.ParseFromString(msg.getDataAsString());
}
consumer_.acknowledge(msg);
return result;
}
@ -48,6 +50,7 @@ Result MsgConsumer::receive(milvus::grpc::EntityIds &res) {
if (result == pulsar::ResultOk) {
res.ParseFromString(msg.getDataAsString());
}
consumer_.acknowledge(msg);
return result;
}
@ -57,6 +60,7 @@ Result MsgConsumer::receive(milvus::grpc::Status &res) {
if (result == pulsar::ResultOk) {
res.ParseFromString(msg.getDataAsString());
}
consumer_.acknowledge(msg);
return result;
}

View File

@ -0,0 +1,12 @@
include_directories(${PROJECT_BINARY_DIR}/thirdparty/grpc/grpc-src/third_party/protobuf/src)
include_directories(${PROJECT_BINARY_DIR}/thirdparty/grpc/grpc-src/include)
add_subdirectory( etcd_watcher )
aux_source_directory( ./master master_src)
add_library(meta ${master_src}
./etcd_watcher/Watcher.cpp
${PROJECT_SOURCE_DIR}/src/grpc/etcd.pb.cc
${PROJECT_SOURCE_DIR}/src/grpc/etcd.grpc.pb.cc
${PROJECT_SOURCE_DIR}/src/grpc/master.pb.cc
${PROJECT_SOURCE_DIR}/src/grpc/master.grpc.pb.cc
)

View File

@ -0,0 +1,14 @@
AUX_SOURCE_DIRECTORY(. watcher_src)
add_executable(test_watcher
${watcher_src}
${PROJECT_SOURCE_DIR}/src/grpc/etcd.pb.cc
${PROJECT_SOURCE_DIR}/src/grpc/etcd.grpc.pb.cc
)
target_link_libraries(
test_watcher
PRIVATE
libprotobuf
grpc++_reflection
grpc++
)

View File

@ -0,0 +1,90 @@
#include "Watcher.h"
#include <memory>
#include <utility>
#include "grpc/etcd.grpc.pb.h"
namespace milvus {
namespace master {
Watcher::Watcher(const std::string &address,
const std::string &key,
std::function<void(etcdserverpb::WatchResponse)> callback,
bool with_prefix) {
auto channel = grpc::CreateChannel(address, grpc::InsecureChannelCredentials());
stub_ = etcdserverpb::Watch::NewStub(channel);
call_ = std::make_unique<AsyncWatchAction>(key, with_prefix, stub_.get());
work_thread_ = std::thread([&]() {
call_->WaitForResponse(callback);
});
}
void Watcher::Cancel() {
call_->CancelWatch();
}
Watcher::~Watcher() {
Cancel();
work_thread_.join();
}
AsyncWatchAction::AsyncWatchAction(const std::string &key, bool with_prefix, etcdserverpb::Watch::Stub *stub) {
// tag `1` means to wire a rpc
stream_ = stub->AsyncWatch(&context_, &cq_, (void *) 1);
etcdserverpb::WatchRequest req;
req.mutable_create_request()->set_key(key);
if (with_prefix) {
std::string range_end(key);
int ascii = (int) range_end[range_end.length() - 1];
range_end.back() = ascii + 1;
req.mutable_create_request()->set_range_end(range_end);
}
void *got_tag;
bool ok = false;
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *) 1) {
// tag `2` means write watch request to stream
stream_->Write(req, (void *) 2);
} else {
throw std::runtime_error("failed to create a watch connection");
}
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *) 2) {
stream_->Read(&reply_, (void *) this);
} else {
throw std::runtime_error("failed to write WatchCreateRequest to server");
}
}
void AsyncWatchAction::WaitForResponse(std::function<void(etcdserverpb::WatchResponse)> callback) {
void *got_tag;
bool ok = false;
while (cq_.Next(&got_tag, &ok)) {
if (!ok) {
break;
}
if (got_tag == (void *) 3) {
cancled_.store(true);
cq_.Shutdown();
break;
} else if (got_tag == (void *) this) // read tag
{
if (reply_.events_size()) {
callback(reply_);
}
stream_->Read(&reply_, (void *) this);
}
}
}
void AsyncWatchAction::CancelWatch() {
if (!cancled_.load()) {
// tag `3` mean write done
stream_->WritesDone((void *) 3);
cancled_.store(true);
}
}
}
}

View File

@ -0,0 +1,40 @@
#pragma once
#include "grpc/etcd.grpc.pb.h"
#include <grpc++/grpc++.h>
#include <thread>
namespace milvus {
namespace master {
class AsyncWatchAction;
class Watcher {
public:
Watcher(std::string const &address,
std::string const &key,
std::function<void(etcdserverpb::WatchResponse)> callback,
bool with_prefix = true);
void Cancel();
~Watcher();
private:
std::unique_ptr<etcdserverpb::Watch::Stub> stub_;
std::unique_ptr<AsyncWatchAction> call_;
std::thread work_thread_;
};
class AsyncWatchAction {
public:
AsyncWatchAction(const std::string &key, bool with_prefix, etcdserverpb::Watch::Stub* stub);
void WaitForResponse(std::function<void(etcdserverpb::WatchResponse)> callback);
void CancelWatch();
private:
// Status status;
grpc::ClientContext context_;
grpc::CompletionQueue cq_;
etcdserverpb::WatchResponse reply_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<etcdserverpb::WatchRequest, etcdserverpb::WatchResponse>> stream_;
std::atomic<bool> cancled_ = false;
};
}
}

View File

@ -0,0 +1,31 @@
// Steps to test this file:
// 1. start a etcdv3 server
// 2. run this test
// 3. modify test key using etcdctlv3 or etcd-clientv3(Must using v3 api)
// TODO: move this test to unittest
#include "Watcher.h"
using namespace milvus::master;
int main() {
try {
Watcher watcher("127.0.0.1:2379", "SomeKey", [](etcdserverpb::WatchResponse res) {
std::cerr << "Key1 changed!" << std::endl;
std::cout << "Event size: " << res.events_size() << std::endl;
for (auto &event: res.events()) {
std::cout <<
event.kv().key() << ":" <<
event.kv().value() << std::endl;
}
}, false);
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(60000));
watcher.Cancel();
break;
}
}
catch (const std::exception &e) {
std::cout << e.what();
}
}

View File

@ -0,0 +1,35 @@
#include "GrpcClient.h"
#include "grpc++/grpc++.h"
using grpc::ClientContext;
namespace milvus {
namespace master {
GrpcClient::GrpcClient(const std::string &addr) {
auto channel = ::grpc::CreateChannel(addr, ::grpc::InsecureChannelCredentials());
stub_ = masterpb::Master::NewStub(channel);
}
GrpcClient::GrpcClient(std::shared_ptr<::grpc::Channel> &channel)
: stub_(masterpb::Master::NewStub(channel)) {
}
Status GrpcClient::CreateCollection(const milvus::grpc::Mapping &mapping) {
ClientContext context;
::milvus::grpc::Status response;
::grpc::Status grpc_status = stub_->CreateCollection(&context, mapping, &response);
if (!grpc_status.ok()) {
std::cerr << "CreateHybridCollection gRPC failed!" << std::endl;
return Status(grpc_status.error_code(), grpc_status.error_message());
}
if (response.error_code() != grpc::SUCCESS) {
// TODO: LOG
return Status(response.error_code(), response.reason());
}
return Status::OK();
}
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include "grpc/master.grpc.pb.h"
#include "grpc/message.pb.h"
#include "grpc++/grpc++.h"
#include "utils/Status.h"
namespace milvus {
namespace master {
class GrpcClient {
public:
explicit GrpcClient(const std::string& addr);
explicit GrpcClient(std::shared_ptr<::grpc::Channel>& channel);
~GrpcClient() = default;
public:
Status
CreateCollection(const milvus::grpc::Mapping& mapping);
private:
std::unique_ptr<masterpb::Master::Stub> stub_;
};
}
}

View File

@ -43,7 +43,7 @@ set( GRPC_SERVER_FILES ${GRPC_IMPL_FILES}
aux_source_directory( ${MILVUS_ENGINE_SRC}/server/context SERVER_CONTEXT_FILES )
add_library( server STATIC MessageWrapper.cpp MessageWrapper.h)
add_library( server STATIC)
target_sources( server
PRIVATE ${GRPC_SERVER_FILES}
${GRPC_SERVICE_FILES}

View File

@ -0,0 +1,21 @@
#include "MetaWrapper.h"
#include "config/ServerConfig.h"
namespace milvus{
namespace server {
MetaWrapper& MetaWrapper::GetInstance() {
static MetaWrapper wrapper;
return wrapper;
}
Status MetaWrapper::Init() {
auto addr = config.master.address() + ":" + std::to_string(config.master.port());
client_ = std::make_shared<milvus::master::GrpcClient>(addr);
}
std::shared_ptr<milvus::master::GrpcClient> MetaWrapper::MetaClient() {
return client_;
}
}
}

View File

@ -0,0 +1,24 @@
#include "utils/Status.h"
#include "meta/master/GrpcClient.h"
namespace milvus{
namespace server{
class MetaWrapper {
public:
static MetaWrapper&
GetInstance();
Status
Init();
std::shared_ptr<milvus::master::GrpcClient>
MetaClient();
private:
std::shared_ptr<milvus::master::GrpcClient> client_;
};
}
}

View File

@ -34,6 +34,7 @@
#include "utils/SignalHandler.h"
#include "utils/TimeRecorder.h"
#include "MessageWrapper.h"
#include "MetaWrapper.h"
namespace milvus {
namespace server {
@ -240,12 +241,15 @@ Server::StartService() {
grpc::GrpcServer::GetInstance().Start();
// Init pulsar message client
stat = MessageWrapper::GetInstance().Init();
if (!stat.ok()) {
LOG_SERVER_ERROR_ << "Pulsar message client start service fail: " << stat.message();
goto FAIL;
}
MetaWrapper::GetInstance().Init();
return Status::OK();
FAIL:
std::cerr << "Milvus initializes fail: " << stat.message() << std::endl;

View File

@ -99,7 +99,7 @@ ReqScheduler::ExecuteReq(const BaseReqPtr& req_ptr) {
}
status = req_ptr->WaitToFinish(); // sync execution
if (!status.ok()) {
if (!status.ok()){
return status;
}
@ -163,7 +163,10 @@ int64_t ReqScheduler::GetLatestDeliveredReqTime() {
if (sending_){
return latest_req_time_;
}
return TSOracle::GetInstance().GetTimeStamp();
auto ts = TSOracle::GetInstance().GetTimeStamp();
latest_req_time_ = ts;
assert(ts != 0);
return ts;
}
void ReqScheduler::UpdateLatestDeliveredReqTime(int64_t time) {

View File

@ -66,7 +66,7 @@ class ReqScheduler {
// for time synchronous
std::mutex time_syc_mtx_;
int64_t latest_req_time_;
int64_t latest_req_time_ = 0;
bool sending_;
std::map<std::string, ReqQueuePtr> req_groups_;

View File

@ -26,6 +26,7 @@
#include "tracing/TextMapCarrier.h"
#include "tracing/TracerUtil.h"
#include "utils/Log.h"
#include "server/MetaWrapper.h"
namespace milvus {
namespace server {
@ -340,6 +341,10 @@ GrpcRequestHandler::CreateCollection(::grpc::ServerContext *context, const ::mil
CHECK_NULLPTR_RETURN(request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
Status status = MetaWrapper::GetInstance().MetaClient()->CreateCollection(*request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
SET_RESPONSE(response, status, context)
return ::grpc::Status::OK;
}

View File

@ -105,7 +105,7 @@ GrpcServer::StartService() {
int client_id = 0;
std::string pulsar_server_addr
(std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port()));
timesync::TimeSync syc(client_id,GetMessageTimeSyncTime, 20, pulsar_server_addr, "TimeSync");
timesync::TimeSync syc(client_id,GetMessageTimeSyncTime, 400, pulsar_server_addr, "TimeSync");
// Add gRPC interceptor

View File

@ -66,6 +66,10 @@ add_custom_command(TARGET generate_suvlim_pb_grpc
COMMAND echo "${PROTOC_EXCUTABLE}"
COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_go.sh" -p "${PROTOC_EXCUTABLE}"
COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_cpp.sh" -p "${PROTOC_EXCUTABLE}" -g "${GRPC_CPP_PLUGIN_EXCUTABLE}"
COMMAND ${PROTOC_EXCUTABLE} -I "${PROTO_PATH}/proto" --grpc_out "${PROTO_PATH}" --cpp_out "${PROTO_PATH}"
--plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN_EXCUTABLE}"
"${PROTO_PATH}/proto/etcd.proto"
DEPENDS "${PROTO_PATH}/proto/etcd.proto"
)
set_property( GLOBAL PROPERTY PROTOC_EXCUTABLE ${PROTOC_EXCUTABLE})

View File

@ -5,32 +5,41 @@ import (
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
msgpb "github.com/czs007/suvlim/pkg/master/grpc/message"
timesync "github.com/czs007/suvlim/timesync"
"github.com/golang/protobuf/proto"
"log"
"time"
"strconv"
)
type MessageClient struct {
// timesync
timeSyncCfg *timesync.ReaderTimeSyncCfg
//message channel
insertOrDeleteChan chan *msgpb.InsertOrDeleteMsg
searchChan chan *msgpb.SearchMsg
timeSyncChan chan *msgpb.TimeSyncMsg
key2SegChan chan *msgpb.Key2SegMsg
searchChan chan *msgpb.SearchMsg
key2SegChan chan *msgpb.Key2SegMsg
// pulsar
client pulsar.Client
searchResultProducer pulsar.Producer
insertOrDeleteConsumer pulsar.Consumer
searchConsumer pulsar.Consumer
timeSyncConsumer pulsar.Consumer
key2segConsumer pulsar.Consumer
client pulsar.Client
searchResultProducer pulsar.Producer
searchConsumer pulsar.Consumer
key2segConsumer pulsar.Consumer
// batch messages
InsertOrDeleteMsg []*msgpb.InsertOrDeleteMsg
SearchMsg []*msgpb.SearchMsg
TimeSyncMsg []*msgpb.TimeSyncMsg
Key2SegMsg []*msgpb.Key2SegMsg
InsertOrDeleteMsg []*msgpb.InsertOrDeleteMsg
Key2SegMsg []*msgpb.Key2SegMsg
SearchMsg []*msgpb.SearchMsg
timestampBatchStart uint64
timestampBatchEnd uint64
batchIDLen int
}
func (mc *MessageClient) TimeSyncStart() uint64 {
return mc.timestampBatchStart
}
func (mc *MessageClient) TimeSyncEnd() uint64 {
return mc.timestampBatchEnd
}
func (mc *MessageClient) Send(ctx context.Context, msg msgpb.QueryResult) {
@ -42,34 +51,11 @@ func (mc *MessageClient) Send(ctx context.Context, msg msgpb.QueryResult) {
}
}
func (mc *MessageClient) GetSearchChan() chan *msgpb.SearchMsg {
func (mc *MessageClient) GetSearchChan() <- chan *msgpb.SearchMsg {
return mc.searchChan
}
func (mc *MessageClient) ReceiveInsertOrDeleteMsg() {
var count = 0
var start time.Time
for {
insetOrDeleteMsg := msgpb.InsertOrDeleteMsg{}
msg, err := mc.insertOrDeleteConsumer.Receive(context.Background())
err = proto.Unmarshal(msg.Payload(), &insetOrDeleteMsg)
if err != nil {
log.Fatal(err)
}
if count == 0 {
start = time.Now()
}
count++
mc.insertOrDeleteChan <- &insetOrDeleteMsg
mc.insertOrDeleteConsumer.Ack(msg)
if count == 100000 - 1 {
elapsed := time.Since(start)
fmt.Println("Query node ReceiveInsertOrDeleteMsg time:", elapsed)
}
}
}
func (mc *MessageClient) ReceiveSearchMsg() {
func (mc *MessageClient) receiveSearchMsg() {
for {
searchMsg := msgpb.SearchMsg{}
msg, err := mc.searchConsumer.Receive(context.Background())
@ -82,20 +68,7 @@ func (mc *MessageClient) ReceiveSearchMsg() {
}
}
func (mc *MessageClient) ReceiveTimeSyncMsg() {
for {
timeSyncMsg := msgpb.TimeSyncMsg{}
msg, err := mc.timeSyncConsumer.Receive(context.Background())
err = proto.Unmarshal(msg.Payload(), &timeSyncMsg)
if err != nil {
log.Fatal(err)
}
mc.timeSyncChan <- &timeSyncMsg
mc.timeSyncConsumer.Ack(msg)
}
}
func (mc *MessageClient) ReceiveKey2SegMsg() {
func (mc *MessageClient) receiveKey2SegMsg() {
for {
key2SegMsg := msgpb.Key2SegMsg{}
msg, err := mc.key2segConsumer.Receive(context.Background())
@ -109,13 +82,17 @@ func (mc *MessageClient) ReceiveKey2SegMsg() {
}
func (mc *MessageClient) ReceiveMessage() {
go mc.ReceiveInsertOrDeleteMsg()
go mc.ReceiveSearchMsg()
go mc.ReceiveTimeSyncMsg()
go mc.ReceiveKey2SegMsg()
err := mc.timeSyncCfg.Start()
if err != nil {
fmt.Println(err)
log.Fatal(err)
}
go mc.receiveSearchMsg()
go mc.receiveKey2SegMsg()
}
func (mc *MessageClient) CreatProducer(topicName string) pulsar.Producer {
func (mc *MessageClient) creatProducer(topicName string) pulsar.Producer {
producer, err := mc.client.CreateProducer(pulsar.ProducerOptions{
Topic: topicName,
})
@ -126,7 +103,7 @@ func (mc *MessageClient) CreatProducer(topicName string) pulsar.Producer {
return producer
}
func (mc *MessageClient) CreateConsumer(topicName string) pulsar.Consumer {
func (mc *MessageClient) createConsumer(topicName string) pulsar.Consumer {
consumer, err := mc.client.Subscribe(pulsar.ConsumerOptions{
Topic: topicName,
SubscriptionName: "reader",
@ -138,7 +115,7 @@ func (mc *MessageClient) CreateConsumer(topicName string) pulsar.Consumer {
return consumer
}
func (mc *MessageClient) CreateClient(url string) pulsar.Client {
func (mc *MessageClient) createClient(url string) pulsar.Client {
// create client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: url,
@ -152,36 +129,51 @@ func (mc *MessageClient) CreateClient(url string) pulsar.Client {
func (mc *MessageClient) InitClient(url string) {
//create client
mc.client = mc.CreateClient(url)
mc.client = mc.createClient(url)
//create producer
mc.searchResultProducer = mc.CreatProducer("SearchResult")
mc.searchResultProducer = mc.creatProducer("SearchResult")
//create consumer
mc.insertOrDeleteConsumer = mc.CreateConsumer("InsertOrDelete")
mc.searchConsumer = mc.CreateConsumer("Search")
mc.timeSyncConsumer = mc.CreateConsumer("TimeSync")
mc.key2segConsumer = mc.CreateConsumer("Key2Seg")
mc.searchConsumer = mc.createConsumer("Search")
mc.key2segConsumer = mc.createConsumer("Key2Seg")
// init channel
mc.insertOrDeleteChan = make(chan *msgpb.InsertOrDeleteMsg, 1000)
mc.searchChan = make(chan *msgpb.SearchMsg, 1000)
mc.timeSyncChan = make(chan *msgpb.TimeSyncMsg, 1000)
mc.key2SegChan = make(chan *msgpb.Key2SegMsg, 1000)
mc.searchChan = make(chan *msgpb.SearchMsg, 10000)
mc.key2SegChan = make(chan *msgpb.Key2SegMsg, 10000)
mc.InsertOrDeleteMsg = make([]*msgpb.InsertOrDeleteMsg, 1000)
mc.SearchMsg = make([]*msgpb.SearchMsg, 1000)
mc.TimeSyncMsg = make([]*msgpb.TimeSyncMsg, 1000)
mc.Key2SegMsg = make([]*msgpb.Key2SegMsg, 1000)
mc.InsertOrDeleteMsg = make([]*msgpb.InsertOrDeleteMsg, 0)
mc.Key2SegMsg = make([]*msgpb.Key2SegMsg, 0)
//init timesync
URL := "pulsar://localhost:6650"
timeSyncTopic := "TimeSync"
timeSyncSubName := "reader"
readTopics := make([]string, 0, 1024)
for i := 0; i < 1024; i++ {
str := "InsertOrDelete-partition-"
str = str + strconv.Itoa(i)
readTopics = append(readTopics, str)
}
readSubName := "reader"
proxyIdList := []int64{0}
timeSync, err := timesync.NewReaderTimeSync(URL, timeSyncTopic, timeSyncSubName, readTopics, readSubName, proxyIdList, 400, -2, timesync.WithReaderQueueSize(1024))
if err != nil {
log.Fatal(err)
}
mc.timeSyncCfg = timeSync.(*timesync.ReaderTimeSyncCfg)
mc.timestampBatchStart = 0
mc.timestampBatchEnd = 0
mc.batchIDLen = 0
}
func (mc *MessageClient) Close() {
defer mc.client.Close()
defer mc.searchResultProducer.Close()
defer mc.insertOrDeleteConsumer.Close()
defer mc.searchConsumer.Close()
defer mc.timeSyncConsumer.Close()
defer mc.key2segConsumer.Close()
mc.client.Close()
mc.searchResultProducer.Close()
mc.searchConsumer.Close()
mc.key2segConsumer.Close()
mc.timeSyncCfg.Close()
}
type MessageType int
@ -195,26 +187,45 @@ const (
Statistics MessageType = 5
)
func (mc *MessageClient) PrepareMsg(messageType MessageType, msgLen int) {
func (mc *MessageClient) prepareMsg(messageType MessageType, msgLen int) {
switch messageType {
case InsertOrDelete:
for i := 0; i < msgLen; i++ {
msg := <-mc.insertOrDeleteChan
msg := <-mc.timeSyncCfg.InsertOrDelete()
mc.InsertOrDeleteMsg = append(mc.InsertOrDeleteMsg, msg)
}
case Search:
for i := 0; i < msgLen; i++ {
msg := <-mc.searchChan
mc.SearchMsg = append(mc.SearchMsg, msg)
}
case TimeSync:
mc.timestampBatchStart = mc.timestampBatchEnd
mc.batchIDLen = 0
for i := 0; i < msgLen; i++ {
msg := <-mc.timeSyncChan
mc.TimeSyncMsg = append(mc.TimeSyncMsg, msg)
msg, ok := <-mc.timeSyncCfg.TimeSync()
if !ok {
fmt.Println("cnn't get data from timesync chan")
}
if i == msgLen-1 {
mc.timestampBatchEnd = msg.Timestamp
}
mc.batchIDLen += int(msg.NumRecorders)
}
}
}
func (mc *MessageClient) PrepareBatchMsg() []int {
// assume the channel not full
mc.InsertOrDeleteMsg = mc.InsertOrDeleteMsg[:0]
mc.batchIDLen = 0
// get the length of every channel
timeLen := mc.timeSyncCfg.TimeSyncChanLen()
// get message from channel to slice
if timeLen > 0 {
mc.prepareMsg(TimeSync, timeLen)
mc.prepareMsg(InsertOrDelete, mc.batchIDLen)
}
return []int{mc.batchIDLen, timeLen}
}
func (mc *MessageClient) PrepareKey2SegmentMsg() {
mc.Key2SegMsg = mc.Key2SegMsg[:0]
msgLen := len(mc.key2SegChan)
@ -223,22 +234,3 @@ func (mc *MessageClient) PrepareKey2SegmentMsg() {
mc.Key2SegMsg = append(mc.Key2SegMsg, msg)
}
}
func (mc *MessageClient) PrepareBatchMsg() []int {
// assume the channel not full
mc.InsertOrDeleteMsg = mc.InsertOrDeleteMsg[:0]
//mc.SearchMsg = mc.SearchMsg[:0]
mc.TimeSyncMsg = mc.TimeSyncMsg[:0]
// get the length of every channel
insertOrDeleteLen := len(mc.insertOrDeleteChan)
//searchLen := len(mc.searchChan)
timeLen := len(mc.timeSyncChan)
// get message from channel to slice
mc.PrepareMsg(InsertOrDelete, insertOrDeleteLen)
//mc.PrepareMsg(Search, searchLen)
mc.PrepareMsg(TimeSync, timeLen)
return []int{insertOrDeleteLen}
}

View File

@ -17,10 +17,10 @@ import (
"fmt"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/czs007/suvlim/reader/message_client"
"github.com/stretchr/testify/assert"
"sort"
"sync"
"sync/atomic"
"time"
)
type InsertData struct {
@ -58,7 +58,8 @@ type QueryNode struct {
QueryNodeId uint64
Collections []*Collection
SegmentsMap map[int64]*Segment
messageClient message_client.MessageClient
messageClient *message_client.MessageClient
//mc *message_client.MessageClient
queryNodeTimeSync *QueryNodeTime
buffer QueryNodeDataBuffer
deletePreprocessData DeletePreprocessData
@ -86,6 +87,38 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
validSearchBuffer: make([]bool, 0),
}
return &QueryNode{
QueryNodeId: queryNodeId,
Collections: nil,
SegmentsMap: segmentsMap,
messageClient: &mc,
queryNodeTimeSync: queryNodeTimeSync,
buffer: buffer,
}
}
func (node *QueryNode) Close() {
node.messageClient.Close()
}
func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.MessageClient) *QueryNode {
queryNodeTimeSync := &QueryNodeTime{
ReadTimeSyncMin: timeSync,
ReadTimeSyncMax: timeSync,
WriteTimeSync: timeSync,
SearchTimeSync: timeSync,
TSOTimeSync: timeSync,
}
segmentsMap := make(map[int64]*Segment)
buffer := QueryNodeDataBuffer{
InsertDeleteBuffer: make([]*msgPb.InsertOrDeleteMsg, 0),
SearchBuffer: make([]*msgPb.SearchMsg, 0),
validInsertDeleteBuffer: make([]bool, 0),
validSearchBuffer: make([]bool, 0),
}
return &QueryNode{
QueryNodeId: queryNodeId,
Collections: nil,
@ -141,14 +174,13 @@ func (node *QueryNode) DeleteCollection(collection *Collection) {
////////////////////////////////////////////////////////////////////////////////////////////////////
func (node *QueryNode) PrepareBatchMsg() []int {
var msgLen = node.messageClient.PrepareBatchMsg()
var msgLen= node.messageClient.PrepareBatchMsg()
return msgLen
}
func (node *QueryNode) StartMessageClient() {
func (node *QueryNode) StartMessageClient(pulsarURL string) {
// TODO: add consumerMsgSchema
node.messageClient.InitClient("pulsar://192.168.2.28:6650")
node.messageClient.InitClient(pulsarURL)
go node.messageClient.ReceiveMessage()
}
@ -164,122 +196,124 @@ func (node *QueryNode) InitQueryNodeCollection() {
////////////////////////////////////////////////////////////////////////////////////////////////////
func (node *QueryNode) RunInsertDelete() {
var count = 0
var start time.Time
func (node *QueryNode) RunInsertDelete(wg * sync.WaitGroup) {
for {
//time.Sleep(2 * 1000 * time.Millisecond)
node.QueryNodeDataInit()
// TODO: get timeRange from message client
var timeRange = TimeRange{0, 0}
var msgLen = node.PrepareBatchMsg()
//fmt.Println("PrepareBatchMsg Done, Insert len = ", msgLen[0])
if msgLen[0] == 0 {
//fmt.Println("0 msg found")
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
if msgLen[1] == 0 {
continue
}
if count == 0 {
start = time.Now()
}
count+=msgLen[0]
node.QueryNodeDataInit()
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
//fmt.Println("MessagesPreprocess Done")
fmt.Println("MessagesPreprocess Done")
node.WriterDelete()
node.PreInsertAndDelete()
//fmt.Println("PreInsertAndDelete Done")
fmt.Println("PreInsertAndDelete Done")
node.DoInsertAndDelete()
//fmt.Println("DoInsertAndDelete Done")
fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
//fmt.Print("UpdateSearchTimeSync Done\n\n\n")
if count == 100000 - 1 {
elapsed := time.Since(start)
fmt.Println("Query node insert 10 × 10000 time:", elapsed)
}
}
wg.Done()
}
func (node *QueryNode) RunSearch() {
func (node *QueryNode) TestInsertDelete(timeRange TimeRange) {
node.QueryNodeDataInit()
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
fmt.Println("MessagesPreprocess Done")
node.WriterDelete()
node.PreInsertAndDelete()
fmt.Println("PreInsertAndDelete Done")
node.DoInsertAndDelete()
fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
fmt.Print("UpdateSearchTimeSync Done\n\n\n")
}
func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
for {
//time.Sleep(2 * 1000 * time.Millisecond)
start := time.Now()
if len(node.messageClient.GetSearchChan()) <= 0 {
//fmt.Println("null Search")
continue
msg, ok := <-node.messageClient.GetSearchChan()
if ok {
node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
fmt.Println("Do Search...")
node.Search(node.messageClient.SearchMsg)
}
node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
msg := <-node.messageClient.GetSearchChan()
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
fmt.Println("Do Search...")
node.Search(node.messageClient.SearchMsg)
elapsed := time.Since(start)
fmt.Println("Query node search time:", elapsed)
}
wg.Done()
}
////////////////////////////////////////////////////////////////////////////////////////////////////
func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOrDeleteMsg, timeRange TimeRange) msgPb.Status {
//var tMax = timeRange.timestampMax
var tMax = timeRange.timestampMax
// 1. Extract messages before readTimeSync from QueryNodeDataBuffer.
// Set valid bitmap to false.
for i, msg := range node.buffer.InsertDeleteBuffer {
//if msg.Timestamp < tMax {
if msg.Op == msgPb.OpType_INSERT {
if msg.RowsData == nil {
continue
if msg.Timestamp < tMax {
if msg.Op == msgPb.OpType_INSERT {
if msg.RowsData == nil {
continue
}
node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid)
node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp)
node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob)
} else if msg.Op == msgPb.OpType_DELETE {
var r = DeleteRecord{
entityID: msg.Uid,
timestamp: msg.Timestamp,
}
node.deletePreprocessData.deleteRecords = append(node.deletePreprocessData.deleteRecords, &r)
atomic.AddInt32(&node.deletePreprocessData.count, 1)
}
node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid)
node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp)
node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob)
} else if msg.Op == msgPb.OpType_DELETE {
var r = DeleteRecord{
entityID: msg.Uid,
timestamp: msg.Timestamp,
}
node.deletePreprocessData.deleteRecords = append(node.deletePreprocessData.deleteRecords, &r)
atomic.AddInt32(&node.deletePreprocessData.count, 1)
node.buffer.validInsertDeleteBuffer[i] = false
}
node.buffer.validInsertDeleteBuffer[i] = false
//}
}
// 2. Remove invalid messages from buffer.
for i, isValid := range node.buffer.validInsertDeleteBuffer {
if !isValid {
bufferLen := len(node.buffer.validInsertDeleteBuffer)
assert.Equal(nil, len(node.buffer.validInsertDeleteBuffer), len(node.buffer.InsertDeleteBuffer))
for i:= 0; i < bufferLen - 2; i++ {
if !node.buffer.validInsertDeleteBuffer[i] {
copy(node.buffer.InsertDeleteBuffer[i:], node.buffer.InsertDeleteBuffer[i+1:]) // Shift a[i+1:] left one index.
node.buffer.InsertDeleteBuffer[len(node.buffer.InsertDeleteBuffer)-1] = nil // Erase last element (write zero value).
node.buffer.InsertDeleteBuffer = node.buffer.InsertDeleteBuffer[:len(node.buffer.InsertDeleteBuffer)-1] // Truncate slice.
}
}
node.buffer.validInsertDeleteBuffer = node.buffer.validInsertDeleteBuffer[:len(node.buffer.InsertDeleteBuffer)]
for i := range node.buffer.validInsertDeleteBuffer {
node.buffer.validInsertDeleteBuffer[i] = true
}
// 3. Extract messages before readTimeSync from current messageClient.
// Move massages after readTimeSync to QueryNodeDataBuffer.
// Set valid bitmap to true.
for _, msg := range insertDeleteMessages {
//if msg.Timestamp < tMax {
if msg.Op == msgPb.OpType_INSERT {
if msg.RowsData == nil {
continue
if msg.Timestamp < tMax {
if msg.Op == msgPb.OpType_INSERT {
if msg.RowsData == nil {
continue
}
node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid)
node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp)
node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob)
} else if msg.Op == msgPb.OpType_DELETE {
var r = DeleteRecord{
entityID: msg.Uid,
timestamp: msg.Timestamp,
}
node.deletePreprocessData.deleteRecords = append(node.deletePreprocessData.deleteRecords, &r)
atomic.AddInt32(&node.deletePreprocessData.count, 1)
}
node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid)
node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp)
node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob)
} else if msg.Op == msgPb.OpType_DELETE {
var r = DeleteRecord{
entityID: msg.Uid,
timestamp: msg.Timestamp,
}
node.deletePreprocessData.deleteRecords = append(node.deletePreprocessData.deleteRecords, &r)
atomic.AddInt32(&node.deletePreprocessData.count, 1)
} else {
fmt.Println("msg timestamp:= ", msg.Timestamp>>18)
node.buffer.InsertDeleteBuffer = append(node.buffer.InsertDeleteBuffer, msg)
node.buffer.validInsertDeleteBuffer = append(node.buffer.validInsertDeleteBuffer, true)
}
//} else {
// node.buffer.InsertDeleteBuffer = append(node.buffer.InsertDeleteBuffer, msg)
// node.buffer.validInsertDeleteBuffer = append(node.buffer.validInsertDeleteBuffer, true)
//}
}
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
@ -364,6 +398,7 @@ func (node *QueryNode) DoInsertAndDelete() msgPb.Status {
}
func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.WaitGroup) msgPb.Status {
fmt.Println("Doing insert..., len = ", len(node.insertData.insertIDs[segmentID]))
var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
if err != nil {
fmt.Println(err.Error())
@ -427,17 +462,20 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
var timestamp = msg.Timestamp
var vector = msg.Records
// We now only the first Json is valid.
var queryJson = msg.Json[0]
// 1. Timestamp check
// TODO: return or wait? Or adding graceful time
//if timestamp > node.queryNodeTimeSync.SearchTimeSync {
// return msgPb.Status{ErrorCode: 1}
//}
if timestamp > node.queryNodeTimeSync.SearchTimeSync {
fmt.Println("Invalid query time, timestamp = ", timestamp, ", SearchTimeSync = ", node.queryNodeTimeSync.SearchTimeSync)
return msgPb.Status{ErrorCode: 1}
}
// 2. Do search in all segments
for _, partition := range targetCollection.Partitions {
for _, openSegment := range partition.OpenedSegments {
var res, err = openSegment.SegmentSearch("", timestamp, vector)
var res, err = openSegment.SegmentSearch(queryJson, timestamp, vector)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
@ -448,7 +486,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
}
}
for _, closedSegment := range partition.ClosedSegments {
var res, err = closedSegment.SegmentSearch("", timestamp, vector)
var res, err = closedSegment.SegmentSearch(queryJson, timestamp, vector)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
@ -468,6 +506,9 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
Ids: make([]int64, 0),
}
var results = msgPb.QueryResult{
Status: &msgPb.Status{
ErrorCode: 0,
},
Entities: &entities,
Distances: make([]float32, 0),
QueryId: msg.Uid,

View File

@ -1,11 +1,21 @@
package reader
func startQueryNode() {
qn := NewQueryNode(0, 0)
qn.InitQueryNodeCollection()
//go qn.SegmentService()
qn.StartMessageClient()
import (
"github.com/czs007/suvlim/reader/message_client"
"sync"
)
go qn.RunSearch()
qn.RunInsertDelete()
func StartQueryNode(pulsarURL string) {
mc := message_client.MessageClient{}
mc.InitClient(pulsarURL)
mc.ReceiveMessage()
qn := CreateQueryNode(0, 0, &mc)
qn.InitQueryNodeCollection()
wg := sync.WaitGroup{}
wg.Add(2)
go qn.RunInsertDelete(&wg)
go qn.RunSearch(&wg)
wg.Wait()
qn.Close()
}

View File

@ -5,5 +5,7 @@ import (
)
func TestReader_startQueryNode(t *testing.T) {
startQueryNode()
pulsarURL := "pulsar://localhost:6650"
StartQueryNode(pulsarURL)
}

View File

@ -10,8 +10,8 @@ import (
type ResultEntityIds []int64
type SearchResult struct {
ResultIds []int64
ResultDistances []float32
ResultIds []int64
ResultDistances []float32
}
func getResultTopicByClientId(clientId int64) string {
@ -28,6 +28,20 @@ func (node *QueryNode) PublishSearchResult(results *msgPb.QueryResult, clientId
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
}
func (node *QueryNode) PublishFailedSearchResult() msgPb.Status {
var results = msgPb.QueryResult{
Status: &msgPb.Status{
ErrorCode: 1,
Reason: "Search Failed",
},
}
var ctx = context.Background()
node.messageClient.Send(ctx, results)
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
}
func (node *QueryNode) PublicStatistic(statisticTopic string) msgPb.Status {
// TODO: get statistic info
// getStatisticInfo()

View File

@ -164,12 +164,14 @@ func (s *Segment) SegmentDelete(offset int64, entityIDs *[]int64, timestamps *[]
return nil
}
func (s *Segment) SegmentSearch(queryString string, timestamp uint64, vectorRecord *schema.VectorRowRecord) (*SearchResult, error) {
func (s *Segment) SegmentSearch(queryJson string, timestamp uint64, vectorRecord *schema.VectorRowRecord) (*SearchResult, error) {
/*C.Search
int
Search(CSegmentBase c_segment,
void* fake_query,
const char* query_json,
unsigned long timestamp,
float* query_raw_data,
int num_of_query_raw_data,
long int* result_ids,
float* result_distances);
*/
@ -179,12 +181,23 @@ func (s *Segment) SegmentSearch(queryString string, timestamp uint64, vectorReco
resultIds := make([]int64, TopK)
resultDistances := make([]float32, TopK)
var cQueryPtr = unsafe.Pointer(nil)
var cQueryJson = C.CString(queryJson)
var cTimestamp = C.ulong(timestamp)
var cResultIds = (*C.long)(&resultIds[0])
var cResultDistances = (*C.float)(&resultDistances[0])
var cQueryRawData *C.float
var cQueryRawDataLength C.int
var status = C.Search(s.SegmentPtr, cQueryPtr, cTimestamp, cResultIds, cResultDistances)
if vectorRecord.BinaryData != nil {
return nil, errors.New("Data of binary type is not supported yet")
} else if len(vectorRecord.FloatData) <= 0 {
return nil, errors.New("Null query vector data")
} else {
cQueryRawData = (*C.float)(&vectorRecord.FloatData[0])
cQueryRawDataLength = (C.int)(len(vectorRecord.FloatData))
}
var status = C.Search(s.SegmentPtr, cQueryJson, cTimestamp, cQueryRawData, cQueryRawDataLength, cResultIds, cResultDistances)
if status != 0 {
return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status)))

View File

@ -3,6 +3,7 @@ package reader
import (
"encoding/binary"
"fmt"
schema "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/stretchr/testify/assert"
"math"
"testing"
@ -131,7 +132,15 @@ func TestSegment_SegmentSearch(t *testing.T) {
assert.NoError(t, err)
// 6. Do search
var searchRes, searchErr = segment.SegmentSearch("fake query string", timestamps[0], nil)
var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
var queryRawData = make([]float32, 0)
for i := 0; i < 16; i ++ {
queryRawData = append(queryRawData, float32(i))
}
var vectorRecord = schema.VectorRowRecord {
FloatData: queryRawData,
}
var searchRes, searchErr = segment.SegmentSearch(queryJson, timestamps[0], &vectorRecord)
assert.NoError(t, searchErr)
fmt.Println(searchRes)

View File

@ -49,7 +49,7 @@ GRPC_INCLUDE=.:.
rm -rf proto-cpp && mkdir -p proto-cpp
PB_FILES=()
GRPC_FILES=("message.proto")
GRPC_FILES=("message.proto" "master.proto")
ALL_FILES=("${PB_FILES[@]}")
ALL_FILES+=("${GRPC_FILES[@]}")

View File

@ -4,10 +4,11 @@ import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
pb "github.com/czs007/suvlim/pkg/message"
pb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/golang/protobuf/proto"
"log"
"sort"
"sync"
)
const ReadStopFlagEnd int64 = 0
@ -24,9 +25,10 @@ type TimeSyncMsg struct {
Timestamp uint64
NumRecorders int64
}
type ReaderTimeSyncOption func(*readerTimeSyncCfg)
type readerTimeSyncCfg struct {
type ReaderTimeSyncOption func(*ReaderTimeSyncCfg)
type ReaderTimeSyncCfg struct {
pulsarClient pulsar.Client
timeSyncConsumer pulsar.Consumer
@ -40,6 +42,7 @@ type readerTimeSyncCfg struct {
interval int
proxyIdList []int64
readerQueueSize int
TestData int
revTimesyncFromReader map[uint64]int
@ -81,7 +84,7 @@ func NewReaderTimeSync(
return nil, fmt.Errorf("there are two proxies have the same id = %d", proxyIdList[i])
}
}
r := &readerTimeSyncCfg{
r := &ReaderTimeSyncCfg{
interval: interval,
proxyIdList: proxyIdList,
}
@ -147,7 +150,7 @@ func NewReaderTimeSync(
return r, nil
}
func (r *readerTimeSyncCfg) Close() {
func (r *ReaderTimeSyncCfg) Close() {
r.cancel()
r.timeSyncConsumer.Close()
r.readerConsumer.Close()
@ -157,25 +160,29 @@ func (r *readerTimeSyncCfg) Close() {
r.pulsarClient.Close()
}
func (r *readerTimeSyncCfg) Start() error {
func (r *ReaderTimeSyncCfg) Start() error {
go r.startReadTopics()
go r.startTimeSync()
return r.ctx.Err()
}
func (r *readerTimeSyncCfg) InsertOrDelete() <-chan *pb.InsertOrDeleteMsg {
func (r *ReaderTimeSyncCfg) InsertOrDelete() <-chan *pb.InsertOrDeleteMsg {
return r.insertOrDeleteChan
}
func (r *readerTimeSyncCfg) TimeSync() <-chan TimeSyncMsg {
func (r *ReaderTimeSyncCfg) TimeSync() <-chan TimeSyncMsg {
return r.timesyncMsgChan
}
func (r *readerTimeSyncCfg) IsInsertDeleteChanFull() bool {
func (r *ReaderTimeSyncCfg) TimeSyncChanLen() int {
return len(r.timesyncMsgChan)
}
func (r *ReaderTimeSyncCfg) IsInsertDeleteChanFull() bool {
return len(r.insertOrDeleteChan) == len(r.readerProducer)*r.readerQueueSize
}
func (r *readerTimeSyncCfg) alignTimeSync(ts []*pb.TimeSyncMsg) []*pb.TimeSyncMsg {
func (r *ReaderTimeSyncCfg) alignTimeSync(ts []*pb.TimeSyncMsg) []*pb.TimeSyncMsg {
if len(r.proxyIdList) > 1 {
if len(ts) > 1 {
for i := 1; i < len(r.proxyIdList); i++ {
@ -204,7 +211,7 @@ func (r *readerTimeSyncCfg) alignTimeSync(ts []*pb.TimeSyncMsg) []*pb.TimeSyncMs
return ts
}
func (r *readerTimeSyncCfg) readTimeSync(ctx context.Context, ts []*pb.TimeSyncMsg, n int) ([]*pb.TimeSyncMsg, error) {
func (r *ReaderTimeSyncCfg) readTimeSync(ctx context.Context, ts []*pb.TimeSyncMsg, n int) ([]*pb.TimeSyncMsg, error) {
for i := 0; i < n; i++ {
select {
case <-ctx.Done():
@ -213,11 +220,13 @@ func (r *readerTimeSyncCfg) readTimeSync(ctx context.Context, ts []*pb.TimeSyncM
if ok == false {
return nil, fmt.Errorf("timesync consumer closed")
}
msg := cm.Message
var tsm pb.TimeSyncMsg
if err := proto.Unmarshal(msg.Payload(), &tsm); err != nil {
return nil, err
}
ts = append(ts, &tsm)
r.timeSyncConsumer.AckID(msg.ID())
}
@ -225,11 +234,20 @@ func (r *readerTimeSyncCfg) readTimeSync(ctx context.Context, ts []*pb.TimeSyncM
return ts, nil
}
func (r *readerTimeSyncCfg) startTimeSync() {
func (r *ReaderTimeSyncCfg) sendEOFMsg(ctx context.Context, msg *pulsar.ProducerMessage, index int, wg *sync.WaitGroup) {
if _, err := r.readerProducer[index].Send(ctx, msg); err != nil {
//TODO, log error
log.Printf("Send timesync flag error %v", err)
}
wg.Done()
}
func (r *ReaderTimeSyncCfg) startTimeSync() {
tsm := make([]*pb.TimeSyncMsg, 0, len(r.proxyIdList)*2)
ctx, _ := context.WithCancel(r.ctx)
var err error
for {
//var start time.Time
for len(tsm) != len(r.proxyIdList) {
tsm = r.alignTimeSync(tsm)
tsm, err = r.readTimeSync(ctx, tsm, len(r.proxyIdList)-len(tsm))
@ -256,21 +274,21 @@ func (r *readerTimeSyncCfg) startTimeSync() {
//TODO log error
log.Printf("Marshal timesync flag error %v", err)
} else {
for _, p := range r.readerProducer {
if _, err := p.Send(ctx, &pulsar.ProducerMessage{Payload: payload}); err != nil {
//TODO, log error
log.Printf("Send timesync flag error %v", err)
}
wg := sync.WaitGroup{}
wg.Add(len(r.readerProducer))
for index := range r.readerProducer {
go r.sendEOFMsg(ctx, &pulsar.ProducerMessage{Payload: payload}, index, &wg)
}
wg.Wait()
}
}
}
func (r *readerTimeSyncCfg) isReadStopFlag(imsg *pb.InsertOrDeleteMsg) bool {
func (r *ReaderTimeSyncCfg) isReadStopFlag(imsg *pb.InsertOrDeleteMsg) bool {
return imsg.ClientId < ReadStopFlagEnd
}
func (r *readerTimeSyncCfg) startReadTopics() {
func (r *ReaderTimeSyncCfg) startReadTopics() {
ctx, _ := context.WithCancel(r.ctx)
tsm := TimeSyncMsg{Timestamp: 0, NumRecorders: 0}
for {
@ -317,7 +335,7 @@ func (r *readerTimeSyncCfg) startReadTopics() {
}
func WithReaderQueueSize(size int) ReaderTimeSyncOption {
return func(r *readerTimeSyncCfg) {
return func(r *ReaderTimeSyncCfg) {
r.readerQueueSize = size
}
}

View File

@ -3,7 +3,7 @@ package readertimesync
import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
pb "github.com/czs007/suvlim/pkg/message"
pb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/golang/protobuf/proto"
"log"
"sync"
@ -44,7 +44,7 @@ const (
)
func TestAlignTimeSync(t *testing.T) {
r := &readerTimeSyncCfg{
r := &ReaderTimeSyncCfg{
proxyIdList: []int64{1, 2, 3},
interval: 200,
}
@ -75,7 +75,7 @@ func TestAlignTimeSync(t *testing.T) {
}
func TestAlignTimeSync2(t *testing.T) {
r := &readerTimeSyncCfg{
r := &ReaderTimeSyncCfg{
proxyIdList: []int64{1, 2, 3},
interval: 200,
}
@ -104,7 +104,7 @@ func TestAlignTimeSync2(t *testing.T) {
}
func TestAlignTimeSync3(t *testing.T) {
r := &readerTimeSyncCfg{
r := &ReaderTimeSyncCfg{
proxyIdList: []int64{1, 2, 3},
interval: 200,
}
@ -142,7 +142,7 @@ func TestAlignTimeSync3(t *testing.T) {
}
func TestAlignTimeSync4(t *testing.T) {
r := &readerTimeSyncCfg{
r := &ReaderTimeSyncCfg{
proxyIdList: []int64{1},
interval: 200,
}
@ -173,7 +173,7 @@ func TestAlignTimeSync4(t *testing.T) {
}
func TestAlignTimeSync5(t *testing.T) {
r := &readerTimeSyncCfg{
r := &ReaderTimeSyncCfg{
proxyIdList: []int64{1, 2, 3},
interval: 200,
}
@ -219,7 +219,7 @@ func TestNewReaderTimeSync(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rr := r.(*readerTimeSyncCfg)
rr := r.(*ReaderTimeSyncCfg)
if rr.pulsarClient == nil {
t.Fatalf("create pulsar client failed")
}
@ -303,7 +303,7 @@ func TestReaderTimesync(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rr := r.(*readerTimeSyncCfg)
rr := r.(*ReaderTimeSyncCfg)
pt1, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: timeSyncTopic})
if err != nil {
t.Fatalf("create time sync producer 1 error %v", err)

View File

@ -3,14 +3,13 @@ package main
import (
"context"
"fmt"
"log"
"sync"
"strconv"
"github.com/czs007/suvlim/conf"
storage "github.com/czs007/suvlim/storage/pkg"
"github.com/czs007/suvlim/writer/message_client"
"github.com/czs007/suvlim/writer/write_node"
"time"
"log"
"strconv"
"sync"
)
func main() {
@ -20,15 +19,15 @@ func main() {
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
println(pulsarAddr)
mc := message_client.MessageClient{}
mc.InitClient(pulsarAddr)
//TODO::close client / consumer/ producer
//mc.Close()
go mc.ReceiveMessage()
mc.ReceiveMessage()
wg := sync.WaitGroup{}
ctx := context.Background()
kv, err := storage.NewStore(ctx, conf.Config.Storage.Driver)
// if err != nil, should retry link
// TODO:: if err != nil, should retry link
if err != nil {
log.Fatal(err)
}
@ -39,19 +38,17 @@ func main() {
TimeSync: 100,
}
//TODO:: start a gorouter for searchById
for {
time.Sleep(200 * time.Millisecond)
if ctx.Err() != nil {
break
}
msgLength := wn.MessageClient.PrepareBatchMsg()
readyDo := false
for _, len := range msgLength {
if len > 0 {
readyDo = true
}
if msgLength > 0 {
wn.DoWriteNode(ctx, &wg)
fmt.Println("write node do a batch message, storage len: ", msgLength)
}
if readyDo {
wn.DoWriteNode(ctx, 100, &wg)
fmt.Println("write node do a batch message, storage len: ")
}
fmt.Println("do a batch in 200ms")
}
wn.Close()
}

View File

@ -4,29 +4,30 @@ import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
msgpb "github.com/czs007/suvlim/pkg/master/grpc/message"
timesync "github.com/czs007/suvlim/timesync"
"github.com/golang/protobuf/proto"
"log"
"strconv"
)
type MessageClient struct {
// timesync
timeSyncCfg *timesync.ReaderTimeSyncCfg
//message channel
insertOrDeleteChan chan *msgpb.InsertOrDeleteMsg
searchByIdChan chan *msgpb.EntityIdentity
timeSyncChan chan *msgpb.TimeSyncMsg
searchByIdChan chan *msgpb.EntityIdentity
// pulsar
client pulsar.Client
key2segProducer pulsar.Producer
insertOrDeleteConsumer pulsar.Consumer
searchByIdConsumer pulsar.Consumer
timeSyncConsumer pulsar.Consumer
client pulsar.Client
key2segProducer pulsar.Producer
searchByIdConsumer pulsar.Consumer
// batch messages
InsertMsg []*msgpb.InsertOrDeleteMsg
DeleteMsg []*msgpb.InsertOrDeleteMsg
SearchByIdMsg []*msgpb.EntityIdentity
TimeSyncMsg []*msgpb.TimeSyncMsg
InsertMsg []*msgpb.InsertOrDeleteMsg
DeleteMsg []*msgpb.InsertOrDeleteMsg
timestampBatchStart uint64
timestampBatchEnd uint64
batchIDLen int
}
func (mc *MessageClient) Send(ctx context.Context, msg msgpb.Key2SegMsg) {
@ -38,20 +39,15 @@ func (mc *MessageClient) Send(ctx context.Context, msg msgpb.Key2SegMsg) {
}
}
func (mc *MessageClient) ReceiveInsertOrDeleteMsg() {
for {
insetOrDeleteMsg := msgpb.InsertOrDeleteMsg{}
msg, err := mc.insertOrDeleteConsumer.Receive(context.Background())
err = proto.Unmarshal(msg.Payload(), &insetOrDeleteMsg)
if err != nil {
log.Fatal(err)
}
mc.insertOrDeleteChan <- &insetOrDeleteMsg
mc.insertOrDeleteConsumer.Ack(msg)
}
func (mc *MessageClient) TimeSync() uint64 {
return mc.timestampBatchEnd
}
func (mc *MessageClient) ReceiveSearchByIdMsg() {
func (mc *MessageClient) SearchByIdChan() chan *msgpb.EntityIdentity {
return mc.searchByIdChan
}
func (mc *MessageClient) receiveSearchByIdMsg() {
for {
searchByIdMsg := msgpb.EntityIdentity{}
msg, err := mc.searchByIdConsumer.Receive(context.Background())
@ -64,26 +60,16 @@ func (mc *MessageClient) ReceiveSearchByIdMsg() {
}
}
func (mc *MessageClient) ReceiveTimeSyncMsg() {
for {
timeSyncMsg := msgpb.TimeSyncMsg{}
msg, err := mc.timeSyncConsumer.Receive(context.Background())
err = proto.Unmarshal(msg.Payload(), &timeSyncMsg)
if err != nil {
log.Fatal(err)
}
mc.timeSyncChan <- &timeSyncMsg
mc.timeSyncConsumer.Ack(msg)
}
}
func (mc *MessageClient) ReceiveMessage() {
go mc.ReceiveInsertOrDeleteMsg()
go mc.ReceiveSearchByIdMsg()
go mc.ReceiveTimeSyncMsg()
err := mc.timeSyncCfg.Start()
if err != nil {
log.Fatal(err)
}
go mc.receiveSearchByIdMsg()
}
func (mc *MessageClient) CreatProducer(topicName string) pulsar.Producer {
func (mc *MessageClient) creatProducer(topicName string) pulsar.Producer {
producer, err := mc.client.CreateProducer(pulsar.ProducerOptions{
Topic: topicName,
})
@ -94,10 +80,10 @@ func (mc *MessageClient) CreatProducer(topicName string) pulsar.Producer {
return producer
}
func (mc *MessageClient) CreateConsumer(topicName string) pulsar.Consumer {
func (mc *MessageClient) createConsumer(topicName string) pulsar.Consumer {
consumer, err := mc.client.Subscribe(pulsar.ConsumerOptions{
Topic: topicName,
SubscriptionName: "multi-topic-sub",
SubscriptionName: "writer",
})
if err != nil {
@ -106,7 +92,7 @@ func (mc *MessageClient) CreateConsumer(topicName string) pulsar.Consumer {
return consumer
}
func (mc *MessageClient) CreateClient(url string) pulsar.Client {
func (mc *MessageClient) createClient(url string) pulsar.Client {
// create client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: url,
@ -120,42 +106,51 @@ func (mc *MessageClient) CreateClient(url string) pulsar.Client {
func (mc *MessageClient) InitClient(url string) {
//create client
mc.client = mc.CreateClient(url)
mc.client = mc.createClient(url)
//create producer
mc.key2segProducer = mc.CreatProducer("Key2Seg")
mc.key2segProducer = mc.creatProducer("Key2Seg")
//create consumer
mc.insertOrDeleteConsumer = mc.CreateConsumer("InsertOrDelete")
mc.searchByIdConsumer = mc.CreateConsumer("SearchById")
mc.timeSyncConsumer = mc.CreateConsumer("TimeSync")
mc.searchByIdConsumer = mc.createConsumer("SearchById")
// init channel
mc.insertOrDeleteChan = make(chan *msgpb.InsertOrDeleteMsg, 1000)
mc.searchByIdChan = make(chan *msgpb.EntityIdentity, 1000)
mc.timeSyncChan = make(chan *msgpb.TimeSyncMsg, 1000)
//init channel
mc.searchByIdChan = make(chan *msgpb.EntityIdentity, 10000)
mc.InsertMsg = make([]*msgpb.InsertOrDeleteMsg, 1000)
mc.DeleteMsg = make([]*msgpb.InsertOrDeleteMsg, 1000)
mc.SearchByIdMsg = make([]*msgpb.EntityIdentity, 1000)
mc.TimeSyncMsg = make([]*msgpb.TimeSyncMsg, 1000)
//init msg slice
mc.InsertMsg = make([]*msgpb.InsertOrDeleteMsg, 0)
mc.DeleteMsg = make([]*msgpb.InsertOrDeleteMsg, 0)
//init timesync
URL := "pulsar://localhost:6650"
timeSyncTopic := "TimeSync"
timeSyncSubName := "writer"
readTopics := make([]string, 0, 1024)
for i := 0; i < 1024; i++ {
str := "InsertOrDelete-partition-"
str = str + strconv.Itoa(i)
readTopics = append(readTopics, str)
}
readSubName := "writer"
proxyIdList := []int64{0}
timeSync, err := timesync.NewReaderTimeSync(URL, timeSyncTopic, timeSyncSubName, readTopics, readSubName, proxyIdList, 400, -1, timesync.WithReaderQueueSize(1024))
if err != nil {
log.Fatal(err)
}
mc.timeSyncCfg = timeSync.(*timesync.ReaderTimeSyncCfg)
mc.timestampBatchStart = 0
mc.timestampBatchEnd = 0
mc.batchIDLen = 0
}
func (mc *MessageClient) Close() {
defer mc.client.Close()
defer mc.key2segProducer.Close()
defer mc.insertOrDeleteConsumer.Close()
defer mc.searchByIdConsumer.Close()
defer mc.timeSyncConsumer.Close()
mc.client.Close()
mc.key2segProducer.Close()
mc.searchByIdConsumer.Close()
mc.timeSyncCfg.Close()
}
type JobType int
const (
OpInQueryNode JobType = 0
OpInWriteNode JobType = 1
)
type MessageType int
const (
@ -171,42 +166,40 @@ func (mc *MessageClient) PrepareMsg(messageType MessageType, msgLen int) {
switch messageType {
case InsertOrDelete:
for i := 0; i < msgLen; i++ {
msg := <-mc.insertOrDeleteChan
msg := <-mc.timeSyncCfg.InsertOrDelete()
if msg.Op == msgpb.OpType_INSERT {
mc.InsertMsg = append(mc.InsertMsg, msg)
} else {
mc.DeleteMsg = append(mc.DeleteMsg, msg)
}
}
case SearchById:
for i := 0; i < msgLen; i++ {
msg := <-mc.searchByIdChan
mc.SearchByIdMsg = append(mc.SearchByIdMsg, msg)
}
case TimeSync:
mc.timestampBatchStart = mc.timestampBatchEnd
mc.batchIDLen = 0
for i := 0; i < msgLen; i++ {
msg := <-mc.timeSyncChan
mc.TimeSyncMsg = append(mc.TimeSyncMsg, msg)
msg := <-mc.timeSyncCfg.TimeSync()
if i == msgLen-1 {
mc.timestampBatchEnd = msg.Timestamp
}
mc.batchIDLen += int(msg.NumRecorders)
}
}
}
func (mc *MessageClient) PrepareBatchMsg() []int {
func (mc *MessageClient) PrepareBatchMsg() int {
// assume the channel not full
mc.InsertMsg = mc.InsertMsg[:0]
mc.DeleteMsg = mc.DeleteMsg[:0]
mc.SearchByIdMsg = mc.SearchByIdMsg[:0]
mc.TimeSyncMsg = mc.TimeSyncMsg[:0]
mc.batchIDLen = 0
// get the length of every channel
insertOrDeleteLen := len(mc.insertOrDeleteChan)
searchLen := len(mc.searchByIdChan)
timeLen := len(mc.timeSyncChan)
timeLen := len(mc.timeSyncCfg.TimeSync())
// get message from channel to slice
mc.PrepareMsg(InsertOrDelete, insertOrDeleteLen)
mc.PrepareMsg(SearchById, searchLen)
mc.PrepareMsg(TimeSync, timeLen)
return []int{insertOrDeleteLen, searchLen, timeLen}
if timeLen > 0 {
mc.PrepareMsg(TimeSync, timeLen)
mc.PrepareMsg(InsertOrDelete, mc.batchIDLen)
}
//return []int{insertOrDeleteLen, searchLen, timeLen}
return mc.batchIDLen
}

View File

@ -17,12 +17,17 @@ type SegmentIdInfo struct {
SegmentIds []string
}
type WriteNode struct {
KvStore *types.Store
MessageClient *message_client.MessageClient
TimeSync uint64
}
func (wn *WriteNode) Close() {
wn.MessageClient.Close()
}
func NewWriteNode(ctx context.Context,
address string,
topics []string,
@ -104,10 +109,10 @@ func (wn *WriteNode) UpdateTimeSync(timeSync uint64) {
wn.TimeSync = timeSync
}
func (wn *WriteNode) DoWriteNode(ctx context.Context, timeSync uint64, wg *sync.WaitGroup) {
func (wn *WriteNode) DoWriteNode(ctx context.Context, wg *sync.WaitGroup) {
wg.Add(2)
go wn.InsertBatchData(ctx, wn.MessageClient.InsertMsg, wg)
go wn.DeleteBatchData(ctx, wn.MessageClient.DeleteMsg, wg)
wg.Wait()
wn.UpdateTimeSync(timeSync)
wn.UpdateTimeSync(wn.MessageClient.TimeSync())
}