From bfd4fe5a828fa9c9fd8362a6d5ec00d143e983e5 Mon Sep 17 00:00:00 2001 From: shengjh <1572099106@qq.com> Date: Sat, 5 Sep 2020 19:17:08 +0800 Subject: [PATCH] Update the logic of insert and delete Signed-off-by: shengjh <1572099106@qq.com> --- .gitignore | 4 - .../src/pulsar/message_client/CMakeLists.txt | 1 - proxy/src/pulsar/message_client/ClientV2.cpp | 149 +- proxy/src/pulsar/message_client/ClientV2.h | 56 +- proxy/src/pulsar/message_client/Consumer.cpp | 2 +- proxy/src/pulsar/message_client/Producer.h | 2 +- proxy/src/server/CMakeLists.txt | 4 +- proxy/src/server/Server.cpp | 11 +- proxy/src/server/delivery/ReqHandler.cpp | 10 +- proxy/src/server/delivery/ReqHandler.h | 5 +- proxy/src/server/delivery/ReqScheduler.cpp | 12 + proxy/src/server/delivery/ReqScheduler.h | 7 + proxy/src/server/delivery/request/BaseReq.cpp | 4 + proxy/src/server/delivery/request/BaseReq.h | 6 + .../delivery/request/DeleteEntityByIDReq.cpp | 19 +- .../delivery/request/DeleteEntityByIDReq.h | 9 +- .../src/server/delivery/request/InsertReq.cpp | 25 +- proxy/src/server/delivery/request/InsertReq.h | 20 +- .../server/grpc_impl/GrpcRequestHandler.cpp | 1241 +++++++++-------- .../src/server/grpc_impl/GrpcRequestHandler.h | 3 - proxy/src/server/grpc_impl/GrpcServer.cpp | 18 +- proxy/src/server/timesync/TimeSync.cpp | 67 - proxy/src/server/timesync/TimeSync.h | 45 - proxy/src/server/tso/TSO.cpp | 36 + proxy/src/server/tso/TSO.h | 29 + 25 files changed, 892 insertions(+), 893 deletions(-) delete mode 100644 proxy/src/server/timesync/TimeSync.cpp delete mode 100644 proxy/src/server/timesync/TimeSync.h create mode 100644 proxy/src/server/tso/TSO.cpp create mode 100644 proxy/src/server/tso/TSO.h diff --git a/.gitignore b/.gitignore index fa3661548b..54d598a1f9 100644 --- a/.gitignore +++ b/.gitignore @@ -27,10 +27,6 @@ proxy/cmake_build proxy/cmake-build-debug proxy/thirdparty/grpc-src proxy/thirdparty/grpc-build -proxy/milvus/ -proxy/milvus/* -proxy/suvlim/ -proxy/suvlim/* # Compiled source *.a diff --git a/proxy/src/pulsar/message_client/CMakeLists.txt b/proxy/src/pulsar/message_client/CMakeLists.txt index 7b27ef03f2..9a8b9b0829 100644 --- a/proxy/src/pulsar/message_client/CMakeLists.txt +++ b/proxy/src/pulsar/message_client/CMakeLists.txt @@ -3,7 +3,6 @@ set(src-cpp ClientV2.cpp Consumer.cpp Producer.cpp - ${PROJECT_SOURCE_DIR}/src/grpc/gen-milvus/suvlim.pb.cc ) add_library(message_client_cpp SHARED diff --git a/proxy/src/pulsar/message_client/ClientV2.cpp b/proxy/src/pulsar/message_client/ClientV2.cpp index 59190cac83..b12a13eb58 100644 --- a/proxy/src/pulsar/message_client/ClientV2.cpp +++ b/proxy/src/pulsar/message_client/ClientV2.cpp @@ -1,87 +1,92 @@ #include "ClientV2.h" #include "pulsar/Result.h" -namespace milvus { - namespace { - int64_t gen_channe_id(int64_t uid) { - // TODO: murmur3 hash from pulsar source code - return 0; - } - } +namespace { +int64_t gen_channe_id(int64_t uid) { + // TODO: murmur3 hash from pulsar source code + return 0; +} } namespace milvus::message_client { - MsgClientV2::MsgClientV2(int64_t client_id, std::string &service_url, const pulsar::ClientConfiguration &config) - : client_id_(client_id), service_url_(service_url) {} +MsgClientV2 &MsgClientV2::GetInstance() { + // TODO: do not hardcode pulsar message configure and init + std::string pulsar_server_addr = "pulsar://localhost:6650"; + int64_t client_id = 0; + static MsgClientV2 msg_client(client_id, pulsar_server_addr); + return msg_client; +} - Status MsgClientV2::Init(const std::string &mut_topic, const std::string &query_topic, - const std::string &result_topic) { - auto pulsar_client = std::make_shared(service_url_); - mut_producer_ = std::make_shared(pulsar_client, mut_topic); - query_producer_ = std::make_shared(pulsar_client, mut_topic); - consumer_ = std::make_shared(pulsar_client, "default"); +MsgClientV2::MsgClientV2(int64_t client_id, std::string &service_url, const pulsar::ClientConfiguration &config) + : client_id_(client_id), service_url_(service_url) {} - auto result = consumer_->subscribe(result_topic); - if (result != pulsar::Result::ResultOk) { - return Status(SERVER_UNEXPECTED_ERROR, pulsar::strResult(result)); - } - return Status::OK(); +Status MsgClientV2::Init(const std::string &mut_topic, const std::string &query_topic, + const std::string &result_topic) { + auto pulsar_client = std::make_shared(service_url_); + mut_producer_ = std::make_shared(pulsar_client, mut_topic); + query_producer_ = std::make_shared(pulsar_client, mut_topic); + consumer_ = std::make_shared(pulsar_client, result_topic); + + auto result = consumer_->subscribe(result_topic); + if (result != pulsar::Result::ResultOk) { + return Status(SERVER_UNEXPECTED_ERROR, "Pulsar message client init occur error, " + std::string(pulsar::strResult(result))); + } + return Status::OK(); +} + +void MsgClientV2::GetQueryResult(int64_t query_id) { + throw std::exception(); +} + +Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request) { + // may have retry policy? + auto row_count = request.rows_data_size(); + // TODO: Get the segment from master + int64_t segment = 0; + milvus::grpc::InsertOrDeleteMsg mut_msg; + for (auto i = 0; i < row_count; i++) { + mut_msg.set_op(milvus::grpc::OpType::INSERT); + mut_msg.set_uid(GetUniqueQId()); + mut_msg.set_client_id(client_id_); + auto channel_id = gen_channe_id(request.entity_id_array(i)); + mut_msg.set_channel_id(channel_id); + mut_msg.set_collection_name(request.collection_name()); + mut_msg.set_partition_tag(request.partition_tag()); + mut_msg.set_segment_id(segment); + mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i)); + mut_msg.mutable_extra_params()->CopyFrom(request.extra_params()); + + auto result = mut_producer_->send(mut_msg); + if (result != pulsar::ResultOk) { + // TODO: error code + return Status(DB_ERROR, pulsar::strResult(result)); } + } + return Status::OK(); +} +Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request) { + milvus::grpc::InsertOrDeleteMsg mut_msg; + for (auto id: request.id_array()) { + mut_msg.set_op(milvus::grpc::OpType::DELETE); + mut_msg.set_uid(GetUniqueQId()); + mut_msg.set_client_id(client_id_); + mut_msg.set_uid(id); + mut_msg.set_collection_name(request.collection_name()); - void MsgClientV2::GetQueryResult(int64_t query_id) { - throw std::exception(); + auto result = mut_producer_->send(mut_msg); + if (result != pulsar::ResultOk) { + // TODO: error code + return Status(DB_ERROR, pulsar::strResult(result)); } + } + return Status::OK(); +} - Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request) { - // may have retry policy? - auto row_count = request.rows_data_size(); - // TODO: Get the segment from master - int64_t segment = 0; - milvus::grpc::InsertOrDeleteMsg mut_msg; - for (auto i = 0; i < row_count; i++) { - mut_msg.set_op(milvus::grpc::OpType::INSERT); - mut_msg.set_uid(GetUniqueQId()); - mut_msg.set_client_id(client_id_); - auto channel_id = gen_channe_id(request.entity_id_array(i)); - mut_msg.set_channel_id(channel_id); - mut_msg.set_collection_name(request.collection_name()); - mut_msg.set_partition_tag(request.partition_tag()); - mut_msg.set_segment_id(segment); - mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i)); - mut_msg.mutable_extra_params()->CopyFrom(request.extra_params()); - - auto result = mut_producer_->send(mut_msg.SerializeAsString()); - if (result != pulsar::ResultOk) { - // TODO: error code - return Status(DB_ERROR, pulsar::strResult(result)); - } - } - return Status::OK(); - } - - Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request) { - milvus::grpc::InsertOrDeleteMsg mut_msg; - for (auto id: request.id_array()) { - mut_msg.set_op(milvus::grpc::OpType::DELETE); - mut_msg.set_uid(GetUniqueQId()); - mut_msg.set_client_id(client_id_); - mut_msg.set_uid(id); - mut_msg.set_collection_name(request.collection_name()); - - auto result = mut_producer_->send(mut_msg.SerializeAsString()); - if (result != pulsar::ResultOk) { - // TODO: error code - return Status(DB_ERROR, pulsar::strResult(result)); - } - } - return Status::OK(); - } - - MsgClientV2::~MsgClientV2() { - mut_producer_->close(); - query_producer_->close(); - consumer_->close(); - } +MsgClientV2::~MsgClientV2() { + mut_producer_->close(); + query_producer_->close(); + consumer_->close(); +} } \ No newline at end of file diff --git a/proxy/src/pulsar/message_client/ClientV2.h b/proxy/src/pulsar/message_client/ClientV2.h index c7912ab794..aa4c7cfb20 100644 --- a/proxy/src/pulsar/message_client/ClientV2.h +++ b/proxy/src/pulsar/message_client/ClientV2.h @@ -1,42 +1,46 @@ #pragma once -#include "utils/Status.h" +#include "src/utils/Status.h" #include "Producer.h" #include "Consumer.h" #include "grpc/gen-milvus/suvlim.pb.h" namespace milvus::message_client { - class MsgClientV2 { - public: - MsgClientV2(int64_t client_id, std::string &service_url, const pulsar::ClientConfiguration& config = pulsar::ClientConfiguration()); +class MsgClientV2 { + public: + static MsgClientV2 &GetInstance(); + ~MsgClientV2(); - ~MsgClientV2(); + // When using MsgClient, make sure it init successfully + Status Init(const std::string &mut_topic, + const std::string &query_topic, const std::string &result_topic); - // When using MsgClient, make sure it init successfully - Status Init(const std::string &mut_topic, - const std::string &query_topic, const std::string &result_topic); + // unpackage batch insert or delete request, and delivery message to pulsar per row + Status SendMutMessage(const milvus::grpc::InsertParam &request); - // unpackage batch insert or delete request, and delivery message to pulsar per row - Status SendMutMessage(const milvus::grpc::InsertParam &request); + Status SendMutMessage(const milvus::grpc::DeleteByIDParam &request); - Status SendMutMessage(const milvus::grpc::DeleteByIDParam &request); + // + Status SendQueryMessage(const milvus::grpc::SearchParam &request); - // - Status SendQueryMessage(const milvus::grpc::SearchParam &request); + void GetQueryResult(int64_t query_id); - void GetQueryResult(int64_t query_id); + private: - private: - int64_t GetUniqueQId() { - return q_id_.fetch_add(1); - } + MsgClientV2(int64_t client_id, + std::string &service_url, + const pulsar::ClientConfiguration &config = pulsar::ClientConfiguration()); - private: - std::atomic q_id_ = 0; - int64_t client_id_; - std::string service_url_; - std::shared_ptr consumer_; - std::shared_ptr mut_producer_; - std::shared_ptr query_producer_; - }; + int64_t GetUniqueQId() { + return q_id_.fetch_add(1); + } + + private: + std::atomic q_id_ = 0; + int64_t client_id_; + std::string service_url_; + std::shared_ptr consumer_; + std::shared_ptr mut_producer_; + std::shared_ptr query_producer_; +}; } \ No newline at end of file diff --git a/proxy/src/pulsar/message_client/Consumer.cpp b/proxy/src/pulsar/message_client/Consumer.cpp index b208e431ef..8c9ea720a3 100644 --- a/proxy/src/pulsar/message_client/Consumer.cpp +++ b/proxy/src/pulsar/message_client/Consumer.cpp @@ -1,6 +1,6 @@ #include "Consumer.h" -#include "grpc/gen-milvus/suvlim.pb.h" +#include "src/grpc/gen-milvus/suvlim.pb.h" namespace milvus { namespace message_client { diff --git a/proxy/src/pulsar/message_client/Producer.h b/proxy/src/pulsar/message_client/Producer.h index 3a357f3590..903c3b54ca 100644 --- a/proxy/src/pulsar/message_client/Producer.h +++ b/proxy/src/pulsar/message_client/Producer.h @@ -2,7 +2,7 @@ #include "pulsar/Producer.h" #include "Client.h" -#include "grpc/gen-milvus/suvlim.pb.h" +#include "src/grpc/gen-milvus/suvlim.pb.h" namespace milvus { namespace message_client { diff --git a/proxy/src/server/CMakeLists.txt b/proxy/src/server/CMakeLists.txt index c3133fbca7..7ca570447e 100644 --- a/proxy/src/server/CMakeLists.txt +++ b/proxy/src/server/CMakeLists.txt @@ -26,7 +26,7 @@ aux_source_directory( ${MILVUS_ENGINE_SRC}/server/init SERVER_INIT_ aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery/request DELIVERY_REQUEST_FILES ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery/strategy DELIVERY_STRATEGY_FILES ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery DELIVERY_FILES ) -aux_source_directory( ${MILVUS_ENGINE_SRC}/server/timesync TIME_SYNC_FILES ) +aux_source_directory( ${MILVUS_ENGINE_SRC}/server/tso TSO_FILES) set( SERVER_FILES ${SERVER_INIT_FILES} ${SERVER_SERVICE_FILES} @@ -34,7 +34,7 @@ set( SERVER_FILES ${SERVER_INIT_FILES} ${DELIVERY_REQUEST_FILES} ${DELIVERY_STRATEGY_FILES} ${DELIVERY_FILES} - ${TIME_SYNC_FILES} + ${TSO_FILES} ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/grpc_impl GRPC_IMPL_FILES ) diff --git a/proxy/src/server/Server.cpp b/proxy/src/server/Server.cpp index 2dc38926bf..9874084eac 100644 --- a/proxy/src/server/Server.cpp +++ b/proxy/src/server/Server.cpp @@ -28,6 +28,7 @@ #include "server/init/StorageChecker.h" #include "src/version.h" #include +#include "src/pulsar/message_client/ClientV2.h" #include "utils/Log.h" #include "utils/SignalHandler.h" #include "utils/TimeRecorder.h" @@ -290,11 +291,11 @@ Server::StartService() { grpc::GrpcServer::GetInstance().Start(); - // stat = storage::S3ClientWrapper::GetInstance().StartService(); - // if (!stat.ok()) { - // LOG_SERVER_ERROR_ << "S3Client start service fail: " << stat.message(); - // goto FAIL; - // } + stat = message_client::MsgClientV2::GetInstance().Init("topic-insert","topic-query","topic-result"); + if (!stat.ok()) { + LOG_SERVER_ERROR_ << "Pulsar message client start service fail: " << stat.message(); + goto FAIL; + } return Status::OK(); FAIL: diff --git a/proxy/src/server/delivery/ReqHandler.cpp b/proxy/src/server/delivery/ReqHandler.cpp index 31eb8b3fa9..2f52b7add5 100644 --- a/proxy/src/server/delivery/ReqHandler.cpp +++ b/proxy/src/server/delivery/ReqHandler.cpp @@ -149,9 +149,8 @@ ReqHandler::DropIndex(const ContextPtr& context, const std::string& collection_n } Status -ReqHandler::Insert(const ContextPtr& context, const std::string& collection_name, const std::string& partition_name, - const int64_t& row_count, std::unordered_map>& chunk_data) { - BaseReqPtr req_ptr = InsertReq::Create(context, collection_name, partition_name, row_count, chunk_data); +ReqHandler::Insert(const ContextPtr& context, const ::milvus::grpc::InsertParam* insert_param) { + BaseReqPtr req_ptr = InsertReq::Create(context, insert_param); ReqScheduler::ExecReq(req_ptr); return req_ptr->status(); } @@ -167,9 +166,8 @@ ReqHandler::GetEntityByID(const ContextPtr& context, const std::string& collecti } Status -ReqHandler::DeleteEntityByID(const ContextPtr& context, const std::string& collection_name, - const engine::IDNumbers& ids) { - BaseReqPtr req_ptr = DeleteEntityByIDReq::Create(context, collection_name, ids); +ReqHandler::DeleteEntityByID(const ContextPtr& context, const ::milvus::grpc::DeleteByIDParam *param) { + BaseReqPtr req_ptr = DeleteEntityByIDReq::Create(context, param); ReqScheduler::ExecReq(req_ptr); return req_ptr->status(); } diff --git a/proxy/src/server/delivery/ReqHandler.h b/proxy/src/server/delivery/ReqHandler.h index e51993fa39..f21273f608 100644 --- a/proxy/src/server/delivery/ReqHandler.h +++ b/proxy/src/server/delivery/ReqHandler.h @@ -79,8 +79,7 @@ class ReqHandler { const std::string& index_name); Status - Insert(const ContextPtr& context, const std::string& collection_name, const std::string& partition_name, - const int64_t& row_count, std::unordered_map>& chunk_data); + Insert(const ContextPtr& context, const ::milvus::grpc::InsertParam* insert_param); Status GetEntityByID(const ContextPtr& context, const std::string& collection_name, const engine::IDNumbers& ids, @@ -88,7 +87,7 @@ class ReqHandler { engine::DataChunkPtr& data_chunk); Status - DeleteEntityByID(const ContextPtr& context, const std::string& collection_name, const engine::IDNumbers& ids); + DeleteEntityByID(const ContextPtr& context, const ::milvus::grpc::DeleteByIDParam *param); Status Search(const ContextPtr& context, const query::QueryPtr& query_ptr, const milvus::json& json_params, diff --git a/proxy/src/server/delivery/ReqScheduler.cpp b/proxy/src/server/delivery/ReqScheduler.cpp index 618c6de172..3789fcac78 100644 --- a/proxy/src/server/delivery/ReqScheduler.cpp +++ b/proxy/src/server/delivery/ReqScheduler.cpp @@ -11,6 +11,7 @@ #include "server/delivery/ReqScheduler.h" #include "utils/Log.h" +#include "server/tso/TSO.h" #include #include @@ -134,6 +135,9 @@ Status ReqScheduler::PutToQueue(const BaseReqPtr& req_ptr) { std::lock_guard lock(queue_mtx_); + auto &tso = TSOracle::GetInstance(); + req_ptr->SetTimestamp(tso.GetTimeStamp()); + std::string group_name = req_ptr->req_group(); if (req_groups_.count(group_name) > 0) { req_groups_[group_name]->PutReq(req_ptr); @@ -152,5 +156,13 @@ ReqScheduler::PutToQueue(const BaseReqPtr& req_ptr) { return Status::OK(); } +int64_t ReqScheduler::GetLatestReqDeliveredTime() { + return latest_req_time_.load(); +} + +void ReqScheduler::UpdateLatestDeliveredReqTime(int64_t time) { + latest_req_time_.store(time); +} + } // namespace server } // namespace milvus diff --git a/proxy/src/server/delivery/ReqScheduler.h b/proxy/src/server/delivery/ReqScheduler.h index dbcf8418ee..e6f00bb470 100644 --- a/proxy/src/server/delivery/ReqScheduler.h +++ b/proxy/src/server/delivery/ReqScheduler.h @@ -44,6 +44,11 @@ class ReqScheduler { static void ExecReq(const BaseReqPtr& req_ptr); + + void UpdateLatestDeliveredReqTime(int64_t time); + + int64_t GetLatestReqDeliveredTime(); + protected: ReqScheduler(); @@ -58,6 +63,8 @@ class ReqScheduler { private: mutable std::mutex queue_mtx_; + + std::atomic latest_req_time_; std::map req_groups_; diff --git a/proxy/src/server/delivery/request/BaseReq.cpp b/proxy/src/server/delivery/request/BaseReq.cpp index 04460fafe5..4d8a50d595 100644 --- a/proxy/src/server/delivery/request/BaseReq.cpp +++ b/proxy/src/server/delivery/request/BaseReq.cpp @@ -77,5 +77,9 @@ BaseReq::WaitToFinish() { return status_; } +void BaseReq::SetTimestamp(uint64_t ts) { + timestamp_ = ts; +} + } // namespace server } // namespace milvus diff --git a/proxy/src/server/delivery/request/BaseReq.h b/proxy/src/server/delivery/request/BaseReq.h index f0ec7e069e..99f47a673f 100644 --- a/proxy/src/server/delivery/request/BaseReq.h +++ b/proxy/src/server/delivery/request/BaseReq.h @@ -14,6 +14,7 @@ #include "server/context/Context.h" #include "server/delivery/request/Types.h" #include "utils/Status.h" +#include "pulsar/message_client/ClientV2.h" #include #include @@ -74,6 +75,9 @@ class BaseReq { void SetStatus(const Status& status); + void + SetTimestamp(uint64_t ts); + protected: virtual Status OnPreExecute(); @@ -90,6 +94,8 @@ class BaseReq { std::string req_group_; bool async_; Status status_; + uint64_t timestamp_; + private: mutable std::mutex finish_mtx_; diff --git a/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp b/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp index 0f92459806..2e7df2e2a6 100644 --- a/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp +++ b/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp @@ -16,6 +16,7 @@ // under the License. #include "server/delivery/request/DeleteEntityByIDReq.h" +#include "src/server/delivery/ReqScheduler.h" #include #include @@ -29,21 +30,25 @@ namespace milvus { namespace server { -DeleteEntityByIDReq::DeleteEntityByIDReq(const ContextPtr& context, const std::string& collection_name, - const engine::IDNumbers& entity_ids) - : BaseReq(context, ReqType::kDeleteEntityByID), collection_name_(collection_name), entity_ids_(entity_ids) { +DeleteEntityByIDReq::DeleteEntityByIDReq(const ContextPtr& context, const ::milvus::grpc::DeleteByIDParam *request) + : BaseReq(context, ReqType::kDeleteEntityByID), request_(request) { } BaseReqPtr -DeleteEntityByIDReq::Create(const ContextPtr& context, const std::string& collection_name, - const engine::IDNumbers& entity_ids) { - return std::shared_ptr(new DeleteEntityByIDReq(context, collection_name, entity_ids)); +DeleteEntityByIDReq::Create(const ContextPtr& context, const ::milvus::grpc::DeleteByIDParam *request) { + return std::shared_ptr(new DeleteEntityByIDReq(context, request)); } Status DeleteEntityByIDReq::OnExecute() { + auto &msg_client = message_client::MsgClientV2::GetInstance(); + Status status = msg_client.SendMutMessage(*request_); + return status; +} - return Status::OK(); +Status DeleteEntityByIDReq::OnPostExecute() { + ReqScheduler::GetInstance().UpdateLatestDeliveredReqTime(timestamp_); + return Status::OK(); } } // namespace server diff --git a/proxy/src/server/delivery/request/DeleteEntityByIDReq.h b/proxy/src/server/delivery/request/DeleteEntityByIDReq.h index c32d6f5f63..050e5adb71 100644 --- a/proxy/src/server/delivery/request/DeleteEntityByIDReq.h +++ b/proxy/src/server/delivery/request/DeleteEntityByIDReq.h @@ -29,18 +29,17 @@ namespace server { class DeleteEntityByIDReq : public BaseReq { public: static BaseReqPtr - Create(const ContextPtr& context, const std::string& collection_name, const engine::IDNumbers& entity_ids); + Create(const ContextPtr& context, const ::milvus::grpc::DeleteByIDParam *request); protected: - DeleteEntityByIDReq(const ContextPtr& context, const std::string& collection_name, - const engine::IDNumbers& entity_ids); + DeleteEntityByIDReq(const ContextPtr& context, const ::milvus::grpc::DeleteByIDParam *request); Status OnExecute() override; private: - const std::string collection_name_; - const engine::IDNumbers& entity_ids_; + const ::milvus::grpc::DeleteByIDParam *request_; + Status OnPostExecute(); }; } // namespace server diff --git a/proxy/src/server/delivery/request/InsertReq.cpp b/proxy/src/server/delivery/request/InsertReq.cpp index 3dac4da7de..0fd5f07a85 100644 --- a/proxy/src/server/delivery/request/InsertReq.cpp +++ b/proxy/src/server/delivery/request/InsertReq.cpp @@ -14,6 +14,7 @@ #include "utils/CommonUtil.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" +#include "server/delivery/ReqScheduler.h" #include #include @@ -28,27 +29,27 @@ namespace milvus { namespace server { -InsertReq::InsertReq(const ContextPtr& context, const std::string& collection_name, const std::string& partition_name, - const int64_t& row_count, std::unordered_map>& chunk_data) +InsertReq::InsertReq(const ContextPtr &context, const ::milvus::grpc::InsertParam *insert_param) : BaseReq(context, ReqType::kInsert), - collection_name_(collection_name), - partition_name_(partition_name), - row_count_(row_count), - chunk_data_(chunk_data) { + insert_param_(insert_param) { } BaseReqPtr -InsertReq::Create(const ContextPtr& context, const std::string& collection_name, const std::string& partition_name, - const int64_t& row_count, std::unordered_map>& chunk_data) { - return std::shared_ptr(new InsertReq(context, collection_name, partition_name, row_count, chunk_data)); +InsertReq::Create(const ContextPtr &context, const ::milvus::grpc::InsertParam *insert_param) { + return std::shared_ptr(new InsertReq(context, insert_param)); } Status InsertReq::OnExecute() { - LOG_SERVER_INFO_ << LogOut("[%s][%ld] ", "insert", 0) << "Execute InsertReq."; + LOG_SERVER_INFO_ << LogOut("[%s][%ld] ", "insert", 0) << "Execute InsertReq."; + auto &msg_client = message_client::MsgClientV2::GetInstance(); + Status status = msg_client.SendMutMessage(*insert_param_); + return status; +} - - return Status::OK(); +Status InsertReq::OnPostExecute() { + ReqScheduler::GetInstance().UpdateLatestDeliveredReqTime(timestamp_); + return Status::OK(); } } // namespace server diff --git a/proxy/src/server/delivery/request/InsertReq.h b/proxy/src/server/delivery/request/InsertReq.h index a6eac001d3..1cc8103c26 100644 --- a/proxy/src/server/delivery/request/InsertReq.h +++ b/proxy/src/server/delivery/request/InsertReq.h @@ -23,22 +23,20 @@ namespace server { class InsertReq : public BaseReq { public: - static BaseReqPtr - Create(const ContextPtr& context, const std::string& collection_name, const std::string& partition_name, - const int64_t& row_count, std::unordered_map>& chunk_data); + static BaseReqPtr + Create(const ContextPtr &context, const ::milvus::grpc::InsertParam *chunk_data); protected: - InsertReq(const ContextPtr& context, const std::string& collection_name, const std::string& partition_name, - const int64_t& row_count, std::unordered_map>& chunk_data); + InsertReq(const ContextPtr &context, const ::milvus::grpc::InsertParam *chunk_data); - Status - OnExecute() override; + Status + OnExecute() override; + + Status + OnPostExecute() override ; private: - const std::string collection_name_; - const std::string partition_name_; - const int64_t row_count_; - std::unordered_map>& chunk_data_; + const ::milvus::grpc::InsertParam *insert_param_; }; } // namespace server diff --git a/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp b/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp index 142e2174d1..de9781fb10 100644 --- a/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -29,138 +29,138 @@ namespace milvus { namespace server { namespace grpc { -const char* EXTRA_PARAM_KEY = "params"; +const char *EXTRA_PARAM_KEY = "params"; const size_t MAXIMUM_FIELD_NUM = 64; ::milvus::grpc::ErrorCode ErrorMap(ErrorCode code) { - static const std::map code_map = { - {SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_NULL_POINTER, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND}, - {SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER}, - {SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE}, - {SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER}, - {SERVER_CANNOT_DELETE_FILE, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE}, - {SERVER_COLLECTION_NOT_EXIST, ::milvus::grpc::ErrorCode::COLLECTION_NOT_EXISTS}, - {SERVER_INVALID_COLLECTION_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_COLLECTION_NAME}, - {SERVER_INVALID_COLLECTION_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, - {SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, - {SERVER_INVALID_FIELD_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_INVALID_FIELD_NUM, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + static const std::map code_map = { + {SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_NULL_POINTER, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND}, + {SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER}, + {SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE}, + {SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER}, + {SERVER_CANNOT_DELETE_FILE, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE}, + {SERVER_COLLECTION_NOT_EXIST, ::milvus::grpc::ErrorCode::COLLECTION_NOT_EXISTS}, + {SERVER_INVALID_COLLECTION_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_COLLECTION_NAME}, + {SERVER_INVALID_COLLECTION_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, + {SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, + {SERVER_INVALID_FIELD_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_INVALID_FIELD_NUM, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_INVALID_INDEX_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE}, - {SERVER_INVALID_ROWRECORD, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, - {SERVER_INVALID_ROWRECORD_ARRAY, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, - {SERVER_INVALID_TOPK, ::milvus::grpc::ErrorCode::ILLEGAL_TOPK}, - {SERVER_INVALID_NPROBE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_NLIST}, - {SERVER_INVALID_INDEX_METRIC_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE}, - {SERVER_INVALID_SEGMENT_ROW_COUNT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID}, - {SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT}, - {SERVER_CACHE_FULL, ::milvus::grpc::ErrorCode::CACHE_FAILED}, - {DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED}, - {SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR}, - {SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY}, - }; + {SERVER_INVALID_INDEX_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE}, + {SERVER_INVALID_ROWRECORD, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, + {SERVER_INVALID_ROWRECORD_ARRAY, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, + {SERVER_INVALID_TOPK, ::milvus::grpc::ErrorCode::ILLEGAL_TOPK}, + {SERVER_INVALID_NPROBE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_NLIST}, + {SERVER_INVALID_INDEX_METRIC_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE}, + {SERVER_INVALID_SEGMENT_ROW_COUNT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID}, + {SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT}, + {SERVER_CACHE_FULL, ::milvus::grpc::ErrorCode::CACHE_FAILED}, + {DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED}, + {SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR}, + {SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY}, + }; - if (code_map.find(code) != code_map.end()) { - return code_map.at(code); - } else { - return ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR; - } + if (code_map.find(code) != code_map.end()) { + return code_map.at(code); + } else { + return ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR; + } } std::string RequestMap(ReqType req_type) { - static const std::unordered_map req_map = { - {ReqType::kInsert, "Insert"}, {ReqType::kCreateIndex, "CreateIndex"}, {ReqType::kSearch, "Search"}, - {ReqType::kFlush, "Flush"}, {ReqType::kGetEntityByID, "GetEntityByID"}, {ReqType::kCompact, "Compact"}, - }; + static const std::unordered_map req_map = { + {ReqType::kInsert, "Insert"}, {ReqType::kCreateIndex, "CreateIndex"}, {ReqType::kSearch, "Search"}, + {ReqType::kFlush, "Flush"}, {ReqType::kGetEntityByID, "GetEntityByID"}, {ReqType::kCompact, "Compact"}, + }; - if (req_map.find(req_type) != req_map.end()) { - return req_map.at(req_type); - } else { - return "OtherReq"; - } + if (req_map.find(req_type) != req_map.end()) { + return req_map.at(req_type); + } else { + return "OtherReq"; + } } namespace { void -CopyVectorData(const google::protobuf::RepeatedPtrField<::milvus::grpc::VectorRowRecord>& grpc_records, - std::vector& vectors_data) { - // calculate buffer size - int64_t float_data_size = 0, binary_data_size = 0; - for (auto& record : grpc_records) { - float_data_size += record.float_data_size(); - binary_data_size += record.binary_data().size(); - } +CopyVectorData(const google::protobuf::RepeatedPtrField<::milvus::grpc::VectorRowRecord> &grpc_records, + std::vector &vectors_data) { + // calculate buffer size + int64_t float_data_size = 0, binary_data_size = 0; + for (auto &record : grpc_records) { + float_data_size += record.float_data_size(); + binary_data_size += record.binary_data().size(); + } - int64_t data_size = binary_data_size; - if (float_data_size > 0) { - data_size = float_data_size * sizeof(float); - } + int64_t data_size = binary_data_size; + if (float_data_size > 0) { + data_size = float_data_size * sizeof(float); + } - // copy vector data - vectors_data.resize(data_size); - int64_t offset = 0; - if (float_data_size > 0) { - for (auto& record : grpc_records) { - int64_t single_size = record.float_data_size() * sizeof(float); - memcpy(&vectors_data[offset], record.float_data().data(), single_size); - offset += single_size; - } - } else if (binary_data_size > 0) { - for (auto& record : grpc_records) { - int64_t single_size = record.binary_data().size(); - memcpy(&vectors_data[offset], record.binary_data().data(), single_size); - offset += single_size; - } + // copy vector data + vectors_data.resize(data_size); + int64_t offset = 0; + if (float_data_size > 0) { + for (auto &record : grpc_records) { + int64_t single_size = record.float_data_size() * sizeof(float); + memcpy(&vectors_data[offset], record.float_data().data(), single_size); + offset += single_size; } + } else if (binary_data_size > 0) { + for (auto &record : grpc_records) { + int64_t single_size = record.binary_data().size(); + memcpy(&vectors_data[offset], record.binary_data().data(), single_size); + offset += single_size; + } + } } void -DeSerialization(const ::milvus::grpc::GeneralQuery& general_query, query::BooleanQueryPtr& boolean_clause, - query::QueryPtr& query_ptr) { +DeSerialization(const ::milvus::grpc::GeneralQuery &general_query, query::BooleanQueryPtr &boolean_clause, + query::QueryPtr &query_ptr) { } void -ConstructResults(const TopKQueryResult& result, ::milvus::grpc::QueryResult* response) { - if (!response) { - return; - } +ConstructResults(const TopKQueryResult &result, ::milvus::grpc::QueryResult *response) { + if (!response) { + return; + } - response->set_row_num(result.row_num_); + response->set_row_num(result.row_num_); - response->mutable_entities()->mutable_ids()->Resize(static_cast(result.id_list_.size()), 0); - memcpy(response->mutable_entities()->mutable_ids()->mutable_data(), result.id_list_.data(), - result.id_list_.size() * sizeof(int64_t)); + response->mutable_entities()->mutable_ids()->Resize(static_cast(result.id_list_.size()), 0); + memcpy(response->mutable_entities()->mutable_ids()->mutable_data(), result.id_list_.data(), + result.id_list_.size() * sizeof(int64_t)); - response->mutable_distances()->Resize(static_cast(result.distance_list_.size()), 0.0); - memcpy(response->mutable_distances()->mutable_data(), result.distance_list_.data(), - result.distance_list_.size() * sizeof(float)); + response->mutable_distances()->Resize(static_cast(result.distance_list_.size()), 0.0); + memcpy(response->mutable_distances()->mutable_data(), result.distance_list_.data(), + result.distance_list_.size() * sizeof(float)); } class GrpcConnectionContext : public milvus::server::ConnectionContext { public: - explicit GrpcConnectionContext(::grpc::ServerContext* context) : context_(context) { + explicit GrpcConnectionContext(::grpc::ServerContext *context) : context_(context) { + } + + bool + IsConnectionBroken() const override { + if (context_ == nullptr) { + return true; } - bool - IsConnectionBroken() const override { - if (context_ == nullptr) { - return true; - } - - return context_->IsCancelled(); - } + return context_->IsCancelled(); + } private: - ::grpc::ServerContext* context_ = nullptr; + ::grpc::ServerContext *context_ = nullptr; }; } // namespace @@ -173,666 +173,679 @@ std::atomic _sequential_id; int64_t get_sequential_id() { - return _sequential_id++; + return _sequential_id++; } void -set_request_id(::grpc::ServerContext* context, const std::string& request_id) { - if (not context) { - // error - LOG_SERVER_ERROR_ << "set_request_id: grpc::ServerContext is nullptr" << std::endl; - return; - } +set_request_id(::grpc::ServerContext *context, const std::string &request_id) { + if (not context) { + // error + LOG_SERVER_ERROR_ << "set_request_id: grpc::ServerContext is nullptr" << std::endl; + return; + } - context->AddInitialMetadata(REQ_ID, request_id); + context->AddInitialMetadata(REQ_ID, request_id); } std::string -get_request_id(::grpc::ServerContext* context) { - if (not context) { - // error - LOG_SERVER_ERROR_ << "get_request_id: grpc::ServerContext is nullptr" << std::endl; - return "INVALID_ID"; - } +get_request_id(::grpc::ServerContext *context) { + if (not context) { + // error + LOG_SERVER_ERROR_ << "get_request_id: grpc::ServerContext is nullptr" << std::endl; + return "INVALID_ID"; + } - auto server_metadata = context->server_metadata(); + auto server_metadata = context->server_metadata(); - auto request_id_kv = server_metadata.find(REQ_ID); - if (request_id_kv == server_metadata.end()) { - // error - LOG_SERVER_ERROR_ << std::string(REQ_ID) << " not found in grpc.server_metadata" << std::endl; - return "INVALID_ID"; - } + auto request_id_kv = server_metadata.find(REQ_ID); + if (request_id_kv == server_metadata.end()) { + // error + LOG_SERVER_ERROR_ << std::string(REQ_ID) << " not found in grpc.server_metadata" << std::endl; + return "INVALID_ID"; + } - return request_id_kv->second.data(); + return request_id_kv->second.data(); } } // namespace -GrpcRequestHandler::GrpcRequestHandler(const std::shared_ptr& tracer) +GrpcRequestHandler::GrpcRequestHandler(const std::shared_ptr &tracer) : tracer_(tracer), random_num_generator_() { - std::random_device random_device; - random_num_generator_.seed(random_device()); - // TODO: do not hardcode pulsar message configure and init - std::string pulsar_server_address = "pulsar://localhost:6650"; - msg_client_ = std::make_shared(0, pulsar_server_address); - msg_client_->Init("topic-mut", "topic-query", "topic-result"); + std::random_device random_device; + random_num_generator_.seed(random_device()); } void GrpcRequestHandler::OnPostRecvInitialMetaData( - ::grpc::experimental::ServerRpcInfo* server_rpc_info, - ::grpc::experimental::InterceptorBatchMethods* interceptor_batch_methods) { - std::unordered_map text_map; - auto* metadata_map = interceptor_batch_methods->GetRecvInitialMetadata(); - auto context_kv = metadata_map->find(tracing::TracerUtil::GetTraceContextHeaderName()); - if (context_kv != metadata_map->end()) { - text_map[std::string(context_kv->first.data(), context_kv->first.length())] = - std::string(context_kv->second.data(), context_kv->second.length()); - } - // test debug mode - // if (std::string(server_rpc_info->method()).find("Search") != std::string::npos) { - // text_map["demo-debug-id"] = "debug-id"; - // } + ::grpc::experimental::ServerRpcInfo *server_rpc_info, + ::grpc::experimental::InterceptorBatchMethods *interceptor_batch_methods) { + std::unordered_map text_map; + auto *metadata_map = interceptor_batch_methods->GetRecvInitialMetadata(); + auto context_kv = metadata_map->find(tracing::TracerUtil::GetTraceContextHeaderName()); + if (context_kv != metadata_map->end()) { + text_map[std::string(context_kv->first.data(), context_kv->first.length())] = + std::string(context_kv->second.data(), context_kv->second.length()); + } + // test debug mode + // if (std::string(server_rpc_info->method()).find("Search") != std::string::npos) { + // text_map["demo-debug-id"] = "debug-id"; + // } - tracing::TextMapCarrier carrier{text_map}; - auto span_context_maybe = tracer_->Extract(carrier); - if (!span_context_maybe) { - std::cerr << span_context_maybe.error().message() << std::endl; - return; - } - auto span = tracer_->StartSpan(server_rpc_info->method(), {opentracing::ChildOf(span_context_maybe->get())}); + tracing::TextMapCarrier carrier{text_map}; + auto span_context_maybe = tracer_->Extract(carrier); + if (!span_context_maybe) { + std::cerr << span_context_maybe.error().message() << std::endl; + return; + } + auto span = tracer_->StartSpan(server_rpc_info->method(), {opentracing::ChildOf(span_context_maybe->get())}); - auto server_context = server_rpc_info->server_context(); - auto client_metadata = server_context->client_metadata(); + auto server_context = server_rpc_info->server_context(); + auto client_metadata = server_context->client_metadata(); - // if client provide request_id in metadata, milvus just use it, - // else milvus generate a sequential id. - std::string request_id; - auto request_id_kv = client_metadata.find("request_id"); - if (request_id_kv != client_metadata.end()) { - request_id = request_id_kv->second.data(); - LOG_SERVER_DEBUG_ << "client provide request_id: " << request_id; + // if client provide request_id in metadata, milvus just use it, + // else milvus generate a sequential id. + std::string request_id; + auto request_id_kv = client_metadata.find("request_id"); + if (request_id_kv != client_metadata.end()) { + request_id = request_id_kv->second.data(); + LOG_SERVER_DEBUG_ << "client provide request_id: " << request_id; - // if request_id is being used by another request, - // convert it to request_id_n. - std::lock_guard lock(context_map_mutex_); - if (context_map_.find(request_id) == context_map_.end()) { - // if not found exist, mark - context_map_[request_id] = nullptr; - } else { - // Finding a unused suffix - int64_t suffix = 1; - std::string try_request_id; - bool exist = true; - do { - try_request_id = request_id + "_" + std::to_string(suffix); - exist = context_map_.find(try_request_id) != context_map_.end(); - suffix++; - } while (exist); - context_map_[try_request_id] = nullptr; - } + // if request_id is being used by another request, + // convert it to request_id_n. + std::lock_guard lock(context_map_mutex_); + if (context_map_.find(request_id) == context_map_.end()) { + // if not found exist, mark + context_map_[request_id] = nullptr; } else { - request_id = std::to_string(get_sequential_id()); - set_request_id(server_context, request_id); - LOG_SERVER_DEBUG_ << "milvus generate request_id: " << request_id; + // Finding a unused suffix + int64_t suffix = 1; + std::string try_request_id; + bool exist = true; + do { + try_request_id = request_id + "_" + std::to_string(suffix); + exist = context_map_.find(try_request_id) != context_map_.end(); + suffix++; + } while (exist); + context_map_[try_request_id] = nullptr; } + } else { + request_id = std::to_string(get_sequential_id()); + set_request_id(server_context, request_id); + LOG_SERVER_DEBUG_ << "milvus generate request_id: " << request_id; + } - auto trace_context = std::make_shared(span); - auto context = std::make_shared(request_id); - context->SetTraceContext(trace_context); - SetContext(server_rpc_info->server_context(), context); + auto trace_context = std::make_shared(span); + auto context = std::make_shared(request_id); + context->SetTraceContext(trace_context); + SetContext(server_rpc_info->server_context(), context); } void -GrpcRequestHandler::OnPreSendMessage(::grpc::experimental::ServerRpcInfo* server_rpc_info, - ::grpc::experimental::InterceptorBatchMethods* interceptor_batch_methods) { - std::lock_guard lock(context_map_mutex_); - auto request_id = get_request_id(server_rpc_info->server_context()); +GrpcRequestHandler::OnPreSendMessage(::grpc::experimental::ServerRpcInfo *server_rpc_info, + ::grpc::experimental::InterceptorBatchMethods *interceptor_batch_methods) { + std::lock_guard lock(context_map_mutex_); + auto request_id = get_request_id(server_rpc_info->server_context()); - if (context_map_.find(request_id) == context_map_.end()) { - // error - LOG_SERVER_ERROR_ << "request_id " << request_id << " not found in context_map_"; - return; - } - context_map_[request_id]->GetTraceContext()->GetSpan()->Finish(); - context_map_.erase(request_id); + if (context_map_.find(request_id) == context_map_.end()) { + // error + LOG_SERVER_ERROR_ << "request_id " << request_id << " not found in context_map_"; + return; + } + context_map_[request_id]->GetTraceContext()->GetSpan()->Finish(); + context_map_.erase(request_id); } std::shared_ptr -GrpcRequestHandler::GetContext(::grpc::ServerContext* server_context) { - std::lock_guard lock(context_map_mutex_); - auto request_id = get_request_id(server_context); +GrpcRequestHandler::GetContext(::grpc::ServerContext *server_context) { + std::lock_guard lock(context_map_mutex_); + auto request_id = get_request_id(server_context); - auto iter = context_map_.find(request_id); - if (iter == context_map_.end()) { - LOG_SERVER_ERROR_ << "GetContext: request_id " << request_id << " not found in context_map_"; - return nullptr; - } + auto iter = context_map_.find(request_id); + if (iter == context_map_.end()) { + LOG_SERVER_ERROR_ << "GetContext: request_id " << request_id << " not found in context_map_"; + return nullptr; + } - if (iter->second != nullptr) { - ConnectionContextPtr connection_context = std::make_shared(server_context); - iter->second->SetConnectionContext(connection_context); - } - return iter->second; + if (iter->second != nullptr) { + ConnectionContextPtr connection_context = std::make_shared(server_context); + iter->second->SetConnectionContext(connection_context); + } + return iter->second; } void -GrpcRequestHandler::SetContext(::grpc::ServerContext* server_context, const std::shared_ptr& context) { - std::lock_guard lock(context_map_mutex_); - auto request_id = get_request_id(server_context); - context_map_[request_id] = context; +GrpcRequestHandler::SetContext(::grpc::ServerContext *server_context, const std::shared_ptr &context) { + std::lock_guard lock(context_map_mutex_); + auto request_id = get_request_id(server_context); + context_map_[request_id] = context; } uint64_t GrpcRequestHandler::random_id() const { - std::lock_guard lock(random_mutex_); - auto value = random_num_generator_(); - while (value == 0) { - value = random_num_generator_(); - } - return value; + std::lock_guard lock(random_mutex_); + auto value = random_num_generator_(); + while (value == 0) { + value = random_num_generator_(); + } + return value; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ::grpc::Status -GrpcRequestHandler::CreateCollection(::grpc::ServerContext* context, const ::milvus::grpc::Mapping* request, - ::milvus::grpc::Status* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::CreateCollection(::grpc::ServerContext *context, const ::milvus::grpc::Mapping *request, + ::milvus::grpc::Status *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - return ::grpc::Status::OK; + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::HasCollection(::grpc::ServerContext* context, const ::milvus::grpc::CollectionName* request, - ::milvus::grpc::BoolReply* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::HasCollection(::grpc::ServerContext *context, const ::milvus::grpc::CollectionName *request, + ::milvus::grpc::BoolReply *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - bool has_collection = false; + bool has_collection = false; - Status status = req_handler_.HasCollection(GetContext(context), request->collection_name(), has_collection); - response->set_bool_reply(has_collection); + Status status = req_handler_.HasCollection(GetContext(context), request->collection_name(), has_collection); + response->set_bool_reply(has_collection); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::DropCollection(::grpc::ServerContext* context, const ::milvus::grpc::CollectionName* request, - ::milvus::grpc::Status* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::DropCollection(::grpc::ServerContext *context, const ::milvus::grpc::CollectionName *request, + ::milvus::grpc::Status *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - Status status = req_handler_.DropCollection(GetContext(context), request->collection_name()); + Status status = req_handler_.DropCollection(GetContext(context), request->collection_name()); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context); - return ::grpc::Status::OK; + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response, status, context); + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::CreateIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request, - ::milvus::grpc::Status* response) { - CHECK_NULLPTR_RETURN(request) - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context, const ::milvus::grpc::IndexParam *request, + ::milvus::grpc::Status *response) { + CHECK_NULLPTR_RETURN(request) + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - milvus::json json_params; - for (int i = 0; i < request->extra_params_size(); i++) { - const ::milvus::grpc::KeyValuePair& extra = request->extra_params(i); - if (extra.key() == EXTRA_PARAM_KEY) { - json_params[EXTRA_PARAM_KEY] = json::parse(extra.value()); - } else { - json_params[extra.key()] = extra.value(); - } - } - - Status status = req_handler_.CreateIndex(GetContext(context), request->collection_name(), request->field_name(), - request->index_name(), json_params); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context); - return ::grpc::Status::OK; -} - -::grpc::Status -GrpcRequestHandler::DescribeIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request, - ::milvus::grpc::IndexParam* response) { - CHECK_NULLPTR_RETURN(request) - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - std::string index_name; - milvus::json index_params; - Status status = req_handler_.DescribeIndex(GetContext(context), request->collection_name(), request->field_name(), - index_name, index_params); - - response->set_collection_name(request->collection_name()); - response->set_field_name(request->field_name()); - ::milvus::grpc::KeyValuePair* kv = response->add_extra_params(); - kv->set_key(EXTRA_PARAM_KEY); - kv->set_value(index_params.dump()); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; -} - -::grpc::Status -GrpcRequestHandler::DropIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request, - ::milvus::grpc::Status* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - Status status = req_handler_.DropIndex(GetContext(context), request->collection_name(), request->field_name(), - request->index_name()); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context); - - return ::grpc::Status::OK; -} - -::grpc::Status -GrpcRequestHandler::GetEntityByID(::grpc::ServerContext* context, const ::milvus::grpc::EntityIdentity* request, - ::milvus::grpc::Entities* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - return ::grpc::Status::OK; -} - -::grpc::Status -GrpcRequestHandler::GetEntityIDs(::grpc::ServerContext* context, const ::milvus::grpc::GetEntityIDsParam* request, - ::milvus::grpc::EntityIds* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - return ::grpc::Status::OK; -} - -::grpc::Status -GrpcRequestHandler::SearchInSegment(::grpc::ServerContext* context, const ::milvus::grpc::SearchInSegmentParam* request, - ::milvus::grpc::QueryResult* response) { - - return ::grpc::Status::OK; -} - -::grpc::Status -GrpcRequestHandler::DescribeCollection(::grpc::ServerContext* context, const ::milvus::grpc::CollectionName* request, - ::milvus::grpc::Mapping* response) { - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - CHECK_NULLPTR_RETURN(request); - return ::grpc::Status::OK; -} - -::grpc::Status -GrpcRequestHandler::CountCollection(::grpc::ServerContext* context, const ::milvus::grpc::CollectionName* request, - ::milvus::grpc::CollectionRowCount* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - int64_t row_count = 0; - Status status = req_handler_.CountEntities(GetContext(context), request->collection_name(), row_count); - response->set_collection_row_count(row_count); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); - - return ::grpc::Status::OK; -} - -::grpc::Status -GrpcRequestHandler::ShowCollections(::grpc::ServerContext* context, const ::milvus::grpc::Command* request, - ::milvus::grpc::CollectionNameList* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - std::vector collections; - Status status = req_handler_.ListCollections(GetContext(context), collections); - for (auto& collection : collections) { - response->add_collection_names(collection); - } - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); - - return ::grpc::Status::OK; -} - -::grpc::Status -GrpcRequestHandler::ShowCollectionInfo(::grpc::ServerContext* context, const ::milvus::grpc::CollectionName* request, - ::milvus::grpc::CollectionInfo* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - std::string collection_stats; - Status status = req_handler_.GetCollectionStats(GetContext(context), request->collection_name(), collection_stats); - response->set_json_info(collection_stats); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); - - return ::grpc::Status::OK; -} - -::grpc::Status -GrpcRequestHandler::Cmd(::grpc::ServerContext* context, const ::milvus::grpc::Command* request, - ::milvus::grpc::StringReply* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - std::string reply; - Status status; - - std::string cmd = request->cmd(); - std::vector requests; - if (cmd == "requests") { - std::lock_guard lock(context_map_mutex_); - for (auto& iter : context_map_) { - if (nullptr == iter.second) { - continue; - } - if (iter.second->ReqID() == get_request_id(context)) { - continue; - } - auto request_str = RequestMap(iter.second->GetReqType()) + "-" + iter.second->ReqID(); - requests.emplace_back(request_str); - } - nlohmann::json reply_json; - reply_json["requests"] = requests; - reply = reply_json.dump(); - response->set_string_reply(reply); + milvus::json json_params; + for (int i = 0; i < request->extra_params_size(); i++) { + const ::milvus::grpc::KeyValuePair &extra = request->extra_params(i); + if (extra.key() == EXTRA_PARAM_KEY) { + json_params[EXTRA_PARAM_KEY] = json::parse(extra.value()); } else { - status = req_handler_.Cmd(GetContext(context), cmd, reply); - response->set_string_reply(reply); + json_params[extra.key()] = extra.value(); } + } - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); + Status status = req_handler_.CreateIndex(GetContext(context), request->collection_name(), request->field_name(), + request->index_name(), json_params); - return ::grpc::Status::OK; + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response, status, context); + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::DeleteByID(::grpc::ServerContext* context, const ::milvus::grpc::DeleteByIDParam* request, - ::milvus::grpc::Status* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context, const ::milvus::grpc::IndexParam *request, + ::milvus::grpc::IndexParam *response) { + CHECK_NULLPTR_RETURN(request) + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - return ::grpc::Status::OK; + std::string index_name; + milvus::json index_params; + Status status = req_handler_.DescribeIndex(GetContext(context), request->collection_name(), request->field_name(), + index_name, index_params); + + response->set_collection_name(request->collection_name()); + response->set_field_name(request->field_name()); + ::milvus::grpc::KeyValuePair *kv = response->add_extra_params(); + kv->set_key(EXTRA_PARAM_KEY); + kv->set_value(index_params.dump()); + + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response->mutable_status(), status, context); + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::PreloadCollection(::grpc::ServerContext* context, const ::milvus::grpc::CollectionName* request, - ::milvus::grpc::Status* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::DropIndex(::grpc::ServerContext *context, const ::milvus::grpc::IndexParam *request, + ::milvus::grpc::Status *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - Status status = req_handler_.LoadCollection(GetContext(context), request->collection_name()); + Status status = req_handler_.DropIndex(GetContext(context), request->collection_name(), request->field_name(), + request->index_name()); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response, status, context); - return ::grpc::Status::OK; + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::CreatePartition(::grpc::ServerContext* context, const ::milvus::grpc::PartitionParam* request, - ::milvus::grpc::Status* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - - Status status = req_handler_.CreatePartition(GetContext(context), request->collection_name(), request->tag()); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context); - - return ::grpc::Status::OK; +GrpcRequestHandler::GetEntityByID(::grpc::ServerContext *context, const ::milvus::grpc::EntityIdentity *request, + ::milvus::grpc::Entities *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::HasPartition(::grpc::ServerContext* context, const ::milvus::grpc::PartitionParam* request, - ::milvus::grpc::BoolReply* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::GetEntityIDs(::grpc::ServerContext *context, const ::milvus::grpc::GetEntityIDsParam *request, + ::milvus::grpc::EntityIds *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - bool has_collection = false; - - Status status = - req_handler_.HasPartition(GetContext(context), request->collection_name(), request->tag(), has_collection); - response->set_bool_reply(has_collection); - - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); - - return ::grpc::Status::OK; + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::ShowPartitions(::grpc::ServerContext* context, const ::milvus::grpc::CollectionName* request, - ::milvus::grpc::PartitionList* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::SearchInSegment(::grpc::ServerContext *context, const ::milvus::grpc::SearchInSegmentParam *request, + ::milvus::grpc::QueryResult *response) { - std::vector partition_names; - Status status = req_handler_.ListPartitions(GetContext(context), request->collection_name(), partition_names); - for (auto& pn : partition_names) { - response->add_partition_tag_array(pn); + return ::grpc::Status::OK; +} + +::grpc::Status +GrpcRequestHandler::DescribeCollection(::grpc::ServerContext *context, const ::milvus::grpc::CollectionName *request, + ::milvus::grpc::Mapping *response) { + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + CHECK_NULLPTR_RETURN(request); + return ::grpc::Status::OK; +} + +::grpc::Status +GrpcRequestHandler::CountCollection(::grpc::ServerContext *context, const ::milvus::grpc::CollectionName *request, + ::milvus::grpc::CollectionRowCount *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + + int64_t row_count = 0; + Status status = req_handler_.CountEntities(GetContext(context), request->collection_name(), row_count); + response->set_collection_row_count(row_count); + + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response->mutable_status(), status, context); + + return ::grpc::Status::OK; +} + +::grpc::Status +GrpcRequestHandler::ShowCollections(::grpc::ServerContext *context, const ::milvus::grpc::Command *request, + ::milvus::grpc::CollectionNameList *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + + std::vector collections; + Status status = req_handler_.ListCollections(GetContext(context), collections); + for (auto &collection : collections) { + response->add_collection_names(collection); + } + + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response->mutable_status(), status, context); + + return ::grpc::Status::OK; +} + +::grpc::Status +GrpcRequestHandler::ShowCollectionInfo(::grpc::ServerContext *context, const ::milvus::grpc::CollectionName *request, + ::milvus::grpc::CollectionInfo *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + + std::string collection_stats; + Status status = req_handler_.GetCollectionStats(GetContext(context), request->collection_name(), collection_stats); + response->set_json_info(collection_stats); + + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response->mutable_status(), status, context); + + return ::grpc::Status::OK; +} + +::grpc::Status +GrpcRequestHandler::Cmd(::grpc::ServerContext *context, const ::milvus::grpc::Command *request, + ::milvus::grpc::StringReply *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + + std::string reply; + Status status; + + std::string cmd = request->cmd(); + std::vector requests; + if (cmd == "requests") { + std::lock_guard lock(context_map_mutex_); + for (auto &iter : context_map_) { + if (nullptr == iter.second) { + continue; + } + if (iter.second->ReqID() == get_request_id(context)) { + continue; + } + auto request_str = RequestMap(iter.second->GetReqType()) + "-" + iter.second->ReqID(); + requests.emplace_back(request_str); } + nlohmann::json reply_json; + reply_json["requests"] = requests; + reply = reply_json.dump(); + response->set_string_reply(reply); + } else { + status = req_handler_.Cmd(GetContext(context), cmd, reply); + response->set_string_reply(reply); + } - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response->mutable_status(), status, context); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response->mutable_status(), status, context); - return ::grpc::Status::OK; + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::DropPartition(::grpc::ServerContext* context, const ::milvus::grpc::PartitionParam* request, - ::milvus::grpc::Status* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::DeleteByID(::grpc::ServerContext *context, const ::milvus::grpc::DeleteByIDParam *request, + ::milvus::grpc::Status *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - Status status = req_handler_.DropPartition(GetContext(context), request->collection_name(), request->tag()); + Status status = req_handler_.DeleteEntityByID(GetContext(context), request); + SET_RESPONSE(response, status, context) - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context); - - return ::grpc::Status::OK; + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::Flush(::grpc::ServerContext* context, const ::milvus::grpc::FlushParam* request, - ::milvus::grpc::Status* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::PreloadCollection(::grpc::ServerContext *context, const ::milvus::grpc::CollectionName *request, + ::milvus::grpc::Status *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - std::vector collection_names; - for (int32_t i = 0; i < request->collection_name_array().size(); i++) { - collection_names.push_back(request->collection_name_array(i)); - } - Status status = req_handler_.Flush(GetContext(context), collection_names); + Status status = req_handler_.LoadCollection(GetContext(context), request->collection_name()); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response, status, context); - return ::grpc::Status::OK; + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::Compact(::grpc::ServerContext* context, const ::milvus::grpc::CompactParam* request, - ::milvus::grpc::Status* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::CreatePartition(::grpc::ServerContext *context, const ::milvus::grpc::PartitionParam *request, + ::milvus::grpc::Status *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - Status status = req_handler_.Compact(GetContext(context), request->collection_name(), request->threshold()); + Status status = req_handler_.CreatePartition(GetContext(context), request->collection_name(), request->tag()); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); - SET_RESPONSE(response, status, context); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response, status, context); - return ::grpc::Status::OK; + return ::grpc::Status::OK; +} + +::grpc::Status +GrpcRequestHandler::HasPartition(::grpc::ServerContext *context, const ::milvus::grpc::PartitionParam *request, + ::milvus::grpc::BoolReply *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + + bool has_collection = false; + + Status status = + req_handler_.HasPartition(GetContext(context), request->collection_name(), request->tag(), has_collection); + response->set_bool_reply(has_collection); + + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response->mutable_status(), status, context); + + return ::grpc::Status::OK; +} + +::grpc::Status +GrpcRequestHandler::ShowPartitions(::grpc::ServerContext *context, const ::milvus::grpc::CollectionName *request, + ::milvus::grpc::PartitionList *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + + std::vector partition_names; + Status status = req_handler_.ListPartitions(GetContext(context), request->collection_name(), partition_names); + for (auto &pn : partition_names) { + response->add_partition_tag_array(pn); + } + + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response->mutable_status(), status, context); + + return ::grpc::Status::OK; +} + +::grpc::Status +GrpcRequestHandler::DropPartition(::grpc::ServerContext *context, const ::milvus::grpc::PartitionParam *request, + ::milvus::grpc::Status *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + + Status status = req_handler_.DropPartition(GetContext(context), request->collection_name(), request->tag()); + + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response, status, context); + + return ::grpc::Status::OK; +} + +::grpc::Status +GrpcRequestHandler::Flush(::grpc::ServerContext *context, const ::milvus::grpc::FlushParam *request, + ::milvus::grpc::Status *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + + std::vector collection_names; + for (int32_t i = 0; i < request->collection_name_array().size(); i++) { + collection_names.push_back(request->collection_name_array(i)); + } + Status status = req_handler_.Flush(GetContext(context), collection_names); + + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response, status, context); + + return ::grpc::Status::OK; +} + +::grpc::Status +GrpcRequestHandler::Compact(::grpc::ServerContext *context, const ::milvus::grpc::CompactParam *request, + ::milvus::grpc::Status *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + + Status status = req_handler_.Compact(GetContext(context), request->collection_name(), request->threshold()); + + LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); + SET_RESPONSE(response, status, context); + + return ::grpc::Status::OK; } /*******************************************New Interface*********************************************/ ::grpc::Status -GrpcRequestHandler::Insert(::grpc::ServerContext* context, const ::milvus::grpc::InsertParam* request, - ::milvus::grpc::EntityIds* response) { +GrpcRequestHandler::Insert(::grpc::ServerContext *context, const ::milvus::grpc::InsertParam *request, + ::milvus::grpc::EntityIds *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); + // check insert param is valid + if (request->entity_id_array_size() != 0 && request->entity_id_array_size() != request->rows_data_size()) { + auto status = Status{SERVER_INVALID_ROWRECORD_ARRAY, "ID size not matches entity size"}; + SET_RESPONSE(response->mutable_status(), status, context); + } - // TODO: add pulsar message proucer - return ::grpc::Status::OK; + // generate uid for entities + //if (request->entity_id_array_size() == 0) { + // auto ids = std::vector(request->rows_data_size()); + //} + + // delivery to pulsar message topic + Status status = req_handler_.Insert(GetContext(context), request); + if (status.ok()) { + response->mutable_entity_id_array()->CopyFrom(request->entity_id_array()); + } + return ::grpc::Status::OK; } ::grpc::Status -GrpcRequestHandler::SearchPB(::grpc::ServerContext* context, const ::milvus::grpc::SearchParamPB* request, - ::milvus::grpc::QueryResult* response) { - CHECK_NULLPTR_RETURN(request); +GrpcRequestHandler::SearchPB(::grpc::ServerContext *context, const ::milvus::grpc::SearchParamPB *request, + ::milvus::grpc::QueryResult *response) { + CHECK_NULLPTR_RETURN(request); - return ::grpc::Status::OK; + return ::grpc::Status::OK; } Status -GrpcRequestHandler::ProcessLeafQueryJson(const nlohmann::json& json, query::BooleanQueryPtr& query, - std::string& field_name) { - auto status = Status::OK(); - if (json.contains("term")) { - auto leaf_query = std::make_shared(); - auto term_query = std::make_shared(); - nlohmann::json json_obj = json["term"]; - JSON_NULL_CHECK(json_obj); - JSON_OBJECT_CHECK(json_obj); - term_query->json_obj = json_obj; - nlohmann::json::iterator json_it = json_obj.begin(); - field_name = json_it.key(); +GrpcRequestHandler::ProcessLeafQueryJson(const nlohmann::json &json, query::BooleanQueryPtr &query, + std::string &field_name) { + auto status = Status::OK(); + if (json.contains("term")) { + auto leaf_query = std::make_shared(); + auto term_query = std::make_shared(); + nlohmann::json json_obj = json["term"]; + JSON_NULL_CHECK(json_obj); + JSON_OBJECT_CHECK(json_obj); + term_query->json_obj = json_obj; + nlohmann::json::iterator json_it = json_obj.begin(); + field_name = json_it.key(); - leaf_query->term_query = term_query; - query->AddLeafQuery(leaf_query); - } else if (json.contains("range")) { - auto leaf_query = std::make_shared(); - auto range_query = std::make_shared(); - nlohmann::json json_obj = json["range"]; - JSON_NULL_CHECK(json_obj); - JSON_OBJECT_CHECK(json_obj); - range_query->json_obj = json_obj; - nlohmann::json::iterator json_it = json_obj.begin(); - field_name = json_it.key(); + leaf_query->term_query = term_query; + query->AddLeafQuery(leaf_query); + } else if (json.contains("range")) { + auto leaf_query = std::make_shared(); + auto range_query = std::make_shared(); + nlohmann::json json_obj = json["range"]; + JSON_NULL_CHECK(json_obj); + JSON_OBJECT_CHECK(json_obj); + range_query->json_obj = json_obj; + nlohmann::json::iterator json_it = json_obj.begin(); + field_name = json_it.key(); - leaf_query->range_query = range_query; - query->AddLeafQuery(leaf_query); - } else if (json.contains("vector")) { - auto leaf_query = std::make_shared(); - auto vector_json = json["vector"]; - JSON_NULL_CHECK(vector_json); + leaf_query->range_query = range_query; + query->AddLeafQuery(leaf_query); + } else if (json.contains("vector")) { + auto leaf_query = std::make_shared(); + auto vector_json = json["vector"]; + JSON_NULL_CHECK(vector_json); - leaf_query->vector_placeholder = vector_json.get(); - query->AddLeafQuery(leaf_query); - } else { - return Status{SERVER_INVALID_ARGUMENT, "Leaf query get wrong key"}; - } - return status; + leaf_query->vector_placeholder = vector_json.get(); + query->AddLeafQuery(leaf_query); + } else { + return Status{SERVER_INVALID_ARGUMENT, "Leaf query get wrong key"}; + } + return status; } Status -GrpcRequestHandler::ProcessBooleanQueryJson(const nlohmann::json& query_json, query::BooleanQueryPtr& boolean_query, - query::QueryPtr& query_ptr) { - auto status = Status::OK(); - if (query_json.empty()) { - return Status{SERVER_INVALID_ARGUMENT, "BoolQuery is null"}; - } - for (auto& el : query_json.items()) { - if (el.key() == "must") { - boolean_query->SetOccur(query::Occur::MUST); - auto must_json = el.value(); - if (!must_json.is_array()) { - std::string msg = "Must json string is not an array"; - return Status{SERVER_INVALID_DSL_PARAMETER, msg}; - } +GrpcRequestHandler::ProcessBooleanQueryJson(const nlohmann::json &query_json, query::BooleanQueryPtr &boolean_query, + query::QueryPtr &query_ptr) { + auto status = Status::OK(); + if (query_json.empty()) { + return Status{SERVER_INVALID_ARGUMENT, "BoolQuery is null"}; + } + for (auto &el : query_json.items()) { + if (el.key() == "must") { + boolean_query->SetOccur(query::Occur::MUST); + auto must_json = el.value(); + if (!must_json.is_array()) { + std::string msg = "Must json string is not an array"; + return Status{SERVER_INVALID_DSL_PARAMETER, msg}; + } - for (auto& json : must_json) { - auto must_query = std::make_shared(); - if (json.contains("must") || json.contains("should") || json.contains("must_not")) { - STATUS_CHECK(ProcessBooleanQueryJson(json, must_query, query_ptr)); - boolean_query->AddBooleanQuery(must_query); - } else { - std::string field_name; - STATUS_CHECK(ProcessLeafQueryJson(json, boolean_query, field_name)); - if (!field_name.empty()) { - query_ptr->index_fields.insert(field_name); - } - } - } - } else if (el.key() == "should") { - boolean_query->SetOccur(query::Occur::SHOULD); - auto should_json = el.value(); - if (!should_json.is_array()) { - std::string msg = "Should json string is not an array"; - return Status{SERVER_INVALID_DSL_PARAMETER, msg}; - } - - for (auto& json : should_json) { - auto should_query = std::make_shared(); - if (json.contains("must") || json.contains("should") || json.contains("must_not")) { - STATUS_CHECK(ProcessBooleanQueryJson(json, should_query, query_ptr)); - boolean_query->AddBooleanQuery(should_query); - } else { - std::string field_name; - STATUS_CHECK(ProcessLeafQueryJson(json, boolean_query, field_name)); - if (!field_name.empty()) { - query_ptr->index_fields.insert(field_name); - } - } - } - } else if (el.key() == "must_not") { - boolean_query->SetOccur(query::Occur::MUST_NOT); - auto should_json = el.value(); - if (!should_json.is_array()) { - std::string msg = "Must_not json string is not an array"; - return Status{SERVER_INVALID_DSL_PARAMETER, msg}; - } - - for (auto& json : should_json) { - if (json.contains("must") || json.contains("should") || json.contains("must_not")) { - auto must_not_query = std::make_shared(); - STATUS_CHECK(ProcessBooleanQueryJson(json, must_not_query, query_ptr)); - boolean_query->AddBooleanQuery(must_not_query); - } else { - std::string field_name; - STATUS_CHECK(ProcessLeafQueryJson(json, boolean_query, field_name)); - if (!field_name.empty()) { - query_ptr->index_fields.insert(field_name); - } - } - } + for (auto &json : must_json) { + auto must_query = std::make_shared(); + if (json.contains("must") || json.contains("should") || json.contains("must_not")) { + STATUS_CHECK(ProcessBooleanQueryJson(json, must_query, query_ptr)); + boolean_query->AddBooleanQuery(must_query); } else { - std::string msg = "BoolQuery json string does not include bool query"; - return Status{SERVER_INVALID_DSL_PARAMETER, msg}; + std::string field_name; + STATUS_CHECK(ProcessLeafQueryJson(json, boolean_query, field_name)); + if (!field_name.empty()) { + query_ptr->index_fields.insert(field_name); + } } - } + } + } else if (el.key() == "should") { + boolean_query->SetOccur(query::Occur::SHOULD); + auto should_json = el.value(); + if (!should_json.is_array()) { + std::string msg = "Should json string is not an array"; + return Status{SERVER_INVALID_DSL_PARAMETER, msg}; + } - return status; + for (auto &json : should_json) { + auto should_query = std::make_shared(); + if (json.contains("must") || json.contains("should") || json.contains("must_not")) { + STATUS_CHECK(ProcessBooleanQueryJson(json, should_query, query_ptr)); + boolean_query->AddBooleanQuery(should_query); + } else { + std::string field_name; + STATUS_CHECK(ProcessLeafQueryJson(json, boolean_query, field_name)); + if (!field_name.empty()) { + query_ptr->index_fields.insert(field_name); + } + } + } + } else if (el.key() == "must_not") { + boolean_query->SetOccur(query::Occur::MUST_NOT); + auto should_json = el.value(); + if (!should_json.is_array()) { + std::string msg = "Must_not json string is not an array"; + return Status{SERVER_INVALID_DSL_PARAMETER, msg}; + } + + for (auto &json : should_json) { + if (json.contains("must") || json.contains("should") || json.contains("must_not")) { + auto must_not_query = std::make_shared(); + STATUS_CHECK(ProcessBooleanQueryJson(json, must_not_query, query_ptr)); + boolean_query->AddBooleanQuery(must_not_query); + } else { + std::string field_name; + STATUS_CHECK(ProcessLeafQueryJson(json, boolean_query, field_name)); + if (!field_name.empty()) { + query_ptr->index_fields.insert(field_name); + } + } + } + } else { + std::string msg = "BoolQuery json string does not include bool query"; + return Status{SERVER_INVALID_DSL_PARAMETER, msg}; + } + } + + return status; } Status GrpcRequestHandler::DeserializeJsonToBoolQuery( - const google::protobuf::RepeatedPtrField<::milvus::grpc::VectorParam>& vector_params, const std::string& dsl_string, - query::BooleanQueryPtr& boolean_query, query::QueryPtr& query_ptr) { - return Status::OK(); + const google::protobuf::RepeatedPtrField<::milvus::grpc::VectorParam> &vector_params, const std::string &dsl_string, + query::BooleanQueryPtr &boolean_query, query::QueryPtr &query_ptr) { + return Status::OK(); } ::grpc::Status -GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc::SearchParam* request, - ::milvus::grpc::QueryResult* response) { - CHECK_NULLPTR_RETURN(request); - LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); +GrpcRequestHandler::Search(::grpc::ServerContext *context, const ::milvus::grpc::SearchParam *request, + ::milvus::grpc::QueryResult *response) { + CHECK_NULLPTR_RETURN(request); + LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - return ::grpc::Status::OK; + return ::grpc::Status::OK; } } // namespace grpc diff --git a/proxy/src/server/grpc_impl/GrpcRequestHandler.h b/proxy/src/server/grpc_impl/GrpcRequestHandler.h index 54222a4485..16ea8bedf0 100644 --- a/proxy/src/server/grpc_impl/GrpcRequestHandler.h +++ b/proxy/src/server/grpc_impl/GrpcRequestHandler.h @@ -337,9 +337,6 @@ class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service, // May remove req_handler ? ReqHandler req_handler_; - // delivery and receive pulsar message - std::shared_ptr msg_client_; - std::unordered_map> context_map_; std::shared_ptr tracer_; diff --git a/proxy/src/server/grpc_impl/GrpcServer.cpp b/proxy/src/server/grpc_impl/GrpcServer.cpp index 49119e77da..284b80a838 100644 --- a/proxy/src/server/grpc_impl/GrpcServer.cpp +++ b/proxy/src/server/grpc_impl/GrpcServer.cpp @@ -38,6 +38,7 @@ // #include "server/DBWrapper.h" #include "server/grpc_impl/interceptor/SpanInterceptor.h" #include "utils/Log.h" +#include "pulsar/message_client/ClientV2.h" namespace milvus { namespace server { @@ -99,14 +100,15 @@ GrpcServer::StartService() { HelloService helloService; builder.RegisterService(&helloService); - // report address to master - auto reportClient = new ReportClient(::grpc::CreateChannel("192.168.2.28:50051", - ::grpc::InsecureChannelCredentials())); - auto status = reportClient->ReportAddress(); - delete(reportClient); - if (!status.ok()){ - return Status(milvus::DB_ERROR, ""); - } + + // report address to master, test only for now +// auto reportClient = new ReportClient(::grpc::CreateChannel("192.168.2.28:50051", +// ::grpc::InsecureChannelCredentials())); +// auto status = reportClient->ReportAddress(); +// delete(reportClient); +// if (!status.ok()){ +// return Status(milvus::DB_ERROR, ""); +// } // Add gRPC interceptor using InterceptorI = ::grpc::experimental::ServerInterceptorFactoryInterface; diff --git a/proxy/src/server/timesync/TimeSync.cpp b/proxy/src/server/timesync/TimeSync.cpp deleted file mode 100644 index 0e4dd0a9a6..0000000000 --- a/proxy/src/server/timesync/TimeSync.cpp +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#include -#include "TimeSync.h" -#include "pulsar/message_client/Producer.h" - -namespace milvus { -namespace timesync { - -TimeSync::TimeSync(int64_t id, - std::function timestamp, - const int interval, - const std::string &pulsar_addr, - const std::string &time_sync_topic) : - timestamp_(timestamp), interval_(interval), pulsar_addr_(pulsar_addr), time_sync_topic_(time_sync_topic) { - sync_msg_.set_peer_id(id); - auto timer = [&]() { - std::shared_ptr - client = std::make_shared(this->pulsar_addr_); - milvus::message_client::MsgProducer producer(client, this->time_sync_topic_); - - for (;;) { - if (this->stop_) break; - this->sync_msg_.set_timestamp(this->timestamp_()); - this->sync_msg_.set_sync_type(milvus::grpc::READ); - auto rst = producer.send(sync_msg_.SerializeAsString()); - if (rst != pulsar::ResultOk) { - //TODO, add log - } - std::this_thread::sleep_for(std::chrono::milliseconds(this->interval_)); - } - auto rst = producer.close(); - if (rst != pulsar::ResultOk) { - //TODO, add log or throw exception - } - rst = client->close(); - if (rst != pulsar::ResultOk) { - //TODO, add log or throw exception - } - }; - timer_ = std::thread(timer); -} - -TimeSync::~TimeSync() { - stop_ = true; - timer_.join(); -} - -void TimeSync::Stop() { - stop_ = true; -} - -bool TimeSync::IsStop() const { - return stop_; -} - -} // namespace timesync -} // namespace milvus \ No newline at end of file diff --git a/proxy/src/server/timesync/TimeSync.h b/proxy/src/server/timesync/TimeSync.h deleted file mode 100644 index 44b58837b4..0000000000 --- a/proxy/src/server/timesync/TimeSync.h +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include "suvlim.pb.h" - -namespace milvus { -namespace timesync { - -class TimeSync { - public: - TimeSync(int64_t id, - std::function timestamp, - const int interval, - const std::string &pulsar_addr, - const std::string &time_sync_topic); - virtual ~TimeSync(); - - void Stop(); - bool IsStop() const; - private: - std::function timestamp_; - const int interval_; - const std::string pulsar_addr_; - const std::string time_sync_topic_; - bool stop_ = false; - std::thread timer_; - milvus::grpc::TimeSyncMsg sync_msg_; -}; - -} // namespace timesync -} // namespace milvus \ No newline at end of file diff --git a/proxy/src/server/tso/TSO.cpp b/proxy/src/server/tso/TSO.cpp new file mode 100644 index 0000000000..7e418e54d9 --- /dev/null +++ b/proxy/src/server/tso/TSO.cpp @@ -0,0 +1,36 @@ +#include "TSO.h" + +namespace milvus { +namespace server { + +TSOracle& TSOracle::GetInstance() { + static TSOracle oracle; + return oracle; +} + +uint64_t TSOracle::GetTimeStamp() { + std::lock_guard lock(mutex_); + auto now = std::chrono::high_resolution_clock::now(); + uint64_t physical = GetPhysical(now); + uint64_t ts = ComposeTs(physical, 0); + + if (last_time_stamp_ == ts) { + logical_++; + return ts + logical_; + } + last_time_stamp_ = ts; + logical_ = 0; + return ts; +} + +uint64_t TSOracle::GetPhysical(const std::chrono::high_resolution_clock::time_point &t) { + auto nano_time = std::chrono::duration_cast(t.time_since_epoch()); + return nano_time / std::chrono::microseconds(1); +} + +uint64_t TSOracle::ComposeTs(uint64_t physical, uint64_t logical) { + return uint64_t((physical << physical_shift_bits) + logical); +} + +} +} \ No newline at end of file diff --git a/proxy/src/server/tso/TSO.h b/proxy/src/server/tso/TSO.h new file mode 100644 index 0000000000..9031748ee8 --- /dev/null +++ b/proxy/src/server/tso/TSO.h @@ -0,0 +1,29 @@ +#pragma once +#include +#include + +namespace milvus { +namespace server { + +const uint32_t physical_shift_bits = 18; + +class TSOracle { + public: + static TSOracle& GetInstance(); + + uint64_t GetTimeStamp(); + + private: + uint64_t GetPhysical(const std::chrono::high_resolution_clock::time_point &t); + uint64_t ComposeTs(uint64_t physical, uint64_t logical); + + private: + TSOracle() = default; + + private: + std::mutex mutex_; + uint64_t last_time_stamp_; + uint64_t logical_; +}; +} +} \ No newline at end of file