Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-449 Add vectors twice success, once with ids, the other no ids

See merge request megasearch/milvus!459

Former-commit-id: cc835bcb95bb12abe9e32690f01f6e4187bc63ea
pull/191/head
jinhai 2019-08-31 15:23:05 +08:00
commit 16ece61364
16 changed files with 172 additions and 125 deletions

View File

@ -17,6 +17,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-430 - Search no result if index created with FLAT - MS-430 - Search no result if index created with FLAT
- MS-443 - Create index hang again - MS-443 - Create index hang again
- MS-436 - Delete vectors failed if index created with index_type: IVF_FLAT/IVF_SQ8 - MS-436 - Delete vectors failed if index created with index_type: IVF_FLAT/IVF_SQ8
- MS-450 - server hang after run stop_server.sh
- MS-449 - Add vectors twice success, once with ids, the other no ids
## Improvement ## Improvement
- MS-327 - Clean code for milvus - MS-327 - Clean code for milvus

View File

@ -22,6 +22,9 @@ class DB {
public: public:
static void Open(const Options& options, DB** dbptr); static void Open(const Options& options, DB** dbptr);
virtual Status Start() = 0;
virtual Status Stop() = 0;
virtual Status CreateTable(meta::TableSchema& table_schema_) = 0; virtual Status CreateTable(meta::TableSchema& table_schema_) = 0;
virtual Status DeleteTable(const std::string& table_id, const meta::DatesT& dates) = 0; virtual Status DeleteTable(const std::string& table_id, const meta::DatesT& dates) = 0;
virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0; virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0;

View File

@ -41,17 +41,55 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
DBImpl::DBImpl(const Options& options) DBImpl::DBImpl(const Options& options)
: options_(options), : options_(options),
shutting_down_(false), shutting_down_(true),
compact_thread_pool_(1, 1), compact_thread_pool_(1, 1),
index_thread_pool_(1, 1) { index_thread_pool_(1, 1) {
meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode); meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_); mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
if (options.mode != Options::MODE::READ_ONLY) { Start();
ENGINE_LOG_TRACE << "StartTimerTasks"; }
StartTimerTasks();
DBImpl::~DBImpl() {
Stop();
}
Status DBImpl::Start() {
if (!shutting_down_.load(std::memory_order_acquire)){
return Status::OK();
} }
//for distribute version, some nodes are read only
if (options_.mode != Options::MODE::READ_ONLY) {
ENGINE_LOG_TRACE << "StartTimerTasks";
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
}
shutting_down_.store(false, std::memory_order_release);
return Status::OK();
}
Status DBImpl::Stop() {
if (shutting_down_.load(std::memory_order_acquire)){
return Status::OK();
}
shutting_down_.store(true, std::memory_order_release);
bg_timer_thread_.join();
//wait compaction/buildindex finish
for(auto& result : compact_thread_results_) {
result.wait();
}
for(auto& result : index_thread_results_) {
result.wait();
}
//makesure all memory data serialized
MemSerialize();
return Status::OK();
} }
Status DBImpl::CreateTable(meta::TableSchema& table_schema) { Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
@ -278,10 +316,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
return Status::OK(); return Status::OK();
} }
void DBImpl::StartTimerTasks() {
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
}
void DBImpl::BackgroundTimerTask() { void DBImpl::BackgroundTimerTask() {
Status status; Status status;
server::SystemInfo::GetInstance().Init(); server::SystemInfo::GetInstance().Init();
@ -741,13 +775,6 @@ Status DBImpl::Size(uint64_t& result) {
return meta_ptr_->Size(result); return meta_ptr_->Size(result);
} }
DBImpl::~DBImpl() {
shutting_down_.store(true, std::memory_order_release);
bg_timer_thread_.join();
std::set<std::string> ids;
mem_mgr_->Serialize(ids);
}
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz

View File

@ -36,6 +36,9 @@ class DBImpl : public DB {
explicit DBImpl(const Options &options); explicit DBImpl(const Options &options);
Status Start() override;
Status Stop() override;
Status CreateTable(meta::TableSchema &table_schema) override; Status CreateTable(meta::TableSchema &table_schema) override;
Status DeleteTable(const std::string &table_id, const meta::DatesT &dates) override; Status DeleteTable(const std::string &table_id, const meta::DatesT &dates) override;
@ -91,18 +94,15 @@ class DBImpl : public DB {
~DBImpl() override; ~DBImpl() override;
private: private:
Status Status QueryAsync(const std::string &table_id,
QueryAsync(const std::string &table_id, const meta::TableFilesSchema &files,
const meta::TableFilesSchema &files, uint64_t k,
uint64_t k, uint64_t nq,
uint64_t nq, uint64_t nprobe,
uint64_t nprobe, const float *vectors,
const float *vectors, const meta::DatesT &dates,
const meta::DatesT &dates, QueryResults &results);
QueryResults &results);
void StartTimerTasks();
void BackgroundTimerTask(); void BackgroundTimerTask();
void StartMetricTask(); void StartMetricTask();

View File

@ -152,10 +152,6 @@ bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
&& index1.metric_type_ == index2.metric_type_; && index1.metric_type_ == index2.metric_type_;
} }
bool UserDefinedId(int64_t flag) {
return flag & meta::FLAG_MASK_USERID;
}
meta::DateT GetDate(const std::time_t& t, int day_delta) { meta::DateT GetDate(const std::time_t& t, int day_delta) {
struct tm ltm; struct tm ltm;
localtime_r(&t, &ltm); localtime_r(&t, &ltm);

View File

@ -28,8 +28,6 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2); bool IsSameIndex(const TableIndex& index1, const TableIndex& index2);
bool UserDefinedId(int64_t flag);
meta::DateT GetDate(const std::time_t &t, int day_delta = 0); meta::DateT GetDate(const std::time_t &t, int day_delta = 0);
meta::DateT GetDate(); meta::DateT GetDate();
meta::DateT GetDateWithDelta(int day_delta); meta::DateT GetDateWithDelta(int day_delta);

View File

@ -22,7 +22,8 @@ constexpr int32_t DEFAULT_NLIST = 16384;
constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2; constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2;
constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB; constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB;
constexpr int64_t FLAG_MASK_USERID = 1; constexpr int64_t FLAG_MASK_NO_USERID = 0x1;
constexpr int64_t FLAG_MASK_HAS_USERID = 0x1<<1;
typedef int DateT; typedef int DateT;
const DateT EmptyDate = -1; const DateT EmptyDate = -1;

View File

@ -20,7 +20,7 @@ SchedulerPtr SchedInst::instance = nullptr;
std::mutex SchedInst::mutex_; std::mutex SchedInst::mutex_;
void void
SchedServInit() { StartSchedulerService() {
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE); server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren(); auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
for (auto &resource : resources) { for (auto &resource : resources) {
@ -57,6 +57,11 @@ SchedServInit() {
SchedInst::GetInstance()->Start(); SchedInst::GetInstance()->Start();
} }
void
StopSchedulerService() {
ResMgrInst::GetInstance()->Stop();
SchedInst::GetInstance()->Stop();
}
} }
} }
} }

View File

@ -53,7 +53,10 @@ private:
}; };
void void
SchedServInit(); StartSchedulerService();
void
StopSchedulerService();
} }
} }

View File

@ -17,6 +17,10 @@ namespace milvus {
namespace server { namespace server {
DBWrapper::DBWrapper() { DBWrapper::DBWrapper() {
}
ServerError DBWrapper::StartService() {
//db config //db config
zilliz::milvus::engine::Options opt; zilliz::milvus::engine::Options opt;
ConfigNode& db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB); ConfigNode& db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
@ -91,7 +95,9 @@ DBWrapper::DBWrapper() {
//create db instance //create db instance
std::string msg = opt.meta.path; std::string msg = opt.meta.path;
try { try {
zilliz::milvus::engine::DB::Open(opt, &db_); engine::DB* db = nullptr;
zilliz::milvus::engine::DB::Open(opt, &db);
db_.reset(db);
} catch(std::exception& ex) { } catch(std::exception& ex) {
msg = ex.what(); msg = ex.what();
} }
@ -100,10 +106,18 @@ DBWrapper::DBWrapper() {
std::cout << "ERROR! Failed to open database: " << msg << std::endl; std::cout << "ERROR! Failed to open database: " << msg << std::endl;
kill(0, SIGUSR1); kill(0, SIGUSR1);
} }
db_->Start();
return SERVER_SUCCESS;
} }
DBWrapper::~DBWrapper() { ServerError DBWrapper::StopService() {
delete db_; if(db_) {
db_->Stop();
}
return SERVER_SUCCESS;
} }
} }

View File

@ -5,8 +5,11 @@
******************************************************************************/ ******************************************************************************/
#pragma once #pragma once
#include "utils/Error.h"
#include "db/DB.h" #include "db/DB.h"
#include <memory>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
@ -14,18 +17,27 @@ namespace server {
class DBWrapper { class DBWrapper {
private: private:
DBWrapper(); DBWrapper();
~DBWrapper(); ~DBWrapper() = default;
public: public:
static zilliz::milvus::engine::DB* DB() { static DBWrapper& GetInstance() {
static DBWrapper db_wrapper; static DBWrapper wrapper;
return db_wrapper.db(); return wrapper;
} }
zilliz::milvus::engine::DB* db() { return db_; } static std::shared_ptr<engine::DB> DB() {
return GetInstance().EngineDB();
}
ServerError StartService();
ServerError StopService();
std::shared_ptr<engine::DB> EngineDB() {
return db_;
}
private: private:
zilliz::milvus::engine::DB* db_ = nullptr; std::shared_ptr<engine::DB> db_;
}; };
} }

View File

@ -21,6 +21,7 @@
#include <src/scheduler/SchedInst.h> #include <src/scheduler/SchedInst.h>
#include "metrics/Metrics.h" #include "metrics/Metrics.h"
#include "DBWrapper.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
@ -158,7 +159,7 @@ Server::Start() {
signal(SIGTERM, SignalUtil::HandleSignal); signal(SIGTERM, SignalUtil::HandleSignal);
server::Metrics::GetInstance().Init(); server::Metrics::GetInstance().Init();
server::SystemInfo::GetInstance().Init(); server::SystemInfo::GetInstance().Init();
engine::SchedServInit();
std::cout << "Milvus server start successfully." << std::endl; std::cout << "Milvus server start successfully." << std::endl;
StartService(); StartService();
@ -221,12 +222,16 @@ Server::LoadConfig() {
void void
Server::StartService() { Server::StartService() {
engine::StartSchedulerService();
DBWrapper::GetInstance().StartService();
grpc::GrpcMilvusServer::StartService(); grpc::GrpcMilvusServer::StartService();
} }
void void
Server::StopService() { Server::StopService() {
grpc::GrpcMilvusServer::StopService(); grpc::GrpcMilvusServer::StopService();
DBWrapper::GetInstance().StopService();
engine::StopSchedulerService();
} }
} }

View File

@ -49,8 +49,6 @@ GrpcMilvusServer::StartService() {
faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20); faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20);
DBWrapper::DB();//initialize db
std::string server_address(address + ":" + std::to_string(port)); std::string server_address(address + ":" + std::to_string(port));
::grpc::ServerBuilder builder; ::grpc::ServerBuilder builder;

View File

@ -66,16 +66,18 @@ GrpcBaseTask::~GrpcBaseTask() {
WaitToFinish(); WaitToFinish();
} }
ServerError ServerError GrpcBaseTask::Execute() {
GrpcBaseTask::Execute() {
error_code_ = OnExecute(); error_code_ = OnExecute();
done_ = true; Done();
finish_cond_.notify_all();
return error_code_; return error_code_;
} }
ServerError void GrpcBaseTask::Done() {
GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) { done_ = true;
finish_cond_.notify_all();
}
ServerError GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
error_code_ = error_code; error_code_ = error_code;
error_msg_ = error_msg; error_msg_ = error_msg;
@ -83,8 +85,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
return error_code_; return error_code_;
} }
ServerError ServerError GrpcBaseTask::WaitToFinish() {
GrpcBaseTask::WaitToFinish() {
std::unique_lock<std::mutex> lock(finish_mtx_); std::unique_lock<std::mutex> lock(finish_mtx_);
finish_cond_.wait(lock, [this] { return done_; }); finish_cond_.wait(lock, [this] { return done_; });
@ -101,8 +102,7 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
Stop(); Stop();
} }
void void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
if (task_ptr == nullptr) { if (task_ptr == nullptr) {
return; return;
} }
@ -120,8 +120,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *gr
} }
} }
void void GrpcRequestScheduler::Start() {
GrpcRequestScheduler::Start() {
if (!stopped_) { if (!stopped_) {
return; return;
} }
@ -129,8 +128,7 @@ GrpcRequestScheduler::Start() {
stopped_ = false; stopped_ = false;
} }
void void GrpcRequestScheduler::Stop() {
GrpcRequestScheduler::Stop() {
if (stopped_) { if (stopped_) {
return; return;
} }
@ -155,8 +153,7 @@ GrpcRequestScheduler::Stop() {
SERVER_LOG_INFO << "Scheduler stopped"; SERVER_LOG_INFO << "Scheduler stopped";
} }
ServerError ServerError GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
if (task_ptr == nullptr) { if (task_ptr == nullptr) {
return SERVER_NULL_POINTER; return SERVER_NULL_POINTER;
} }
@ -174,33 +171,31 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
return task_ptr->WaitToFinish();//sync execution return task_ptr->WaitToFinish();//sync execution
} }
namespace {
void TakeTaskToExecute(TaskQueuePtr task_queue) { void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
if (task_queue == nullptr) { if (task_queue == nullptr) {
return; return;
}
while (true) {
BaseTaskPtr task = task_queue->Take();
if (task == nullptr) {
SERVER_LOG_ERROR << "Take null from task queue, stop thread";
break;//stop the thread
} }
while (true) { try {
BaseTaskPtr task = task_queue->Take(); ServerError err = task->Execute();
if (task == nullptr) { if (err != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "Take null from task queue, stop thread"; SERVER_LOG_ERROR << "Task failed with code: " << err;
break;//stop the thread
}
try {
ServerError err = task->Execute();
if (err != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "Task failed with code: " << err;
}
} catch (std::exception &ex) {
SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
} }
} catch (std::exception &ex) {
SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
} }
} }
} }
ServerError ServerError GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
std::lock_guard<std::mutex> lock(queue_mtx_); std::lock_guard<std::mutex> lock(queue_mtx_);
std::string group_name = task_ptr->TaskGroup(); std::string group_name = task_ptr->TaskGroup();
@ -212,7 +207,7 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
task_groups_.insert(std::make_pair(group_name, queue)); task_groups_.insert(std::make_pair(group_name, queue));
//start a thread //start a thread
ThreadPtr thread = std::make_shared<std::thread>(&TakeTaskToExecute, queue); ThreadPtr thread = std::make_shared<std::thread>(&GrpcRequestScheduler::TakeTaskToExecute, this, queue);
execute_threads_.push_back(thread); execute_threads_.push_back(thread);
SERVER_LOG_INFO << "Create new thread for task group: " << group_name; SERVER_LOG_INFO << "Create new thread for task group: " << group_name;
} }

View File

@ -25,30 +25,24 @@ protected:
virtual ~GrpcBaseTask(); virtual ~GrpcBaseTask();
public: public:
ServerError ServerError Execute();
Execute();
ServerError void Done();
WaitToFinish();
std::string ServerError WaitToFinish();
TaskGroup() const { return task_group_; }
ServerError std::string TaskGroup() const { return task_group_; }
ErrorCode() const { return error_code_; }
std::string ServerError ErrorCode() const { return error_code_; }
ErrorMsg() const { return error_msg_; }
bool std::string ErrorMsg() const { return error_msg_; }
IsAsync() const { return async_; }
bool IsAsync() const { return async_; }
protected: protected:
virtual ServerError virtual ServerError OnExecute() = 0;
OnExecute() = 0;
ServerError ServerError SetError(ServerError error_code, const std::string &msg);
SetError(ServerError error_code, const std::string &msg);
protected: protected:
mutable std::mutex finish_mtx_; mutable std::mutex finish_mtx_;
@ -77,19 +71,18 @@ public:
void Stop(); void Stop();
ServerError ServerError ExecuteTask(const BaseTaskPtr &task_ptr);
ExecuteTask(const BaseTaskPtr &task_ptr);
static void static void ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status);
ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status);
protected: protected:
GrpcRequestScheduler(); GrpcRequestScheduler();
virtual ~GrpcRequestScheduler(); virtual ~GrpcRequestScheduler();
ServerError void TakeTaskToExecute(TaskQueuePtr task_queue);
PutTaskToQueue(const BaseTaskPtr &task_ptr);
ServerError PutTaskToQueue(const BaseTaskPtr &task_ptr);
private: private:
mutable std::mutex queue_mtx_; mutable std::mutex queue_mtx_;

View File

@ -457,21 +457,17 @@ InsertTask::OnExecute() {
} }
} }
//step 3: check table flag
//all user provide id, or all internal id //all user provide id, or all internal id
uint64_t row_count = 0;
DBWrapper::DB()->GetTableRowCount(table_info.table_id_, row_count);
bool empty_table = (row_count == 0);
bool user_provide_ids = !insert_param_->row_id_array().empty(); bool user_provide_ids = !insert_param_->row_id_array().empty();
if(!empty_table) { //user already provided id before, all insert action require user id
//user already provided id before, all insert action require user id if((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) {
if(engine::utils::UserDefinedId(table_info.flag_) && !user_provide_ids) { return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch");
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch"); }
}
//user didn't provided id before, no need to provide user id //user didn't provided id before, no need to provide user id
if(!engine::utils::UserDefinedId(table_info.flag_) && user_provide_ids) { if((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch"); return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch");
}
} }
rc.RecordSection("check validation"); rc.RecordSection("check validation");
@ -482,7 +478,7 @@ InsertTask::OnExecute() {
ProfilerStart(fname.c_str()); ProfilerStart(fname.c_str());
#endif #endif
//step 3: prepare float data //step 4: prepare float data
std::vector<float> vec_f(insert_param_->row_record_array_size() * table_info.dimension_, 0); std::vector<float> vec_f(insert_param_->row_record_array_size() * table_info.dimension_, 0);
// TODO: change to one dimension array in protobuf or use multiple-thread to copy the data // TODO: change to one dimension array in protobuf or use multiple-thread to copy the data
@ -505,7 +501,7 @@ InsertTask::OnExecute() {
rc.ElapseFromBegin("prepare vectors data"); rc.ElapseFromBegin("prepare vectors data");
//step 4: insert vectors //step 5: insert vectors
auto vec_count = (uint64_t) insert_param_->row_record_array_size(); auto vec_count = (uint64_t) insert_param_->row_record_array_size();
std::vector<int64_t> vec_ids(insert_param_->row_id_array_size(), 0); std::vector<int64_t> vec_ids(insert_param_->row_id_array_size(), 0);
if(!insert_param_->row_id_array().empty()) { if(!insert_param_->row_id_array().empty()) {
@ -530,11 +526,10 @@ InsertTask::OnExecute() {
return SetError(SERVER_ILLEGAL_VECTOR_ID, msg); return SetError(SERVER_ILLEGAL_VECTOR_ID, msg);
} }
//step 5: update table flag //step 6: update table flag
if(empty_table && user_provide_ids) { user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID
stat = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID;
table_info.flag_ | engine::meta::FLAG_MASK_USERID); stat = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_);
}
#ifdef MILVUS_ENABLE_PROFILING #ifdef MILVUS_ENABLE_PROFILING
ProfilerStop(); ProfilerStop();