Add build.sh for core

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
pull/4973/head^2
neza2017 2020-09-02 20:05:02 +08:00 committed by yefu.chen
parent d461529172
commit 18a56c9b5d
24 changed files with 11306 additions and 6 deletions

View File

@ -1,8 +1,8 @@
if [[ -d "./build" ]]
then
#!/bin/bash
if [[ -d "./build" ]]; then
rm -rf build
fi
mkdir build && cd build
cmake ..
make -j8 && make install
make -j8 && make install

View File

@ -25,6 +25,7 @@ add_subdirectory( query )
add_subdirectory( db ) # target milvus_engine
add_subdirectory( log )
add_subdirectory( server )
add_subdirectory( pulsar )
set(link_lib
milvus_engine

View File

@ -21,11 +21,10 @@
#include "src/version.h"
#include "utils/SignalHandler.h"
#include "utils/Status.h"
#include "pulsar/Client.h"
#include "pulsar/message_client/Client.h"
INITIALIZE_EASYLOGGINGPP
auto c = pulsar::Client("12");
void
print_help(const std::string& app_name) {

View File

@ -0,0 +1,2 @@
add_subdirectory(message_client)
#add_subdirectory(unittest)

View File

@ -0,0 +1,15 @@
set(src-cpp
Client.cpp
Consumer.cpp
Producer.cpp
pb/pulsar.pb.cc)
add_library(message_client_cpp SHARED
${src-cpp}
)
target_include_directories(message_client_cpp PUBLIC ${PROJECT_BINARY_DIR}/thirdparty/pulsar/pulsar-src/pulsar-client-cpp/include)
target_link_libraries(message_client_cpp pulsarStatic libprotobuf)
#install(TARGETS message_client_cpp
# DESTINATION lib)

View File

@ -0,0 +1,11 @@
#include "Client.h"
namespace message_client {
MsgClient::MsgClient(const std::string &serviceUrl) : pulsar::Client(serviceUrl) {}
MsgClient::MsgClient(const std::string &serviceUrl, const pulsar::ClientConfiguration& clientConfiguration)
: pulsar::Client(serviceUrl, clientConfiguration) {}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include "pulsar/Client.h"
#include "pulsar/ClientConfiguration.h"
namespace message_client {
using Result = pulsar::Result;
using Message = pulsar::Message;
class MsgClient : public pulsar::Client{
public:
MsgClient(const std::string& serviceUrl);
MsgClient(const std::string& serviceUrl, const pulsar::ClientConfiguration& clientConfiguration);
void set_client_id(int64_t id) { client_id_ = id; }
int64_t get_client_id() { return client_id_; }
private:
int64_t client_id_;
};
}

View File

@ -0,0 +1,66 @@
#include "Consumer.h"
#include "pb/pulsar.pb.h"
namespace message_client {
MsgConsumer::MsgConsumer(std::shared_ptr<MsgClient> &client, std::string subscription_name, const ConsumerConfiguration conf)
:client_(client), config_(conf), subscription_name_(subscription_name){}
Result MsgConsumer::subscribe(const std::string &topic) {
return client_->subscribe(topic, subscription_name_, config_, consumer_);
}
Result MsgConsumer::subscribe(const std::vector<std::string> &topics) {
return client_->subscribe(topics, subscription_name_, config_, consumer_);
}
Result MsgConsumer::unsubscribe() {
return consumer_.unsubscribe();
}
Result MsgConsumer::receive(Message &msg) {
return consumer_.receive(msg);
}
std::shared_ptr<void> MsgConsumer::receive_proto(ConsumerType consumer_type) {
Message msg;
receive(msg);
acknowledge(msg);
switch (consumer_type) {
case INSERT: {
pb::InsertMsg insert_msg;
insert_msg.ParseFromString(msg.getDataAsString());
auto message = std::make_shared<pb::InsertMsg>(insert_msg);
return std::shared_ptr<void>(message);
}
case DELETE: {
pb::DeleteMsg delete_msg;
delete_msg.ParseFromString(msg.getDataAsString());
auto message = std::make_shared<pb::DeleteMsg>(delete_msg);
return std::shared_ptr<void>(message);
}
case SEARCH_RESULT: {
pb::SearchResultMsg search_res_msg;
search_res_msg.ParseFromString(msg.getDataAsString());
auto message = std::make_shared<pb::SearchResultMsg>(search_res_msg);
return std::shared_ptr<void>(message);
}
case TEST:
pb::TestData test_msg;
test_msg.ParseFromString(msg.getDataAsString());
auto message = std::make_shared<pb::TestData>(test_msg);
return std::shared_ptr<void>(message);
}
return nullptr;
}
Result MsgConsumer::close() {
return consumer_.close();
}
Result MsgConsumer::acknowledge(const Message &message) {
return consumer_.acknowledge(message);
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include "pulsar/Consumer.h"
#include "Client.h"
namespace message_client {
enum ConsumerType {
INSERT = 0,
DELETE = 1,
SEARCH_RESULT = 2,
TEST = 3,
};
using Consumer = pulsar::Consumer;
using ConsumerConfiguration = pulsar::ConsumerConfiguration;
class MsgConsumer{
public:
MsgConsumer(std::shared_ptr<message_client::MsgClient> &client, std::string consumer_name,
const pulsar::ConsumerConfiguration conf = ConsumerConfiguration());
Result subscribe(const std::string& topic);
Result subscribe(const std::vector<std::string>& topics);
Result unsubscribe();
Result receive(Message& msg);
std::shared_ptr<void> receive_proto(ConsumerType consumer_type);
Result acknowledge(const Message& message);
Result close();
const Consumer&
consumer() const {return consumer_; }
private:
Consumer consumer_;
std::shared_ptr<MsgClient> client_;
ConsumerConfiguration config_;
std::string subscription_name_;
};
}

View File

@ -0,0 +1,27 @@
#include "Producer.h"
namespace message_client {
MsgProducer::MsgProducer(std::shared_ptr<MsgClient> &client, const std::string &topic, const ProducerConfiguration conf) : client_(client), config_(conf){
createProducer(topic);
}
Result MsgProducer::createProducer(const std::string &topic) {
return client_->createProducer(topic, producer_);
}
Result MsgProducer::send(const Message &msg) {
return producer_.send(msg);
}
Result MsgProducer::send(const std::string &msg) {
auto pulsar_msg = pulsar::MessageBuilder().setContent(msg).build();
return send(pulsar_msg);
}
Result MsgProducer::close() {
return producer_.close();
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include "pulsar/Producer.h"
#include "Client.h"
namespace message_client {
using Producer = pulsar::Producer;
using ProducerConfiguration = pulsar::ProducerConfiguration;
class MsgProducer{
public:
MsgProducer(std::shared_ptr<MsgClient> &client, const std::string &topic, const ProducerConfiguration conf = ProducerConfiguration());
Result createProducer(const std::string& topic);
Result send(const Message& msg);
Result send(const std::string& msg);
Result close();
const Producer&
producer() const { return producer_; }
private:
Producer producer_;
std::shared_ptr<MsgClient> client_;
ProducerConfiguration config_;
};
}

View File

@ -0,0 +1,3 @@
#!/usr/bin/env bash
../../../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I=./ --cpp_out=./ pulsar.proto

View File

@ -0,0 +1,744 @@
syntax = "proto3";
package pb;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
repeated FieldParam fields = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
/**
* @brief Attribute record
*/
message AttrRecord {
repeated int32 int32_value = 1;
repeated int64 int64_value = 2;
repeated float float_value = 3;
repeated double double_value = 4;
}
/**
* @brief Vector records
*/
message VectorRecord {
repeated VectorRowRecord records = 1;
}
/**
* @brief Field values
*/
message FieldValue {
string field_name = 1;
DataType type = 2;
AttrRecord attr_record = 3;
VectorRecord vector_record = 4;
}
/**
* @brief Parameters for insert action
*/
message InsertParam {
string collection_name = 1;
repeated FieldValue fields = 2;
repeated int64 entity_id_array = 3; //optional
string partition_tag = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Entity ids
*/
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
/**
* @brief Search vector parameters
*/
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
/**
* @brief Parameters for search action
* @dsl example:
* {
* "query": {
* "bool": {
* "must": [
* {
* "must":[
* {
* "should": [
* {
* "term": {
* "gender": ["male"]
* }
* },
* {
* "range": {
* "height": {"gte": "170.0", "lte": "180.0"}
* }
* }
* ]
* },
* {
* "must_not": [
* {
* "term": {
* "age": [20, 21, 22, 23, 24, 25]
* }
* },
* {
* "Range": {
* "weight": {"lte": "100"}
* }
* }
* ]
* }
* ]
* },
* {
* "must": [
* {
* "vector": {
* "face_img": {
* "topk": 10,
* "metric_type": "L2",
* "query": [],
* "params": {
* "nprobe": 10
* }
* }
* }
* }
* ]
* }
* ]
* }
* },
* "fields": ["age", "face_img"]
* }
*/
message SearchParam {
string collection_name = 1;
repeated string partition_tag_array = 2;
repeated VectorParam vector_param = 3;
string dsl = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for searching in segments
*/
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
/**
* @brief Entities
*/
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated FieldValue fields = 4;
}
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for flush action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entities id of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
uint64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index paramters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return VectorIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entities data by id array.
*
* @param EntitiesIdentity, target entity id array.
*
* @return EntitiesData
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get vector ids from a segment
*
* @param GetVectorIDsParam, target collection and segment
*
* @return VectorIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return KQueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return TopKQueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,157 @@
syntax = "proto3";
package pb;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
enum OpType {
Insert = 0;
Delete = 1;
Search = 2;
TimeSync = 3;
Key2Seg = 4;
Statistics = 5;
}
message SegmentRecord {
repeated string seg_info = 1;
}
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
message AttrRecord {
repeated int32 int32_value = 1;
repeated int64 int64_value = 2;
repeated float float_value = 3;
repeated double double_value = 4;
}
message VectorRecord {
repeated VectorRowRecord records = 1;
}
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
message FieldValue {
string field_name = 1;
DataType type = 2;
AttrRecord attr_record = 3;
VectorRecord vector_record = 4;
}
message Cell {
oneof value{
int32 int32_value = 1;
int64 int64_value = 2;
float float_value = 3;
double double_value = 4;
VectorRowRecord vec = 5;
}
}
message RowValue{
repeated Cell cell = 2;
}
message PulsarMessage {
string collection_name = 1;
repeated FieldValue fields = 2;
int64 entity_id = 3;
string partition_tag = 4;
VectorParam vector_param =5;
SegmentRecord segments = 6;
int64 timestamp = 7;
int64 client_id = 8;
OpType msg_type = 9;
string topic_name = 10;
int64 partition_id = 11;
}
//message PulsarMessages {
// string collection_name = 1;
// repeated FieldValue fields = 2;
// repeated int64 entity_id = 3;
// string partition_tag = 4;
// repeated VectorParam vector_param =5;
// repeated SegmentRecord segments = 6;
// repeated int64 timestamp = 7;
// repeated int64 client_id = 8;
// OpType msg_type = 9;
// repeated string topic_name = 10;
// repeated int64 partition_id = 11;
//}
message TestData {
string id = 1;
string name = 2;
}
message InsertMsg {
int64 client_id = 1;
}
message DeleteMsg {
int64 client_id = 1;
}
message SearchMsg {
int64 client_id = 1;
}
message SearchResultMsg {
int64 client_id = 1;
}

View File

@ -0,0 +1,14 @@
enable_testing()
set(unittest_srcs
unittest_entry.cpp
consumer_test.cpp producer_test.cpp)
add_executable(test ${unittest_srcs})
target_link_libraries(test
message_client_cpp
pulsar
gtest
gtest_main)
install(TARGETS test DESTINATION unittest)

View File

@ -0,0 +1,14 @@
#include <gtest/gtest.h>
#include "pulsar/message_client/Consumer.h"
#include "pulsar/message_client/pb/pulsar.pb.h"
TEST(CLIENT_CPP, CONSUMER) {
auto client= std::make_shared<message_client::MsgClient>("pulsar://localhost:6650");
message_client::MsgConsumer consumer(client, "my_consumer");
consumer.subscribe("test");
auto msg = consumer.receive_proto(message_client::TEST);
pb::TestData* data = (pb::TestData*)(msg.get());
std::cout << "Received: " << msg << " with payload '" << data->name()<< ";" << data->id();
consumer.close();
client->close();
}

View File

@ -0,0 +1,15 @@
#include <gtest/gtest.h>
#include "pulsar/message_client/Producer.h"
#include "pulsar/message_client/pb/pulsar.pb.h"
TEST(CLIENT_CPP, Producer) {
auto client= std::make_shared<message_client::MsgClient>("pulsar://localhost:6650");
message_client::MsgProducer producer(client,"test");
pb::TestData data;
data.set_id("test");
data.set_name("hahah");
std::string to_string = data.SerializeAsString();
producer.send(to_string);
producer.close();
client->close();
}

View File

@ -0,0 +1,6 @@
#include <gtest/gtest.h>
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -0,0 +1,26 @@
AUX_SOURCE_DIRECTORY(. TEST)
set( GRPC_SERVICE_FILES ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-status/status.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-status/status.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/hello.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/hello.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/master.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/master.pb.cc
)
add_executable(test_pulsar ${TEST} ${GRPC_SERVICE_FILES})
target_include_directories(test_pulsar PUBLIC ${PROJECT_BINARY_DIR}/thirdparty/pulsar/pulsar-src/pulsar-client-cpp/include)
target_include_directories(test_pulsar PUBLIC ${PROJECT_BINARY_DIR}/thirdparty/avro/avro-build/include)
target_link_libraries(test_pulsar
pulsarStatic
libprotobuf
grpc++_reflection
grpc++
libboost_system.a
libboost_filesystem.a
libboost_serialization.a
)

View File

@ -0,0 +1,71 @@
#include "thread"
#include "pulsar/Client.h"
#include <src/grpc/gen-milvus/hello.grpc.pb.h>
#include <src/grpc/gen-status/status.pb.h>
using namespace pulsar;
using MyData = milvus::grpc::PMessage;
static const std::string exampleSchema =
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
"\"fields\":[{\"name\":\"id\",\"type\":\"string\"}, {\"name\":\"reason\",\"type\":\"string\"}]}";
int consumer() {
Client client("pulsar://localhost:6650");
ConsumerConfiguration consumerConf;
Consumer consumer;
consumerConf.setSchema(SchemaInfo(PROTOBUF, "Protobuf", exampleSchema));
Result result = client.subscribe("topic-proto", "sub-2", consumerConf, consumer);
if (result != ResultOk) {
std::cout << "Failed to subscribe: " << result << std::endl;
return -1;
}
Message msg;
for (int i = 0; i < 10; i++) {
consumer.receive(msg);
MyData data;
data.ParseFromString(msg.getDataAsString());
std::cout << " Received: " << msg
<< " with payload '" << data.id() << " " << data.reason() << "'"
<< std::endl;
consumer.acknowledge(msg);
}
client.close();
return 0;
}
int main() {
Client client("pulsar://localhost:6650");
Producer producer;
ProducerConfiguration producerConf;
producerConf.setSchema(SchemaInfo(PROTOBUF, "Protobuf", exampleSchema));
Result result = client.createProducer("pro", producerConf, producer);
if (result != ResultOk) {
std::cout << "Error creating producer: " << result << std::endl;
return -1;
}
// std::thread t(consumer);
// Publish 10 messages to the topic
for (int i = 0; i < 1; i++) {
auto data = MyData();
auto a = new milvus::grpc::A();
a->set_a(9999);
data.set_allocated_a(a);
data.set_id("999");
data.set_reason("*****test****");
Message msg = MessageBuilder().setContent(data.SerializeAsString()).build();
Result res = producer.send(msg);
std::cout << " Message sent: " << res << std::endl;
}
client.close();
// t.join();
}

56
proxy/thirdparty/avro/CMakeLists.txt vendored Normal file
View File

@ -0,0 +1,56 @@
#-------------------------------------------------------------------------------
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
if (DEFINED ENV{MILVUS_AVRO_URL})
set(AVRO_URL "$ENV{MILVUS_AVRO_URL}")
else ()
set(AVRO_URL
"https://github.com/apache/avro/archive/${AVRO_VERSION}.zip")
endif ()
message(STATUS "Building avro-${AVRO_VERSION} from source")
FetchContent_Declare(
avro
URL ${AVRO_URL}
# URL_MD5 "f9137c5bc18b7d74027936f0f1bfa5c8"
DOWNLOAD_DIR ${MILVUS_BINARY_DIR}/3rdparty_download/download
SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}/avro-src
BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/avro-build
)
include(FetchContent)
FetchContent_GetProperties( avro )
if (NOT avro_POPULATED)
FetchContent_Populate(avro)
# file(REMOVE ${avro_SOURCE_DIR}/pulsar-client-cpp/CMakeLists.txt)
# message("${pulsar_SOURCE_DIR}/pulsar-client-cpp/CMakeLists.txt")
# EXECUTE_PROCESS(COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/CMakeLists ${pulsar_SOURCE_DIR}/pulsar-client-cpp/CMakeLists.txt)
# Adding the following targets:
# pulsar-client-cpp
add_subdirectory(${avro_SOURCE_DIR}/lang/c++
${avro_BINARY_DIR}
EXCLUDE_FROM_ALL)
file (INSTALL DIRECTORY ${avro_SOURCE_DIR}/lang/c++/api/ DESTINATION ${avro_BINARY_DIR}/include/avro
FILES_MATCHING PATTERN *.hh)
# target_include_directories(avrocpp PUBLIC ${pulsar_SOURCE_DIR}/pulsar-client-cpp/include)
endif ()
get_property(var DIRECTORY "${avro_SOURCE_DIR}/lang/c++" PROPERTY COMPILE_OPTIONS)
message(STATUS "avro compile options: ${var}")

View File

@ -45,7 +45,7 @@ FetchContent_Declare(
include(FetchContent)
FetchContent_GetProperties( pulsar )
SET(BUILD_TESTS CACHE BOOL OFF FORCE)
if (NOT pulsar_POPULATED)
FetchContent_Populate(pulsar)
@ -53,6 +53,11 @@ if (NOT pulsar_POPULATED)
message("${pulsar_SOURCE_DIR}/pulsar-client-cpp/CMakeLists.txt")
EXECUTE_PROCESS(COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/CMakeLists ${pulsar_SOURCE_DIR}/pulsar-client-cpp/CMakeLists.txt)
SET(BUILD_TESTS CACHE BOOL OFF FORCE)
# pulsar-cpp-client CMakeLists.txt need variables below, set them for build without install libprotobuf_dev
SET(Protobuf_INCLUDE_DIRS CACHE PATH ${CMAKE_BINARY_DIR}/thirdparty/grpc/grpc-src/third_party/protobuf/src FORCE)
SET(Protobuf_LITE_LIBRARIES CACHE PATH ${CMAKE_BINARY_DIR}/thirdparty/grpc/grpc-build/third_party/protobuf FORCE)
# Adding the following targets:
# pulsar-client-cpp
add_subdirectory(${pulsar_SOURCE_DIR}/pulsar-client-cpp