diff --git a/cpp/src/db/db.h b/cpp/src/db/db.h index c14822b041..74dfe93edf 100644 --- a/cpp/src/db/db.h +++ b/cpp/src/db/db.h @@ -19,12 +19,12 @@ public: virtual Status add_group(const GroupOptions& options_, const std::string& group_id_, - GroupSchema& group_info_) = 0; - virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) = 0; + meta::GroupSchema& group_info_) = 0; + virtual Status get_group(const std::string& group_id_, meta::GroupSchema& group_info_) = 0; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, - GroupFilesSchema& group_files_info_) = 0; + meta::GroupFilesSchema& group_files_info_) = 0; virtual Status add_vectors(const std::string& group_id_, size_t n, const float* vectors, IDNumbers& vector_ids_) = 0; diff --git a/cpp/src/db/db_impl.cpp b/cpp/src/db/db_impl.cpp index 4c00d07e0a..723b80cb4a 100644 --- a/cpp/src/db/db_impl.cpp +++ b/cpp/src/db/db_impl.cpp @@ -2,6 +2,9 @@ #include #include #include +#include +#include +#include #include "db_impl.h" #include "db_meta_impl.h" #include "env.h" @@ -16,21 +19,21 @@ DBImpl::DBImpl(const Options& options_, const std::string& name_) _options(options_), _bg_compaction_scheduled(false), _shutting_down(false), - _pMeta(new DBMetaImpl(*(_options.pMetaOptions))), + _pMeta(new meta::DBMetaImpl(*(_options.pMetaOptions))), _pMemMgr(new MemManager(_pMeta)) { start_timer_task(options_.memory_sync_interval); } Status DBImpl::add_group(const GroupOptions& options, const std::string& group_id, - GroupSchema& group_info) { + meta::GroupSchema& group_info) { assert((!options.has_id) || (options.has_id && ("" != group_id))); return _pMeta->add_group(options, group_id, group_info); } -Status DBImpl::get_group(const std::string& group_id_, GroupSchema& group_info_) { +Status DBImpl::get_group(const std::string& group_id_, meta::GroupSchema& group_info_) { return _pMeta->get_group(group_id_, group_info_); } @@ -40,7 +43,7 @@ Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) { Status DBImpl::get_group_files(const std::string& group_id, const int date_delta, - GroupFilesSchema& group_files_info) { + meta::GroupFilesSchema& group_files_info) { return _pMeta->get_group_files(group_id, date_delta, group_files_info); } @@ -100,12 +103,64 @@ void DBImpl::background_call() { _bg_work_finish_signal.notify_all(); } + +Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, + const meta::GroupFilesSchema& files) { + meta::GroupFileSchema group_file; + Status status = _pMeta->add_group_file(group_id, date, group_file); + if (!status.ok()) { + return status; + } + + faiss::IndexFlat innerIndex(group_file.dimension); + faiss::IndexIDMap index(&innerIndex); + + meta::GroupFilesSchema updated; + + for (auto& file : files) { + auto file_index = dynamic_cast(faiss::read_index(file.location.c_str())); + index.add_with_ids(file_index->ntotal, dynamic_cast(file_index->index)->xb.data(), + file_index->id_map.data()); + auto file_schema = file; + file_schema.file_type = meta::GroupFileSchema::TO_DELETE; + updated.push_back(file_schema); + } + + faiss::write_index(&index, group_file.location.c_str()); + group_file.file_type = meta::GroupFileSchema::RAW; + updated.push_back(group_file); + status = _pMeta->update_files(updated); + + return status; +} + +Status DBImpl::background_merge_files(const std::string& group_id) { + meta::DatePartionedGroupFilesSchema raw_files; + /* auto status = _pMeta->get_small_raw_files(group_id, raw_files); */ + /* if (!status.ok()) { */ + /* _bg_error = status; */ + /* return status; */ + /* } */ + + if (raw_files.size() == 0) { + return Status::OK(); + } + + for (auto& kv : raw_files) { + merge_files(group_id, kv.first, kv.second); + } +} + void DBImpl::background_compaction() { std::vector group_ids; _pMemMgr->serialize(group_ids); for (auto group_id : group_ids) { std::cout << __func__ << " group_id=" << group_id << std::endl; } + + if (group_ids.size() > 0) { + + } } DBImpl::~DBImpl() { diff --git a/cpp/src/db/db_impl.h b/cpp/src/db/db_impl.h index b8e45f8006..18c6b154d1 100644 --- a/cpp/src/db/db_impl.h +++ b/cpp/src/db/db_impl.h @@ -14,19 +14,23 @@ namespace engine { class Env; +namespace meta { + class Meta; +} + class DBImpl : public DB { public: DBImpl(const Options& options_, const std::string& name_); virtual Status add_group(const GroupOptions& options_, const std::string& group_id_, - GroupSchema& group_info_) override; - virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) override; + meta::GroupSchema& group_info_) override; + virtual Status get_group(const std::string& group_id_, meta::GroupSchema& group_info_) override; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, - GroupFilesSchema& group_files_info_) override; + meta::GroupFilesSchema& group_files_info_) override; virtual Status add_vectors(const std::string& group_id_, size_t n, const float* vectors, IDNumbers& vector_ids_) override; @@ -37,6 +41,10 @@ public: virtual ~DBImpl(); private: + Status merge_files(const std::string& group_id, + const meta::DateT& date, + const meta::GroupFilesSchema& files); + Status background_merge_files(const std::string& group_id); void try_schedule_compaction(); void start_timer_task(int interval_); @@ -56,7 +64,7 @@ private: Status _bg_error; std::atomic _shutting_down; - std::shared_ptr _pMeta; + std::shared_ptr _pMeta; std::shared_ptr _pMemMgr; }; // DBImpl diff --git a/cpp/src/db/db_meta.cpp b/cpp/src/db/db_meta.cpp index 9274b8258e..2ac0995962 100644 --- a/cpp/src/db/db_meta.cpp +++ b/cpp/src/db/db_meta.cpp @@ -1,10 +1,21 @@ +#include #include "db_meta.h" namespace zilliz { namespace vecwise { namespace engine { +namespace meta { +DateT Meta::GetDate(const std::time_t& t) { + tm *ltm = std::localtime(&t); + return ltm->tm_year*10000 + ltm->tm_mon*100 + ltm->tm_mday; +} +DateT Meta::GetDate() { + return GetDate(std::time(nullptr)); +} + +} // namespace meta } // namespace engine } // namespace vecwise } // namespace zilliz diff --git a/cpp/src/db/db_meta.h b/cpp/src/db/db_meta.h index 1eae4b95a6..22ab911fd3 100644 --- a/cpp/src/db/db_meta.h +++ b/cpp/src/db/db_meta.h @@ -3,12 +3,17 @@ #include #include #include +#include +#include #include "options.h" #include "status.h" namespace zilliz { namespace vecwise { namespace engine { +namespace meta { + +typedef int DateT; struct GroupSchema { size_t id; @@ -22,19 +27,24 @@ struct GroupSchema { struct GroupFileSchema { typedef enum { + NEW, RAW, - INDEX + INDEX, + TO_DELETE, } FILE_TYPE; size_t id; std::string group_id; std::string file_id; - int files_type = RAW; + int file_type = NEW; size_t rows; + DateT date; + uint16_t dimension; std::string location = ""; }; // GroupFileSchema typedef std::vector GroupFilesSchema; +typedef std::map DatePartionedGroupFilesSchema; class Meta { @@ -47,6 +57,9 @@ public: virtual Status add_group_file(const std::string& group_id_, GroupFileSchema& group_file_info_) = 0; + virtual Status add_group_file(const std::string& group_id, + DateT date, + GroupFileSchema& group_file_info) = 0; virtual Status has_group_file(const std::string& group_id_, const std::string& file_id_, bool& has_or_not_) = 0; @@ -59,8 +72,14 @@ public: const int date_delta_, GroupFilesSchema& group_files_info_) = 0; + virtual Status update_files(const GroupFilesSchema& files) = 0; + + static DateT GetDate(const std::time_t& t); + static DateT GetDate(); + }; // MetaData +} // namespace meta } // namespace engine } // namespace vecwise } // namespace zilliz diff --git a/cpp/src/db/db_meta_impl.cpp b/cpp/src/db/db_meta_impl.cpp index 5ae038e77a..1f8c4a94ce 100644 --- a/cpp/src/db/db_meta_impl.cpp +++ b/cpp/src/db/db_meta_impl.cpp @@ -6,6 +6,7 @@ namespace zilliz { namespace vecwise { namespace engine { +namespace meta { DBMetaImpl::DBMetaImpl(const MetaOptions& options_) : _options(static_cast(options_)) { @@ -26,13 +27,6 @@ Status DBMetaImpl::add_group(const GroupOptions& options_, Status DBMetaImpl::get_group(const std::string& group_id_, GroupSchema& group_info_) { //PXU TODO - std::stringstream ss; - SimpleIDGenerator g; - ss.str(""); - ss << "/tmp/test/" << g.getNextIDNumber() << ".log"; - group_info_.group_id = "1"; - group_info_.dimension = 64; - group_info_.next_file_location = ss.str(); return Status::OK(); } @@ -41,9 +35,24 @@ Status DBMetaImpl::has_group(const std::string& group_id_, bool& has_or_not_) { return Status::OK(); } -Status DBMetaImpl::add_group_file(const std::string& group_id_, - GroupFileSchema& group_file_info_) { +Status DBMetaImpl::add_group_file(const std::string& group_id, + GroupFileSchema& group_file_info) { + return add_group_file(group_id, Meta::GetDate(), group_file_info); +} + +Status DBMetaImpl::add_group_file(const std::string& group_id, + DateT date, + GroupFileSchema& group_file_info) { //PXU TODO + std::stringstream ss; + SimpleIDGenerator g; + ss << "/tmp/test/" << date + << "/" << g.getNextIDNumber() + << ".log"; + group_file_info.group_id = "1"; + group_file_info.dimension = 64; + group_file_info.location = ss.str(); + group_file_info.date = date; return Status::OK(); } @@ -73,6 +82,12 @@ Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file_) { return Status::OK(); } +Status DBMetaImpl::update_files(const GroupFilesSchema& files) { + //PXU TODO + return Status::OK(); +} + +} // namespace meta } // namespace engine } // namespace vecwise } // namespace zilliz diff --git a/cpp/src/db/db_meta_impl.h b/cpp/src/db/db_meta_impl.h index f8602c1635..06e04902ce 100644 --- a/cpp/src/db/db_meta_impl.h +++ b/cpp/src/db/db_meta_impl.h @@ -7,6 +7,7 @@ namespace zilliz { namespace vecwise { namespace engine { +namespace meta { class DBMetaImpl : public Meta { public: @@ -18,6 +19,9 @@ public: virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) override; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; + virtual Status add_group_file(const std::string& group_id, + DateT date, + GroupFileSchema& group_file_info) override; virtual Status add_group_file(const std::string& group_id_, GroupFileSchema& group_file_info_) override; virtual Status has_group_file(const std::string& group_id_, @@ -32,6 +36,8 @@ public: const int date_delta_, GroupFilesSchema& group_files_info_) override; + virtual Status update_files(const GroupFilesSchema& files) override; + private: Status initialize(); @@ -40,6 +46,7 @@ private: }; // DBMetaImpl +} // namespace meta } // namespace engine } // namespace vecwise } // namespace zilliz diff --git a/cpp/src/db/memvectors.cpp b/cpp/src/db/memvectors.cpp index dac9e8b478..384e69be3f 100644 --- a/cpp/src/db/memvectors.cpp +++ b/cpp/src/db/memvectors.cpp @@ -75,15 +75,15 @@ VectorsPtr MemManager::get_mem_by_group(const std::string& group_id) { return memIt->second; } - GroupSchema group_info; - Status status = _pMeta->get_group(group_id, group_info); + meta::GroupFileSchema group_file; + auto status = _pMeta->add_group_file(group_id, group_file); if (!status.ok()) { return nullptr; } - _memMap[group_id] = std::shared_ptr(new MemVectors(group_info.group_id, - group_info.dimension, - group_info.next_file_location)); + _memMap[group_id] = std::shared_ptr(new MemVectors(group_file.group_id, + group_file.dimension, + group_file.location)); return _memMap[group_id]; } diff --git a/cpp/src/db/memvectors.h b/cpp/src/db/memvectors.h index 6b70df267c..5a6df87eae 100644 --- a/cpp/src/db/memvectors.h +++ b/cpp/src/db/memvectors.h @@ -19,6 +19,10 @@ namespace zilliz { namespace vecwise { namespace engine { +namespace meta { + class Meta; +} + class MemVectors { public: explicit MemVectors(const std::string& group_id, @@ -52,12 +56,11 @@ private: }; // MemVectors -class Meta; typedef std::shared_ptr VectorsPtr; class MemManager { public: - MemManager(const std::shared_ptr& meta_) + MemManager(const std::shared_ptr& meta_) : _pMeta(meta_) /*_last_compact_time(std::time(nullptr))*/ {} VectorsPtr get_mem_by_group(const std::string& group_id_); @@ -76,7 +79,7 @@ private: typedef std::vector ImmMemPool; MemMap _memMap; ImmMemPool _immMems; - std::shared_ptr _pMeta; + std::shared_ptr _pMeta; /* std::time_t _last_compact_time; */ std::mutex _mutex; }; // MemManager