mirror of https://github.com/milvus-io/milvus.git
avoid potential multi-threads risk
Former-commit-id: 6e54134e4249003ce651eaf8ec2642d8898d34f0pull/191/head
parent
224fae759c
commit
bf8e7545e1
|
@ -30,6 +30,8 @@ RocksIdMapper::~RocksIdMapper() {
|
|||
}
|
||||
|
||||
void RocksIdMapper::OpenDb() {
|
||||
std::lock_guard<std::mutex> lck(db_mutex_);
|
||||
|
||||
if(db_) {
|
||||
return;
|
||||
}
|
||||
|
@ -81,6 +83,8 @@ void RocksIdMapper::OpenDb() {
|
|||
}
|
||||
|
||||
void RocksIdMapper::CloseDb() {
|
||||
std::lock_guard<std::mutex> lck(db_mutex_);
|
||||
|
||||
for(auto& iter : column_handles_) {
|
||||
delete iter.second;
|
||||
}
|
||||
|
@ -92,9 +96,86 @@ void RocksIdMapper::CloseDb() {
|
|||
}
|
||||
}
|
||||
|
||||
//not thread-safe
|
||||
ServerError RocksIdMapper::AddGroup(const std::string& group) {
|
||||
if(!IsGroupExist(group)) {
|
||||
std::lock_guard<std::mutex> lck(db_mutex_);
|
||||
|
||||
return AddGroupInternal(group);
|
||||
}
|
||||
|
||||
bool RocksIdMapper::IsGroupExist(const std::string& group) const {
|
||||
std::lock_guard<std::mutex> lck(db_mutex_);
|
||||
|
||||
return IsGroupExistInternal(group);
|
||||
}
|
||||
|
||||
|
||||
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
|
||||
std::lock_guard<std::mutex> lck(db_mutex_);
|
||||
|
||||
return PutInternal(nid, sid, group);
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
|
||||
if(nid.size() != sid.size()) {
|
||||
return SERVER_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lck(db_mutex_);
|
||||
ServerError err = SERVER_SUCCESS;
|
||||
for(size_t i = 0; i < nid.size(); i++) {
|
||||
err = PutInternal(nid[i], sid[i], group);
|
||||
if(err != SERVER_SUCCESS) {
|
||||
return err;
|
||||
}
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
|
||||
std::lock_guard<std::mutex> lck(db_mutex_);
|
||||
|
||||
return GetInternal(nid, sid, group);
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
|
||||
sid.clear();
|
||||
|
||||
std::lock_guard<std::mutex> lck(db_mutex_);
|
||||
|
||||
ServerError err = SERVER_SUCCESS;
|
||||
for(size_t i = 0; i < nid.size(); i++) {
|
||||
std::string str_id;
|
||||
ServerError temp_err = GetInternal(nid[i], str_id, group);
|
||||
if(temp_err != SERVER_SUCCESS) {
|
||||
sid.push_back("");
|
||||
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
|
||||
err = temp_err;
|
||||
continue;
|
||||
}
|
||||
|
||||
sid.push_back(str_id);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) {
|
||||
std::lock_guard<std::mutex> lck(db_mutex_);
|
||||
|
||||
return DeleteInternal(nid, group);
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::DeleteGroup(const std::string& group) {
|
||||
std::lock_guard<std::mutex> lck(db_mutex_);
|
||||
|
||||
return DeleteGroupInternal(group);
|
||||
}
|
||||
|
||||
//internal methods(whitout lock)
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
ServerError RocksIdMapper::AddGroupInternal(const std::string& group) {
|
||||
if(!IsGroupExistInternal(group)) {
|
||||
if(db_ == nullptr) {
|
||||
return SERVER_NULL_POINTER;
|
||||
}
|
||||
|
@ -117,8 +198,7 @@ ServerError RocksIdMapper::AddGroup(const std::string& group) {
|
|||
return SERVER_SUCCESS;
|
||||
}
|
||||
|
||||
//not thread-safe
|
||||
bool RocksIdMapper::IsGroupExist(const std::string& group) const {
|
||||
bool RocksIdMapper::IsGroupExistInternal(const std::string& group) const {
|
||||
std::string group_name = group;
|
||||
if(group_name.empty()){
|
||||
group_name = ROCKSDB_DEFAULT_GROUP;
|
||||
|
@ -126,7 +206,7 @@ bool RocksIdMapper::IsGroupExist(const std::string& group) const {
|
|||
return (column_handles_.count(group_name) > 0 && column_handles_[group_name] != nullptr);
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
|
||||
ServerError RocksIdMapper::PutInternal(const std::string& nid, const std::string& sid, const std::string& group) {
|
||||
if(db_ == nullptr) {
|
||||
return SERVER_NULL_POINTER;
|
||||
}
|
||||
|
@ -141,7 +221,7 @@ ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, c
|
|||
}
|
||||
} else {
|
||||
//try create group
|
||||
if(AddGroup(group) != SERVER_SUCCESS){
|
||||
if(AddGroupInternal(group) != SERVER_SUCCESS){
|
||||
return SERVER_UNEXPECTED_ERROR;
|
||||
}
|
||||
|
||||
|
@ -156,23 +236,7 @@ ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, c
|
|||
return SERVER_SUCCESS;
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
|
||||
if(nid.size() != sid.size()) {
|
||||
return SERVER_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
ServerError err = SERVER_SUCCESS;
|
||||
for(size_t i = 0; i < nid.size(); i++) {
|
||||
err = Put(nid[i], sid[i], group);
|
||||
if(err != SERVER_SUCCESS) {
|
||||
return err;
|
||||
}
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
|
||||
ServerError RocksIdMapper::GetInternal(const std::string& nid, std::string& sid, const std::string& group) const {
|
||||
sid = "";
|
||||
if(db_ == nullptr) {
|
||||
return SERVER_NULL_POINTER;
|
||||
|
@ -199,28 +263,8 @@ ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const s
|
|||
return SERVER_SUCCESS;
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
|
||||
sid.clear();
|
||||
|
||||
ServerError err = SERVER_SUCCESS;
|
||||
for(size_t i = 0; i < nid.size(); i++) {
|
||||
std::string str_id;
|
||||
ServerError temp_err = Get(nid[i], str_id, group);
|
||||
if(temp_err != SERVER_SUCCESS) {
|
||||
sid.push_back("");
|
||||
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
|
||||
err = temp_err;
|
||||
continue;
|
||||
}
|
||||
|
||||
sid.push_back(str_id);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) {
|
||||
if(db_ == nullptr) {
|
||||
ServerError RocksIdMapper::DeleteInternal(const std::string& nid, const std::string& group) {
|
||||
if(db_ == nullptr) {
|
||||
return SERVER_NULL_POINTER;
|
||||
}
|
||||
|
||||
|
@ -244,7 +288,7 @@ ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& gro
|
|||
return SERVER_SUCCESS;
|
||||
}
|
||||
|
||||
ServerError RocksIdMapper::DeleteGroup(const std::string& group) {
|
||||
ServerError RocksIdMapper::DeleteGroupInternal(const std::string& group) {
|
||||
if(db_ == nullptr) {
|
||||
return SERVER_NULL_POINTER;
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
#include <string>
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
#include <mutex>
|
||||
|
||||
namespace zilliz {
|
||||
namespace vecwise {
|
||||
|
@ -39,9 +40,22 @@ private:
|
|||
void OpenDb();
|
||||
void CloseDb();
|
||||
|
||||
ServerError AddGroupInternal(const std::string& group);
|
||||
|
||||
bool IsGroupExistInternal(const std::string& group) const;
|
||||
|
||||
ServerError PutInternal(const std::string& nid, const std::string& sid, const std::string& group);
|
||||
|
||||
ServerError GetInternal(const std::string& nid, std::string& sid, const std::string& group) const;
|
||||
|
||||
ServerError DeleteInternal(const std::string& nid, const std::string& group);
|
||||
|
||||
ServerError DeleteGroupInternal(const std::string& group);
|
||||
|
||||
private:
|
||||
rocksdb::DB* db_;
|
||||
mutable std::unordered_map<std::string, rocksdb::ColumnFamilyHandle*> column_handles_;
|
||||
mutable std::mutex db_mutex_;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue