refactor(db): whole db api refactor

Former-commit-id: 3519e8a464bcd66b6da8ef2c579c08eaf13d17bd
pull/191/head
Xu Peng 2019-05-28 13:49:59 +08:00
parent 130b710169
commit c5501202ad
7 changed files with 124 additions and 124 deletions

View File

@ -21,22 +21,22 @@ class DB {
public:
static void Open(const Options& options, DB** dbptr);
virtual Status add_group(meta::TableSchema& table_schema_) = 0;
virtual Status get_group(meta::TableSchema& table_schema_) = 0;
virtual Status has_group(const std::string& table_id_, bool& has_or_not_) = 0;
virtual Status CreateTable(meta::TableSchema& table_schema_) = 0;
virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0;
virtual Status HasTable(const std::string& table_id_, bool& has_or_not_) = 0;
virtual Status add_vectors(const std::string& table_id_,
virtual Status InsertVectors(const std::string& table_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) = 0;
virtual Status search(const std::string& table_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, QueryResults& results) = 0;
virtual Status search(const std::string& table_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) = 0;
virtual Status size(long& result) = 0;
virtual Status Size(long& result) = 0;
virtual Status drop_all() = 0;
virtual Status DropAll() = 0;
DB() = default;
DB(const DB&) = delete;

View File

@ -33,22 +33,22 @@ public:
DBImpl(const Options& options);
virtual Status add_group(meta::TableSchema& table_schema) override;
virtual Status get_group(meta::TableSchema& table_schema) override;
virtual Status has_group(const std::string& table_id, bool& has_or_not) override;
virtual Status CreateTable(meta::TableSchema& table_schema) override;
virtual Status DescribeTable(meta::TableSchema& table_schema) override;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
virtual Status add_vectors(const std::string& table_id,
virtual Status InsertVectors(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids) override;
virtual Status search(const std::string& table_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, QueryResults& results) override;
virtual Status search(const std::string& table_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) override;
virtual Status drop_all() override;
virtual Status DropAll() override;
virtual Status size(long& result) override;
virtual Status Size(long& result) override;
virtual ~DBImpl();

View File

@ -34,38 +34,38 @@ DBImpl<EngineT>::DBImpl(const Options& options)
}
template<typename EngineT>
Status DBImpl<EngineT>::add_group(meta::TableSchema& table_schema) {
Status DBImpl<EngineT>::CreateTable(meta::TableSchema& table_schema) {
return pMeta_->CreateTable(table_schema);
}
template<typename EngineT>
Status DBImpl<EngineT>::get_group(meta::TableSchema& table_schema) {
Status DBImpl<EngineT>::DescribeTable(meta::TableSchema& table_schema) {
return pMeta_->DescribeTable(table_schema);
}
template<typename EngineT>
Status DBImpl<EngineT>::has_group(const std::string& table_id, bool& has_or_not) {
Status DBImpl<EngineT>::HasTable(const std::string& table_id, bool& has_or_not) {
return pMeta_->HasTable(table_id, has_or_not);
}
template<typename EngineT>
Status DBImpl<EngineT>::add_vectors(const std::string& table_id_,
Status DBImpl<EngineT>::InsertVectors(const std::string& table_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) {
Status status = pMemMgr_->add_vectors(table_id_, n, vectors, vector_ids_);
Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
if (!status.ok()) {
return status;
}
}
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string &table_id, size_t k, size_t nq,
Status DBImpl<EngineT>::Query(const std::string &table_id, size_t k, size_t nq,
const float *vectors, QueryResults &results) {
meta::DatesT dates = {meta::Meta::GetDate()};
return search(table_id, k, nq, vectors, dates, results);
return Query(table_id, k, nq, vectors, dates, results);
}
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string& table_id, size_t k, size_t nq,
Status DBImpl<EngineT>::Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
meta::DatePartionedTableFilesSchema files;
@ -384,7 +384,7 @@ Status DBImpl<EngineT>::TryBuildIndex() {
template<typename EngineT>
void DBImpl<EngineT>::BackgroundCompaction() {
std::vector<std::string> table_ids;
pMemMgr_->serialize(table_ids);
pMemMgr_->Serialize(table_ids);
Status status;
for (auto table_id : table_ids) {
@ -397,12 +397,12 @@ void DBImpl<EngineT>::BackgroundCompaction() {
}
template<typename EngineT>
Status DBImpl<EngineT>::drop_all() {
Status DBImpl<EngineT>::DropAll() {
return pMeta_->DropAll();
}
template<typename EngineT>
Status DBImpl<EngineT>::size(long& result) {
Status DBImpl<EngineT>::Size(long& result) {
return pMeta_->Size(result);
}
@ -423,7 +423,7 @@ DBImpl<EngineT>::~DBImpl() {
}
bg_timer_thread_.join();
std::vector<std::string> ids;
pMemMgr_->serialize(ids);
pMemMgr_->Serialize(ids);
env_->Stop();
}

View File

@ -26,24 +26,24 @@ namespace meta {
template <typename EngineT>
class MemVectors {
public:
typedef typename EngineT::Ptr EnginePtr;
typedef typename meta::Meta::Ptr MetaPtr;
typedef std::shared_ptr<MemVectors<EngineT>> Ptr;
using EnginePtr = typename EngineT::Ptr;
using MetaPtr = meta::Meta::Ptr;
using Ptr = std::shared_ptr<MemVectors<EngineT>>;
explicit MemVectors(const std::shared_ptr<meta::Meta>&,
const meta::TableFileSchema&, const Options&);
void add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
void Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
size_t total() const;
size_t Total() const;
size_t approximate_size() const;
size_t ApproximateSize() const;
Status serialize(std::string& table_id);
Status Serialize(std::string& table_id);
~MemVectors();
const std::string& location() const { return schema_.location; }
const std::string& Location() const { return schema_.location; }
private:
MemVectors() = delete;
@ -53,7 +53,7 @@ private:
MetaPtr pMeta_;
Options options_;
meta::TableFileSchema schema_;
IDGenerator* _pIdGenerator;
IDGenerator* pIdGenerator_;
EnginePtr pEE_;
}; // MemVectors
@ -63,32 +63,32 @@ private:
template<typename EngineT>
class MemManager {
public:
typedef typename meta::Meta::Ptr MetaPtr;
typedef typename MemVectors<EngineT>::Ptr MemVectorsPtr;
typedef std::shared_ptr<MemManager<EngineT>> Ptr;
using MetaPtr = meta::Meta::Ptr;
using MemVectorsPtr = typename MemVectors<EngineT>::Ptr;
using Ptr = std::shared_ptr<MemManager<EngineT>>;
MemManager(const std::shared_ptr<meta::Meta>& meta_, const Options& options)
: _pMeta(meta_), options_(options) {}
MemManager(const std::shared_ptr<meta::Meta>& meta, const Options& options)
: pMeta_(meta), options_(options) {}
MemVectorsPtr get_mem_by_group(const std::string& table_id_);
MemVectorsPtr GetMemByTable(const std::string& table_id);
Status add_vectors(const std::string& table_id_,
size_t n_, const float* vectors_, IDNumbers& vector_ids_);
Status InsertVectors(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);
Status serialize(std::vector<std::string>& table_ids);
Status Serialize(std::vector<std::string>& table_ids);
private:
Status add_vectors_no_lock(const std::string& table_id_,
size_t n_, const float* vectors_, IDNumbers& vector_ids_);
Status mark_memory_as_immutable();
Status InsertVectorsNoLock(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);
Status ToImmutable();
typedef std::map<std::string, MemVectorsPtr> MemMap;
typedef std::vector<MemVectorsPtr> ImmMemPool;
MemMap _memMap;
ImmMemPool _immMems;
MetaPtr _pMeta;
using MemMap = std::map<std::string, MemVectorsPtr>;
using ImmMemPool = std::vector<MemVectorsPtr>;
MemMap memMap_;
ImmMemPool immMems_;
MetaPtr pMeta_;
Options options_;
std::mutex _mutex;
std::mutex mutex_;
std::mutex serialization_mtx_;
}; // MemManager

View File

@ -24,30 +24,30 @@ MemVectors<EngineT>::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
: pMeta_(meta_ptr),
options_(options),
schema_(schema),
_pIdGenerator(new SimpleIDGenerator()),
pIdGenerator_(new SimpleIDGenerator()),
pEE_(new EngineT(schema_.dimension, schema_.location)) {
}
template<typename EngineT>
void MemVectors<EngineT>::add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
_pIdGenerator->GetNextIDNumbers(n_, vector_ids_);
void MemVectors<EngineT>::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
pIdGenerator_->GetNextIDNumbers(n_, vector_ids_);
pEE_->AddWithIds(n_, vectors_, vector_ids_.data());
}
template<typename EngineT>
size_t MemVectors<EngineT>::total() const {
size_t MemVectors<EngineT>::Total() const {
return pEE_->Count();
}
template<typename EngineT>
size_t MemVectors<EngineT>::approximate_size() const {
size_t MemVectors<EngineT>::ApproximateSize() const {
return pEE_->Size();
}
template<typename EngineT>
Status MemVectors<EngineT>::serialize(std::string& table_id) {
Status MemVectors<EngineT>::Serialize(std::string& table_id) {
table_id = schema_.table_id;
auto size = approximate_size();
auto size = ApproximateSize();
pEE_->Serialize();
schema_.size = size;
schema_.file_type = (size >= options_.index_trigger_size) ?
@ -65,9 +65,9 @@ Status MemVectors<EngineT>::serialize(std::string& table_id) {
template<typename EngineT>
MemVectors<EngineT>::~MemVectors() {
if (_pIdGenerator != nullptr) {
delete _pIdGenerator;
_pIdGenerator = nullptr;
if (pIdGenerator_ != nullptr) {
delete pIdGenerator_;
pIdGenerator_ = nullptr;
}
}
@ -76,69 +76,69 @@ MemVectors<EngineT>::~MemVectors() {
*/
template<typename EngineT>
typename MemManager<EngineT>::MemVectorsPtr MemManager<EngineT>::get_mem_by_group(
typename MemManager<EngineT>::MemVectorsPtr MemManager<EngineT>::GetMemByTable(
const std::string& table_id) {
auto memIt = _memMap.find(table_id);
if (memIt != _memMap.end()) {
auto memIt = memMap_.find(table_id);
if (memIt != memMap_.end()) {
return memIt->second;
}
meta::TableFileSchema table_file;
table_file.table_id = table_id;
auto status = _pMeta->CreateTableFile(table_file);
auto status = pMeta_->CreateTableFile(table_file);
if (!status.ok()) {
return nullptr;
}
_memMap[table_id] = MemVectorsPtr(new MemVectors<EngineT>(_pMeta, table_file, options_));
return _memMap[table_id];
memMap_[table_id] = MemVectorsPtr(new MemVectors<EngineT>(pMeta_, table_file, options_));
return memMap_[table_id];
}
template<typename EngineT>
Status MemManager<EngineT>::add_vectors(const std::string& table_id_,
Status MemManager<EngineT>::InsertVectors(const std::string& table_id_,
size_t n_,
const float* vectors_,
IDNumbers& vector_ids_) {
std::unique_lock<std::mutex> lock(_mutex);
return add_vectors_no_lock(table_id_, n_, vectors_, vector_ids_);
std::unique_lock<std::mutex> lock(mutex_);
return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
}
template<typename EngineT>
Status MemManager<EngineT>::add_vectors_no_lock(const std::string& table_id,
Status MemManager<EngineT>::InsertVectorsNoLock(const std::string& table_id,
size_t n,
const float* vectors,
IDNumbers& vector_ids) {
MemVectorsPtr mem = get_mem_by_group(table_id);
MemVectorsPtr mem = GetMemByTable(table_id);
if (mem == nullptr) {
return Status::NotFound("Group " + table_id + " not found!");
}
mem->add(n, vectors, vector_ids);
mem->Add(n, vectors, vector_ids);
return Status::OK();
}
template<typename EngineT>
Status MemManager<EngineT>::mark_memory_as_immutable() {
std::unique_lock<std::mutex> lock(_mutex);
for (auto& kv: _memMap) {
_immMems.push_back(kv.second);
Status MemManager<EngineT>::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_);
for (auto& kv: memMap_) {
immMems_.push_back(kv.second);
}
_memMap.clear();
memMap_.clear();
return Status::OK();
}
template<typename EngineT>
Status MemManager<EngineT>::serialize(std::vector<std::string>& table_ids) {
mark_memory_as_immutable();
Status MemManager<EngineT>::Serialize(std::vector<std::string>& table_ids) {
ToImmutable();
std::unique_lock<std::mutex> lock(serialization_mtx_);
std::string table_id;
table_ids.clear();
for (auto& mem : _immMems) {
mem->serialize(table_id);
for (auto& mem : immMems_) {
mem->Serialize(table_id);
table_ids.push_back(table_id);
}
_immMems.clear();
immMems_.clear();
return Status::OK();
}

View File

@ -78,17 +78,17 @@ BaseTaskPtr CreateTableTask::Create(const thrift::TableSchema& schema) {
ServerError CreateTableTask::OnExecute() {
TimeRecorder rc("CreateTableTask");
try {
if(schema_.vector_column_array.empty()) {
return SERVER_INVALID_ARGUMENT;
}
IVecIdMapper::GetInstance()->AddGroup(schema_.table_name);
engine::meta::GroupSchema group_info;
group_info.dimension = (uint16_t)schema_.vector_column_array[0].dimension;
group_info.group_id = schema_.table_name;
engine::Status stat = DB()->add_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.dimension = (uint16_t)schema_.vector_column_array[0].dimension;
table_schema.table_id = schema_.table_name;
engine::Status stat = DB()->CreateTable(table_schema);
if(!stat.ok()) {//could exist
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
@ -123,9 +123,9 @@ ServerError DescribeTableTask::OnExecute() {
TimeRecorder rc("DescribeTableTask");
try {
engine::meta::GroupSchema group_info;
group_info.group_id = table_name_;
engine::Status stat = DB()->get_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.table_id = table_name_;
engine::Status stat = DB()->DescribeTable(table_schema);
if(!stat.ok()) {
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
@ -154,8 +154,8 @@ DeleteTableTask::DeleteTableTask(const std::string& table_name)
}
BaseTaskPtr DeleteTableTask::Create(const std::string& group_id) {
return std::shared_ptr<BaseTask>(new DeleteTableTask(group_id));
BaseTaskPtr DeleteTableTask::Create(const std::string& table_id) {
return std::shared_ptr<BaseTask>(new DeleteTableTask(table_id));
}
ServerError DeleteTableTask::OnExecute() {
@ -195,9 +195,9 @@ ServerError AddVectorTask::OnExecute() {
return SERVER_SUCCESS;
}
engine::meta::GroupSchema group_info;
group_info.group_id = table_name_;
engine::Status stat = DB()->get_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.table_id = table_name_;
engine::Status stat = DB()->DescribeTable(table_schema);
if(!stat.ok()) {
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
@ -208,7 +208,7 @@ ServerError AddVectorTask::OnExecute() {
rc.Record("get group info");
uint64_t vec_count = (uint64_t)record_array_.size();
uint64_t group_dim = group_info.dimension;
uint64_t group_dim = table_schema.dimension;
std::vector<float> vec_f;
vec_f.resize(vec_count*group_dim);//allocate enough memory
for(uint64_t i = 0; i < vec_count; i++) {
@ -236,7 +236,7 @@ ServerError AddVectorTask::OnExecute() {
rc.Record("prepare vectors data");
stat = DB()->add_vectors(table_name_, vec_count, vec_f.data(), record_ids_);
stat = DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
rc.Record("add vectors to engine");
if(!stat.ok()) {
error_code_ = SERVER_UNEXPECTED_ERROR;
@ -293,9 +293,9 @@ ServerError SearchVectorTask::OnExecute() {
return error_code_;
}
engine::meta::GroupSchema group_info;
group_info.group_id = table_name_;
engine::Status stat = DB()->get_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.table_id = table_name_;
engine::Status stat = DB()->DescribeTable(table_schema);
if(!stat.ok()) {
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
@ -305,7 +305,7 @@ ServerError SearchVectorTask::OnExecute() {
std::vector<float> vec_f;
uint64_t record_count = (uint64_t)record_array_.size();
vec_f.resize(record_count*group_info.dimension);
vec_f.resize(record_count*table_schema.dimension);
for(uint64_t i = 0; i < record_array_.size(); i++) {
const auto& record = record_array_[i];
@ -317,9 +317,9 @@ ServerError SearchVectorTask::OnExecute() {
}
uint64_t vec_dim = record.vector_map.begin()->second.size() / sizeof(double);//how many double value?
if (vec_dim != group_info.dimension) {
if (vec_dim != table_schema.dimension) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_info.dimension;
<< " vs. group dimension:" << table_schema.dimension;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
return error_code_;
@ -335,7 +335,7 @@ ServerError SearchVectorTask::OnExecute() {
std::vector<DB_DATE> dates;
engine::QueryResults results;
stat = DB()->search(table_name_, (size_t)top_k_, record_count, vec_f.data(), dates, results);
stat = DB()->Query(table_name_, (size_t)top_k_, record_count, vec_f.data(), dates, results);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;

View File

@ -69,18 +69,18 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.table_id = group_name;
engine::Status stat = db_->add_group(group_info);
engine::Status stat = db_->CreateTable(group_info);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->get_group(group_info_get);
stat = db_->DescribeTable(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
db_->size(size);
db_->Size(size);
int d = 256;
int nb = 20;
float *xb = new float[d * nb];
@ -92,13 +92,13 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
int loop = 100000;
for (auto i=0; i<loop; ++i) {
db_->add_vectors(group_name, nb, xb, vector_ids);
db_->InsertVectors(group_name, nb, xb, vector_ids);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
db_->size(size);
db_->Size(size);
LOG(DEBUG) << "size=" << size;
ASSERT_TRUE(size < 1 * engine::meta::G);
@ -114,11 +114,11 @@ TEST_F(DBTest, DB_TEST) {
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.table_id = group_name;
engine::Status stat = db_->add_group(group_info);
engine::Status stat = db_->CreateTable(group_info);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->get_group(group_info_get);
stat = db_->DescribeTable(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
@ -152,11 +152,11 @@ TEST_F(DBTest, DB_TEST) {
for (auto j=0; j<10; ++j) {
ss.str("");
db_->size(count);
db_->Size(count);
prev_count = count;
START_TIMER;
stat = db_->search(group_name, k, qb, qxb, results);
stat = db_->Query(group_name, k, qb, qxb, results);
ss << "Search " << j << " With Size " << count/engine::meta::M << " M";
STOP_TIMER(ss.str());
@ -179,10 +179,10 @@ TEST_F(DBTest, DB_TEST) {
for (auto i=0; i<loop; ++i) {
if (i==40) {
db_->add_vectors(group_name, qb, qxb, target_ids);
db_->InsertVectors(group_name, qb, qxb, target_ids);
ASSERT_EQ(target_ids.size(), qb);
} else {
db_->add_vectors(group_name, nb, xb, vector_ids);
db_->InsertVectors(group_name, nb, xb, vector_ids);
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
@ -200,11 +200,11 @@ TEST_F(DBTest, SEARCH_TEST) {
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.table_id = group_name;
engine::Status stat = db_->add_group(group_info);
engine::Status stat = db_->CreateTable(group_info);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->get_group(group_info_get);
stat = db_->DescribeTable(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
@ -238,7 +238,7 @@ TEST_F(DBTest, SEARCH_TEST) {
// insert data
const int batch_size = 100;
for (int j = 0; j < nb / batch_size; ++j) {
stat = db_->add_vectors(group_name, batch_size, xb.data()+batch_size*j*group_dim, ids);
stat = db_->InsertVectors(group_name, batch_size, xb.data()+batch_size*j*group_dim, ids);
if (j == 200){ sleep(1);}
ASSERT_STATS(stat);
}
@ -246,7 +246,7 @@ TEST_F(DBTest, SEARCH_TEST) {
sleep(2); // wait until build index finish
engine::QueryResults results;
stat = db_->search(group_name, k, nq, xq.data(), results);
stat = db_->Query(group_name, k, nq, xq.data(), results);
ASSERT_STATS(stat);
// TODO(linxj): add groundTruth assert