feat(db): add metadata logic and update DB background merge jobs

Former-commit-id: 6f899d8efdf831b286e1af8a9e6490dde1acedc0
pull/191/head
Xu Peng 2019-04-16 14:45:18 +08:00
parent 83cff9fac0
commit 846a05459e
9 changed files with 148 additions and 30 deletions

View File

@ -19,12 +19,12 @@ public:
virtual Status add_group(const GroupOptions& options_, virtual Status add_group(const GroupOptions& options_,
const std::string& group_id_, const std::string& group_id_,
GroupSchema& group_info_) = 0; meta::GroupSchema& group_info_) = 0;
virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) = 0; virtual Status get_group(const std::string& group_id_, meta::GroupSchema& group_info_) = 0;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
virtual Status get_group_files(const std::string& group_id_, virtual Status get_group_files(const std::string& group_id_,
const int date_delta_, const int date_delta_,
GroupFilesSchema& group_files_info_) = 0; meta::GroupFilesSchema& group_files_info_) = 0;
virtual Status add_vectors(const std::string& group_id_, virtual Status add_vectors(const std::string& group_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) = 0; size_t n, const float* vectors, IDNumbers& vector_ids_) = 0;

View File

@ -2,6 +2,9 @@
#include <chrono> #include <chrono>
#include <thread> #include <thread>
#include <iostream> #include <iostream>
#include <faiss/IndexFlat.h>
#include <faiss/MetaIndexes.h>
#include <faiss/index_io.h>
#include "db_impl.h" #include "db_impl.h"
#include "db_meta_impl.h" #include "db_meta_impl.h"
#include "env.h" #include "env.h"
@ -16,21 +19,21 @@ DBImpl::DBImpl(const Options& options_, const std::string& name_)
_options(options_), _options(options_),
_bg_compaction_scheduled(false), _bg_compaction_scheduled(false),
_shutting_down(false), _shutting_down(false),
_pMeta(new DBMetaImpl(*(_options.pMetaOptions))), _pMeta(new meta::DBMetaImpl(*(_options.pMetaOptions))),
_pMemMgr(new MemManager(_pMeta)) { _pMemMgr(new MemManager(_pMeta)) {
start_timer_task(options_.memory_sync_interval); start_timer_task(options_.memory_sync_interval);
} }
Status DBImpl::add_group(const GroupOptions& options, Status DBImpl::add_group(const GroupOptions& options,
const std::string& group_id, const std::string& group_id,
GroupSchema& group_info) { meta::GroupSchema& group_info) {
assert((!options.has_id) || assert((!options.has_id) ||
(options.has_id && ("" != group_id))); (options.has_id && ("" != group_id)));
return _pMeta->add_group(options, group_id, group_info); return _pMeta->add_group(options, group_id, group_info);
} }
Status DBImpl::get_group(const std::string& group_id_, GroupSchema& group_info_) { Status DBImpl::get_group(const std::string& group_id_, meta::GroupSchema& group_info_) {
return _pMeta->get_group(group_id_, group_info_); return _pMeta->get_group(group_id_, group_info_);
} }
@ -40,7 +43,7 @@ Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) {
Status DBImpl::get_group_files(const std::string& group_id, Status DBImpl::get_group_files(const std::string& group_id,
const int date_delta, const int date_delta,
GroupFilesSchema& group_files_info) { meta::GroupFilesSchema& group_files_info) {
return _pMeta->get_group_files(group_id, date_delta, group_files_info); return _pMeta->get_group_files(group_id, date_delta, group_files_info);
} }
@ -100,12 +103,64 @@ void DBImpl::background_call() {
_bg_work_finish_signal.notify_all(); _bg_work_finish_signal.notify_all();
} }
// Merges the index files listed in `files` into one newly allocated group file.
//
// Steps:
//   1. Ask the meta store for a new file record for (`group_id`, `date`).
//   2. Append the vectors and ids of every source file to a fresh flat index
//      wrapped in an id map.
//   3. Write the merged index to the new record's location.
//   4. Push one meta update: merged sources are flagged TO_DELETE and the new
//      file is flagged RAW.
//
// A source file that does not hold an IndexIDMap over an IndexFlat is logged
// and skipped; it is neither merged nor flagged for deletion.
//
// Returns the failing status when the new file record cannot be created,
// otherwise the status of the final meta update.
Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
                           const meta::GroupFilesSchema& files) {
    meta::GroupFileSchema group_file;
    Status status = _pMeta->add_group_file(group_id, date, group_file);
    if (!status.ok()) {
        return status;
    }

    faiss::IndexFlat innerIndex(group_file.dimension);
    faiss::IndexIDMap index(&innerIndex);

    meta::GroupFilesSchema updated;
    for (const auto& file : files) {
        // read_index() returns a heap-allocated index owned by the caller;
        // hold it in a unique_ptr so it is released on every path.
        std::unique_ptr<faiss::Index> loaded(faiss::read_index(file.location.c_str()));

        auto* file_index = dynamic_cast<faiss::IndexIDMap*>(loaded.get());
        auto* flat_index = (file_index != nullptr)
                               ? dynamic_cast<faiss::IndexFlat*>(file_index->index)
                               : nullptr;
        if (flat_index == nullptr) {
            // Unexpected index layout: dereferencing the failed cast would crash.
            std::cerr << __func__ << " skip unexpected index file=" << file.location << std::endl;
            continue;
        }

        index.add_with_ids(file_index->ntotal, flat_index->xb.data(),
                           file_index->id_map.data());

        // Record the source as obsolete; the input list itself is left untouched.
        auto file_schema = file;
        file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
        updated.push_back(file_schema);
    }

    faiss::write_index(&index, group_file.location.c_str());

    group_file.file_type = meta::GroupFileSchema::RAW;
    updated.push_back(group_file);

    status = _pMeta->update_files(updated);
    return status;
}
// Merges the small raw files of `group_id`, one merge per date partition.
//
// NOTE: the lookup that should populate `raw_files` is still commented out, so
// the map is currently always empty and the function returns OK immediately.
//
// Returns the first non-OK status reported by merge_files(), or OK when every
// partition merged successfully (or there was nothing to merge).
Status DBImpl::background_merge_files(const std::string& group_id) {
    meta::DatePartionedGroupFilesSchema raw_files;
    /* auto status = _pMeta->get_small_raw_files(group_id, raw_files); */
    /* if (!status.ok()) { */
    /* _bg_error = status; */
    /* return status; */
    /* } */

    if (raw_files.empty()) {
        return Status::OK();
    }

    for (auto& kv : raw_files) {
        // kv.first is the partition date, kv.second the files of that date.
        Status status = merge_files(group_id, kv.first, kv.second);
        if (!status.ok()) {
            return status;
        }
    }

    // Every path must return a value: falling off the end of a non-void
    // function is undefined behaviour.
    return Status::OK();
}
void DBImpl::background_compaction() { void DBImpl::background_compaction() {
std::vector<std::string> group_ids; std::vector<std::string> group_ids;
_pMemMgr->serialize(group_ids); _pMemMgr->serialize(group_ids);
for (auto group_id : group_ids) { for (auto group_id : group_ids) {
std::cout << __func__ << " group_id=" << group_id << std::endl; std::cout << __func__ << " group_id=" << group_id << std::endl;
} }
if (group_ids.size() > 0) {
}
} }
DBImpl::~DBImpl() { DBImpl::~DBImpl() {

View File

@ -14,19 +14,23 @@ namespace engine {
class Env; class Env;
// Forward declaration only: this header refers to meta::Meta through a
// std::shared_ptr, which does not need the complete type.
namespace meta { class Meta; }
class DBImpl : public DB { class DBImpl : public DB {
public: public:
DBImpl(const Options& options_, const std::string& name_); DBImpl(const Options& options_, const std::string& name_);
virtual Status add_group(const GroupOptions& options_, virtual Status add_group(const GroupOptions& options_,
const std::string& group_id_, const std::string& group_id_,
GroupSchema& group_info_) override; meta::GroupSchema& group_info_) override;
virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) override; virtual Status get_group(const std::string& group_id_, meta::GroupSchema& group_info_) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status get_group_files(const std::string& group_id_, virtual Status get_group_files(const std::string& group_id_,
const int date_delta_, const int date_delta_,
GroupFilesSchema& group_files_info_) override; meta::GroupFilesSchema& group_files_info_) override;
virtual Status add_vectors(const std::string& group_id_, virtual Status add_vectors(const std::string& group_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) override; size_t n, const float* vectors, IDNumbers& vector_ids_) override;
@ -37,6 +41,10 @@ public:
virtual ~DBImpl(); virtual ~DBImpl();
private: private:
Status merge_files(const std::string& group_id,
const meta::DateT& date,
const meta::GroupFilesSchema& files);
Status background_merge_files(const std::string& group_id);
void try_schedule_compaction(); void try_schedule_compaction();
void start_timer_task(int interval_); void start_timer_task(int interval_);
@ -56,7 +64,7 @@ private:
Status _bg_error; Status _bg_error;
std::atomic<bool> _shutting_down; std::atomic<bool> _shutting_down;
std::shared_ptr<Meta> _pMeta; std::shared_ptr<meta::Meta> _pMeta;
std::shared_ptr<MemManager> _pMemMgr; std::shared_ptr<MemManager> _pMemMgr;
}; // DBImpl }; // DBImpl

View File

@ -1,10 +1,21 @@
#include <ctime>
#include "db_meta.h" #include "db_meta.h"
namespace zilliz { namespace zilliz {
namespace vecwise { namespace vecwise {
namespace engine { namespace engine {
namespace meta {
// Converts `t` to local time and packs it into a single integer:
//     tm_year * 10000 + tm_mon * 100 + tm_mday
//
// The raw `tm` fields are used as-is, so the year is counted from 1900 and the
// month is zero-based (2019-04-16 encodes as 1190316, not 20190416).
// NOTE(review): this matches the existing encoding and is kept unchanged
// because the value is used in file paths and as a partition key — confirm
// whether a calendar YYYYMMDD value was intended.
//
// Returns 0 if the time cannot be converted.
DateT Meta::GetDate(const std::time_t& t) {
    // localtime_r writes into caller-owned storage; std::localtime returns a
    // pointer to shared static data and is not safe to call from several threads.
    std::tm ltm{};
    if (localtime_r(&t, &ltm) == nullptr) {
        return 0;
    }
    return ltm.tm_year*10000 + ltm.tm_mon*100 + ltm.tm_mday;
}
// Convenience overload: encodes the current system time with GetDate(t).
DateT Meta::GetDate() {
    const std::time_t now = std::time(nullptr);
    return GetDate(now);
}
} // namespace meta
} // namespace engine } // namespace engine
} // namespace vecwise } // namespace vecwise
} // namespace zilliz } // namespace zilliz

View File

@ -3,12 +3,17 @@
#include <string> #include <string>
#include <cstddef> #include <cstddef>
#include <vector> #include <vector>
#include <map>
#include <ctime>
#include "options.h" #include "options.h"
#include "status.h" #include "status.h"
namespace zilliz { namespace zilliz {
namespace vecwise { namespace vecwise {
namespace engine { namespace engine {
namespace meta {
// Integer-encoded date; produced by Meta::GetDate and used as the key of
// DatePartionedGroupFilesSchema.
using DateT = int;
struct GroupSchema { struct GroupSchema {
size_t id; size_t id;
@ -22,19 +27,24 @@ struct GroupSchema {
struct GroupFileSchema { struct GroupFileSchema {
typedef enum { typedef enum {
NEW,
RAW, RAW,
INDEX INDEX,
TO_DELETE,
} FILE_TYPE; } FILE_TYPE;
size_t id; size_t id;
std::string group_id; std::string group_id;
std::string file_id; std::string file_id;
int files_type = RAW; int file_type = NEW;
size_t rows; size_t rows;
DateT date;
uint16_t dimension;
std::string location = ""; std::string location = "";
}; // GroupFileSchema }; // GroupFileSchema
typedef std::vector<GroupFileSchema> GroupFilesSchema; typedef std::vector<GroupFileSchema> GroupFilesSchema;
// Group files bucketed by their DateT value (ordered by date, since std::map
// keeps its keys sorted).
using DatePartionedGroupFilesSchema = std::map<DateT, GroupFilesSchema>;
class Meta { class Meta {
@ -47,6 +57,9 @@ public:
virtual Status add_group_file(const std::string& group_id_, virtual Status add_group_file(const std::string& group_id_,
GroupFileSchema& group_file_info_) = 0; GroupFileSchema& group_file_info_) = 0;
virtual Status add_group_file(const std::string& group_id,
DateT date,
GroupFileSchema& group_file_info) = 0;
virtual Status has_group_file(const std::string& group_id_, virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_, const std::string& file_id_,
bool& has_or_not_) = 0; bool& has_or_not_) = 0;
@ -59,8 +72,14 @@ public:
const int date_delta_, const int date_delta_,
GroupFilesSchema& group_files_info_) = 0; GroupFilesSchema& group_files_info_) = 0;
virtual Status update_files(const GroupFilesSchema& files) = 0;
static DateT GetDate(const std::time_t& t);
static DateT GetDate();
}; // MetaData }; // MetaData
} // namespace meta
} // namespace engine } // namespace engine
} // namespace vecwise } // namespace vecwise
} // namespace zilliz } // namespace zilliz

View File

@ -6,6 +6,7 @@
namespace zilliz { namespace zilliz {
namespace vecwise { namespace vecwise {
namespace engine { namespace engine {
namespace meta {
DBMetaImpl::DBMetaImpl(const MetaOptions& options_) DBMetaImpl::DBMetaImpl(const MetaOptions& options_)
: _options(static_cast<const DBMetaOptions&>(options_)) { : _options(static_cast<const DBMetaOptions&>(options_)) {
@ -26,13 +27,6 @@ Status DBMetaImpl::add_group(const GroupOptions& options_,
Status DBMetaImpl::get_group(const std::string& group_id_, GroupSchema& group_info_) { Status DBMetaImpl::get_group(const std::string& group_id_, GroupSchema& group_info_) {
//PXU TODO //PXU TODO
std::stringstream ss;
SimpleIDGenerator g;
ss.str("");
ss << "/tmp/test/" << g.getNextIDNumber() << ".log";
group_info_.group_id = "1";
group_info_.dimension = 64;
group_info_.next_file_location = ss.str();
return Status::OK(); return Status::OK();
} }
@ -41,9 +35,24 @@ Status DBMetaImpl::has_group(const std::string& group_id_, bool& has_or_not_) {
return Status::OK(); return Status::OK();
} }
Status DBMetaImpl::add_group_file(const std::string& group_id_, Status DBMetaImpl::add_group_file(const std::string& group_id,
GroupFileSchema& group_file_info_) { GroupFileSchema& group_file_info) {
return add_group_file(group_id, Meta::GetDate(), group_file_info);
}
Status DBMetaImpl::add_group_file(const std::string& group_id,
DateT date,
GroupFileSchema& group_file_info) {
//PXU TODO //PXU TODO
std::stringstream ss;
SimpleIDGenerator g;
ss << "/tmp/test/" << date
<< "/" << g.getNextIDNumber()
<< ".log";
group_file_info.group_id = "1";
group_file_info.dimension = 64;
group_file_info.location = ss.str();
group_file_info.date = date;
return Status::OK(); return Status::OK();
} }
@ -73,6 +82,12 @@ Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file_) {
return Status::OK(); return Status::OK();
} }
// Placeholder implementation: the batch update of file records is not written
// yet, so `files` is ignored and success is always reported.
Status DBMetaImpl::update_files(const GroupFilesSchema& files) {
    //PXU TODO
    (void)files;  // intentionally unused until the update is implemented
    return Status::OK();
}
} // namespace meta
} // namespace engine } // namespace engine
} // namespace vecwise } // namespace vecwise
} // namespace zilliz } // namespace zilliz

View File

@ -7,6 +7,7 @@
namespace zilliz { namespace zilliz {
namespace vecwise { namespace vecwise {
namespace engine { namespace engine {
namespace meta {
class DBMetaImpl : public Meta { class DBMetaImpl : public Meta {
public: public:
@ -18,6 +19,9 @@ public:
virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) override; virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status add_group_file(const std::string& group_id,
DateT date,
GroupFileSchema& group_file_info) override;
virtual Status add_group_file(const std::string& group_id_, virtual Status add_group_file(const std::string& group_id_,
GroupFileSchema& group_file_info_) override; GroupFileSchema& group_file_info_) override;
virtual Status has_group_file(const std::string& group_id_, virtual Status has_group_file(const std::string& group_id_,
@ -32,6 +36,8 @@ public:
const int date_delta_, const int date_delta_,
GroupFilesSchema& group_files_info_) override; GroupFilesSchema& group_files_info_) override;
virtual Status update_files(const GroupFilesSchema& files) override;
private: private:
Status initialize(); Status initialize();
@ -40,6 +46,7 @@ private:
}; // DBMetaImpl }; // DBMetaImpl
} // namespace meta
} // namespace engine } // namespace engine
} // namespace vecwise } // namespace vecwise
} // namespace zilliz } // namespace zilliz

View File

@ -75,15 +75,15 @@ VectorsPtr MemManager::get_mem_by_group(const std::string& group_id) {
return memIt->second; return memIt->second;
} }
GroupSchema group_info; meta::GroupFileSchema group_file;
Status status = _pMeta->get_group(group_id, group_info); auto status = _pMeta->add_group_file(group_id, group_file);
if (!status.ok()) { if (!status.ok()) {
return nullptr; return nullptr;
} }
_memMap[group_id] = std::shared_ptr<MemVectors>(new MemVectors(group_info.group_id, _memMap[group_id] = std::shared_ptr<MemVectors>(new MemVectors(group_file.group_id,
group_info.dimension, group_file.dimension,
group_info.next_file_location)); group_file.location));
return _memMap[group_id]; return _memMap[group_id];
} }

View File

@ -19,6 +19,10 @@ namespace zilliz {
namespace vecwise { namespace vecwise {
namespace engine { namespace engine {
// Forward declaration only: MemManager keeps a std::shared_ptr<meta::Meta>,
// which does not need the complete type in this header.
namespace meta { class Meta; }
class MemVectors { class MemVectors {
public: public:
explicit MemVectors(const std::string& group_id, explicit MemVectors(const std::string& group_id,
@ -52,12 +56,11 @@ private:
}; // MemVectors }; // MemVectors
class Meta;
typedef std::shared_ptr<MemVectors> VectorsPtr; typedef std::shared_ptr<MemVectors> VectorsPtr;
class MemManager { class MemManager {
public: public:
MemManager(const std::shared_ptr<Meta>& meta_) MemManager(const std::shared_ptr<meta::Meta>& meta_)
: _pMeta(meta_) /*_last_compact_time(std::time(nullptr))*/ {} : _pMeta(meta_) /*_last_compact_time(std::time(nullptr))*/ {}
VectorsPtr get_mem_by_group(const std::string& group_id_); VectorsPtr get_mem_by_group(const std::string& group_id_);
@ -76,7 +79,7 @@ private:
typedef std::vector<VectorsPtr> ImmMemPool; typedef std::vector<VectorsPtr> ImmMemPool;
MemMap _memMap; MemMap _memMap;
ImmMemPool _immMems; ImmMemPool _immMems;
std::shared_ptr<Meta> _pMeta; std::shared_ptr<meta::Meta> _pMeta;
/* std::time_t _last_compact_time; */ /* std::time_t _last_compact_time; */
std::mutex _mutex; std::mutex _mutex;
}; // MemManager }; // MemManager