Merge remote-tracking branch 'upstream/branch-1.2' into branch-1.2

Former-commit-id: 45a2bc5528231149db971aabd82d054a71f40df5
pull/191/head
zhiru 2019-05-29 19:18:10 +08:00
commit 12aa730645
15 changed files with 147 additions and 115 deletions

View File

@ -12,12 +12,12 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-5 - Implement Auto Archive Feature
- MS-16 - Implement metrics without prometheus
- MS-6 - Implement SDK interface part 1
- MS-21 - Implement SDK interface part 2
- MS-26 - cmake. Add thirdparty packages
### Task
- MS-1 - Add CHANGELOG.md
- MS-4 - Refactor the vecwise_engine code structure
- MS-6 - Implement SDK interface part 1
- MS-20 - Clean Code Part 1
- MS-6 - Implement SDK interface part 2

View File

@ -71,27 +71,24 @@ include_directories(thrift/gen-cpp)
#target_link_libraries(megasearch zstd)
set(third_party_libs
boost_system_static
boost_filesystem_static
boost_serialization_static
bzip2
easyloggingpp
sqlite
# sqlite_orm
thrift
yaml-cpp
faiss
gtest
gtest_main
gmock
lapack
lz4
openblas
prometheus-cpp-push
prometheus-cpp-pull
prometheus-cpp-core
rocksdb
boost_system_static
boost_filesystem_static
boost_serialization_static
bzip2
lz4
snappy
sqlite
# sqlite_orm
thrift
yaml-cpp
zlib
zstd
)

View File

@ -8,14 +8,12 @@ aux_source_directory(src src_files)
include_directories(src)
include_directories(../../megasearch_sdk/include)
include_directories(/usr/include)
link_directories(${CMAKE_BINARY_DIR}/megasearch_sdk)
add_executable(sdk_simple
./main.cpp
${src_files}
${service_files}
)
target_link_libraries(sdk_simple

View File

@ -19,7 +19,8 @@ main(int argc, char *argv[]) {
printf("Client start...\n");
std::string app_name = basename(argv[0]);
static struct option long_options[] = {{"conf_file", optional_argument, 0, 'c'},
static struct option long_options[] = {{"server", optional_argument, 0, 's'},
{"port", optional_argument, 0, 'p'},
{"help", no_argument, 0, 'h'},
{NULL, 0, 0, 0}};
@ -28,9 +29,9 @@ main(int argc, char *argv[]) {
app_name = argv[0];
int value;
while ((value = getopt_long(argc, argv, "c:p:dh", long_options, &option_index)) != -1) {
while ((value = getopt_long(argc, argv, "s:p:h", long_options, &option_index)) != -1) {
switch (value) {
case 'h': {
case 's': {
char *address_ptr = strdup(optarg);
address = address_ptr;
free(address_ptr);
@ -38,12 +39,14 @@ main(int argc, char *argv[]) {
}
case 'p': {
char *port_ptr = strdup(optarg);
address = port_ptr;
port = port_ptr;
free(port_ptr);
break;
}
case 'h':
default:
break;
print_help(app_name);
return EXIT_SUCCESS;
}
}
@ -58,7 +61,8 @@ void
print_help(const std::string &app_name) {
printf("\n Usage: %s [OPTIONS]\n\n", app_name.c_str());
printf(" Options:\n");
printf(" -h Megasearch server address\n");
printf(" -p Megasearch server port\n");
printf(" -s --server Server address, default 127.0.0.1\n");
printf(" -p --port Server port, default 33001\n");
printf(" -h --help Print help information\n");
printf("\n");
}

View File

@ -13,6 +13,16 @@
using namespace megasearch;
namespace {
// Forward declaration: the TABLE_NAME initializer below calls GetTableName()
// before any definition of it appears in this listing.
std::string GetTableName();

// Name of the table used by the example; produced once by GetTableName().
static const std::string TABLE_NAME = GetTableName();
// Key under which vector data is stored in RowRecord/QueryRecord::vector_map.
static const std::string VECTOR_COLUMN_NAME = "face_vector";
// Attribute column that BuildVectors fills with the row index as a string.
static const std::string ID_COLUMN_NAME = "aid";
// Attribute column that BuildVectors fills from CITY_MAP.
static const std::string CITY_COLUMN_NAME = "city";
// NOTE(review): presumably the dimension of each vector in the table; no use
// of this constant is visible in this listing - confirm against BuildTableSchema.
static constexpr int64_t TABLE_DIMENSION = 512;
// Upper bound passed to BuildVectors when generating the records to insert.
static constexpr int64_t TOTAL_ROW_COUNT = 100000;
// Value passed as the topk argument of Connection::SearchVector.
static constexpr int64_t TOP_K = 10;
// Start index of the ten query records built for the search step
// (BuildVectors(SEARCH_TARGET, SEARCH_TARGET + 10, ...)).
static constexpr int64_t SEARCH_TARGET = 5000; // changing this value changes the search results

// Prints a separator line of '=' characters to std::cout. The expansion already
// ends with ';', so call sites need no extra semicolon.
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
@ -76,12 +86,6 @@ namespace {
return s_id;
}
static const std::string TABLE_NAME = GetTableName();
static const std::string VECTOR_COLUMN_NAME = "face_vector";
static const std::string AGE_COLUMN_NAME = "age";
static const std::string CITY_COLUMN_NAME = "city";
static const int64_t TABLE_DIMENSION = 512;
TableSchema BuildTableSchema() {
TableSchema tb_schema;
VectorColumn col1;
@ -90,7 +94,7 @@ namespace {
col1.store_raw_vector = true;
tb_schema.vector_column_array.emplace_back(col1);
Column col2 = {ColumnType::int8, AGE_COLUMN_NAME};
Column col2 = {ColumnType::int8, ID_COLUMN_NAME};
tb_schema.attribute_column_array.emplace_back(col2);
Column col3 = {ColumnType::int16, CITY_COLUMN_NAME};
@ -139,7 +143,7 @@ namespace {
if(vector_record_array) {
RowRecord record;
record.vector_map.insert(std::make_pair(VECTOR_COLUMN_NAME, f_p));
record.attribute_map[AGE_COLUMN_NAME] = std::to_string(k%100);
record.attribute_map[ID_COLUMN_NAME] = std::to_string(k);
record.attribute_map[CITY_COLUMN_NAME] = CITY_MAP.at(k%CITY_MAP.size());
vector_record_array->emplace_back(record);
}
@ -147,7 +151,7 @@ namespace {
if(query_record_array) {
QueryRecord record;
record.vector_map.insert(std::make_pair(VECTOR_COLUMN_NAME, f_p));
record.selected_column_array.push_back(AGE_COLUMN_NAME);
record.selected_column_array.push_back(ID_COLUMN_NAME);
record.selected_column_array.push_back(CITY_COLUMN_NAME);
query_record_array->emplace_back(record);
}
@ -158,19 +162,27 @@ namespace {
void
ClientTest::Test(const std::string& address, const std::string& port) {
std::shared_ptr<Connection> conn = Connection::Create();
ConnectParam param = { address, port };
conn->Connect(param);
{//get server version
{//connect server
ConnectParam param = {address, port};
Status stat = conn->Connect(param);
std::cout << "Connect function call status: " << stat.ToString() << std::endl;
}
{//server version
std::string version = conn->ServerVersion();
std::cout << "MegaSearch server version: " << version << std::endl;
}
{//sdk version
std::string version = conn->ClientVersion();
std::cout << "SDK version: " << version << std::endl;
}
{
std::cout << "ShowTables" << std::endl;
std::vector<std::string> tables;
Status stat = conn->ShowTables(tables);
std::cout << "Function call status: " << stat.ToString() << std::endl;
std::cout << "ShowTables function call status: " << stat.ToString() << std::endl;
std::cout << "All tables: " << std::endl;
for(auto& table : tables) {
std::cout << "\t" << table << std::endl;
@ -180,26 +192,23 @@ ClientTest::Test(const std::string& address, const std::string& port) {
{//create table
TableSchema tb_schema = BuildTableSchema();
PrintTableSchema(tb_schema);
std::cout << "CreateTable" << std::endl;
Status stat = conn->CreateTable(tb_schema);
std::cout << "Function call status: " << stat.ToString() << std::endl;
std::cout << "CreateTable function call status: " << stat.ToString() << std::endl;
}
{//describe table
TableSchema tb_schema;
std::cout << "DescribeTable" << std::endl;
Status stat = conn->DescribeTable(TABLE_NAME, tb_schema);
std::cout << "Function call status: " << stat.ToString() << std::endl;
std::cout << "DescribeTable function call status: " << stat.ToString() << std::endl;
PrintTableSchema(tb_schema);
}
{//add vectors
std::vector<RowRecord> record_array;
BuildVectors(0, 10000, &record_array, nullptr);
BuildVectors(0, TOTAL_ROW_COUNT, &record_array, nullptr);
std::vector<int64_t> record_ids;
std::cout << "AddVector" << std::endl;
Status stat = conn->AddVector(TABLE_NAME, record_array, record_ids);
std::cout << "Function call status: " << stat.ToString() << std::endl;
std::cout << "AddVector function call status: " << stat.ToString() << std::endl;
PrintRecordIdArray(record_ids);
}
@ -207,18 +216,17 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::cout << "Waiting data persist. Sleep 10 seconds ..." << std::endl;
sleep(10);
std::vector<QueryRecord> record_array;
BuildVectors(500, 510, nullptr, &record_array);
BuildVectors(SEARCH_TARGET, SEARCH_TARGET + 10, nullptr, &record_array);
std::vector<TopKQueryResult> topk_query_result_array;
std::cout << "SearchVector" << std::endl;
Status stat = conn->SearchVector(TABLE_NAME, record_array, topk_query_result_array, 10);
std::cout << "Function call status: " << stat.ToString() << std::endl;
Status stat = conn->SearchVector(TABLE_NAME, record_array, topk_query_result_array, TOP_K);
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
PrintSearchResult(topk_query_result_array);
}
// {//delete table
// Status stat = conn->DeleteTable(TABLE_NAME);
// std::cout << "Delete table result: " << stat.ToString() << std::endl;
// std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
// }
{//server status

View File

@ -14,7 +14,8 @@ enum class StatusCode {
OK = 0,
Invalid = 1,
UnknownError = 2,
NotSupported = 3
NotSupported = 3,
NotConnected = 4
};
/**

View File

@ -16,32 +16,50 @@ ClientProxy::ClientPtr() const {
return client_ptr;
}
bool ClientProxy::IsConnected() const {
    // The proxy counts as connected only when a client object exists
    // and the connected_ flag is set.
    if(client_ptr == nullptr) {
        return false;
    }
    return connected_;
}
Status
ClientProxy::Connect(const ConnectParam &param) {
Disconnect();
int32_t port = atoi(param.port.c_str());
return ClientPtr()->Connect(param.ip_address, port, THRIFT_PROTOCOL_BINARY);
Status status = ClientPtr()->Connect(param.ip_address, port, THRIFT_PROTOCOL_BINARY);
if(status.ok()) {
connected_ = true;
}
return status;
}
Status
ClientProxy::Connect(const std::string &uri) {
Disconnect();
return Status::NotSupported("Connect interface is not supported.");
size_t index = uri.find_first_of(":", 0);
if((index == std::string::npos)) {
return Status::Invalid("Invalid uri");
}
ConnectParam param;
param.ip_address = uri.substr(0, index);
param.port = uri.substr(index + 1);
return Connect(param);
}
Status
ClientProxy::Connected() const {
if(client_ptr == nullptr) {
return Status(StatusCode::UnknownError, "not connected");
if(!IsConnected()) {
return Status(StatusCode::NotConnected, "not connected to server");
}
try {
std::string info;
ClientPtr()->interface()->Ping(info, "");
} catch ( std::exception& ex) {
return Status(StatusCode::UnknownError, "connection lost: " + std::string(ex.what()));
return Status(StatusCode::NotConnected, "connection lost: " + std::string(ex.what()));
}
return Status::OK();
@ -49,10 +67,11 @@ ClientProxy::Connected() const {
Status
ClientProxy::Disconnect() {
if(client_ptr == nullptr) {
return Status(StatusCode::UnknownError, "not connected");
if(!IsConnected()) {
return Status(StatusCode::NotConnected, "not connected to server");
}
connected_ = false;
return ClientPtr()->Disconnect();
}
@ -63,8 +82,8 @@ ClientProxy::ClientVersion() const {
Status
ClientProxy::CreateTable(const TableSchema &param) {
if(client_ptr == nullptr) {
return Status(StatusCode::UnknownError, "not connected");
if(!IsConnected()) {
return Status(StatusCode::NotConnected, "not connected to server");
}
try {
@ -103,8 +122,8 @@ ClientProxy::CreateTable(const TableSchema &param) {
Status
ClientProxy::CreateTablePartition(const CreateTablePartitionParam &param) {
if(client_ptr == nullptr) {
return Status(StatusCode::UnknownError, "not connected");
if(!IsConnected()) {
return Status(StatusCode::NotConnected, "not connected to server");
}
try {
@ -132,8 +151,8 @@ ClientProxy::CreateTablePartition(const CreateTablePartitionParam &param) {
Status
ClientProxy::DeleteTablePartition(const DeleteTablePartitionParam &param) {
if(client_ptr == nullptr) {
return Status(StatusCode::UnknownError, "not connected");
if(!IsConnected()) {
return Status(StatusCode::NotConnected, "not connected to server");
}
try {
@ -152,8 +171,8 @@ ClientProxy::DeleteTablePartition(const DeleteTablePartitionParam &param) {
Status
ClientProxy::DeleteTable(const std::string &table_name) {
if(client_ptr == nullptr) {
return Status(StatusCode::UnknownError, "not connected");
if(!IsConnected()) {
return Status(StatusCode::NotConnected, "not connected to server");
}
try {
@ -170,8 +189,8 @@ Status
ClientProxy::AddVector(const std::string &table_name,
const std::vector<RowRecord> &record_array,
std::vector<int64_t> &id_array) {
if(client_ptr == nullptr) {
return Status(StatusCode::UnknownError, "not connected");
if(!IsConnected()) {
return Status(StatusCode::NotConnected, "not connected to server");
}
try {
@ -205,8 +224,8 @@ ClientProxy::SearchVector(const std::string &table_name,
const std::vector<QueryRecord> &query_record_array,
std::vector<TopKQueryResult> &topk_query_result_array,
int64_t topk) {
if(client_ptr == nullptr) {
return Status(StatusCode::UnknownError, "not connected");
if(!IsConnected()) {
return Status(StatusCode::NotConnected, "not connected to server");
}
try {
@ -253,8 +272,8 @@ ClientProxy::SearchVector(const std::string &table_name,
Status
ClientProxy::DescribeTable(const std::string &table_name, TableSchema &table_schema) {
if(client_ptr == nullptr) {
return Status(StatusCode::UnknownError, "not connected");
if(!IsConnected()) {
return Status(StatusCode::NotConnected, "not connected to server");
}
try {
@ -290,8 +309,8 @@ ClientProxy::DescribeTable(const std::string &table_name, TableSchema &table_sch
Status
ClientProxy::ShowTables(std::vector<std::string> &table_array) {
if(client_ptr == nullptr) {
return Status(StatusCode::UnknownError, "not connected");
if(!IsConnected()) {
return Status(StatusCode::NotConnected, "not connected to server");
}
try {
@ -306,7 +325,7 @@ ClientProxy::ShowTables(std::vector<std::string> &table_array) {
std::string
ClientProxy::ServerVersion() const {
if(client_ptr == nullptr) {
if(!IsConnected()) {
return "";
}
@ -321,8 +340,8 @@ ClientProxy::ServerVersion() const {
std::string
ClientProxy::ServerStatus() const {
if(client_ptr == nullptr) {
return "not connected";
if(!IsConnected()) {
return "not connected to server";
}
try {

View File

@ -51,9 +51,11 @@ public:
private:
std::shared_ptr<ThriftClient>& ClientPtr() const;
bool IsConnected() const;
private:
mutable std::shared_ptr<ThriftClient> client_ptr;
bool connected_ = false;
};
}

View File

@ -65,7 +65,7 @@ ThriftClient::Connect(const std::string& address, int32_t port, const std::strin
client_ = std::make_shared<thrift::MegasearchServiceClient>(protocol_ptr);
} catch ( std::exception& ex) {
//CLIENT_LOG_ERROR << "connect encounter exception: " << ex.what();
return Status(StatusCode::UnknownError, "failed to connect megasearch server" + std::string(ex.what()));
return Status(StatusCode::NotConnected, "failed to connect megasearch server" + std::string(ex.what()));
}
return Status::OK();

View File

@ -101,6 +101,8 @@ std::string Status::CodeAsString() const {
break;
case StatusCode::NotSupported: type = "Not Supported";
break;
case StatusCode::NotConnected: type = "Not Connected";
break;
default: type = "Unknown";
break;
}

View File

@ -9,6 +9,7 @@
#include "utils/Log.h"
#include "utils/CommonUtil.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"

View File

@ -8,13 +8,16 @@
#include "utils/Error.h"
#include "VecIdMapper.h"
#include "rocksdb/db.h"
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
namespace rocksdb {
class DB;
class ColumnFamilyHandle;
}
namespace zilliz {
namespace vecwise {
namespace server {

View File

@ -15,6 +15,7 @@
#include "rocksdb/options.h"
#include <exception>
#include <unordered_map>
namespace zilliz {
namespace vecwise {
@ -30,6 +31,30 @@ IVecIdMapper* IVecIdMapper::GetInstance() {
#endif
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// IVecIdMapper implementation whose only state is an in-memory table of
// string-to-string maps (id_groups_). Every member function below overrides
// the corresponding pure-virtual in IVecIdMapper; the definitions follow this
// declaration and are not shown here.
// NOTE(review): judging by the parameter names, each map presumably
// translates an "nid" to an "sid" within a named group - confirm against the
// method definitions.
class SimpleIdMapper : public IVecIdMapper{
public:
SimpleIdMapper();
~SimpleIdMapper();

// Group management.
ServerError AddGroup(const std::string& group) override;
bool IsGroupExist(const std::string& group) const override;
ServerError AllGroups(std::vector<std::string>& groups) const override;

// Store one mapping, or a batch given as two parallel vectors.
// `group` defaults to the empty string.
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;

// Look up one mapping, or a batch; results are written to the `sid` out-parameter.
ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const override;

// Removal of a single entry or of a whole group.
ServerError Delete(const std::string& nid, const std::string& group = "") override;
ServerError DeleteGroup(const std::string& group) override;

private:
// One string-to-string table.
using ID_MAPPING = std::unordered_map<std::string, std::string>;
// All tables, keyed by string. Declared mutable, so the const member
// functions above are permitted to modify it.
mutable std::unordered_map<std::string, ID_MAPPING> id_groups_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
SimpleIdMapper::SimpleIdMapper() {

View File

@ -9,11 +9,6 @@
#include <string>
#include <vector>
#include <unordered_map>
namespace rocksdb {
class DB;
}
namespace zilliz {
namespace vecwise {
@ -40,29 +35,6 @@ public:
virtual ServerError DeleteGroup(const std::string& group) = 0;
};
// IVecIdMapper implementation whose only state is an in-memory table of
// string-to-string maps (id_groups_). Every member function below overrides
// the corresponding pure-virtual in IVecIdMapper.
// NOTE(review): judging by the parameter names, each map presumably
// translates an "nid" to an "sid" within a named group - confirm against the
// method definitions.
class SimpleIdMapper : public IVecIdMapper{
public:
SimpleIdMapper();
~SimpleIdMapper();

// Group management.
ServerError AddGroup(const std::string& group) override;
bool IsGroupExist(const std::string& group) const override;
ServerError AllGroups(std::vector<std::string>& groups) const override;

// Store one mapping, or a batch given as two parallel vectors.
// `group` defaults to the empty string.
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;

// Look up one mapping, or a batch; results are written to the `sid` out-parameter.
ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const override;

// Removal of a single entry or of a whole group.
ServerError Delete(const std::string& nid, const std::string& group = "") override;
ServerError DeleteGroup(const std::string& group) override;

private:
// One string-to-string table.
using ID_MAPPING = std::unordered_map<std::string, std::string>;
// All tables, keyed by string. Declared mutable, so the const member
// functions above are permitted to modify it.
mutable std::unordered_map<std::string, ID_MAPPING> id_groups_;
};
}
}
}

View File

@ -35,19 +35,19 @@ cuda_add_executable(server_test
set(require_libs
stdc++
boost_system
boost_filesystem
pthread
snappy
bz2
z
zstd
rocksdb
faiss
cudart
cublas
sqlite3
boost_system
boost_filesystem
snappy
z
bz2
zstd
lz4
pthread
)
target_link_libraries(server_test