merge branch from Xu and add metrics

Former-commit-id: a5d3e97dbd87c6478ece637046a2d9545fbc973f
pull/191/head
yu yunfeng 2019-05-28 18:46:07 +08:00
commit 4116f710f5
35 changed files with 987 additions and 1613 deletions

View File

@ -17,4 +17,4 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-1 - Add CHANGELOG.md
- MS-4 - Refactor the vecwise_engine code structure
- MS-6 - Implement SDK interface part 1
- MS-20 - Clean Code Part 1

View File

@ -7,8 +7,6 @@
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
/* #include "FaissExecutionEngine.h" */
/* #include "Traits.h" */
#include "Factories.h"
namespace zilliz {

View File

@ -5,12 +5,13 @@
******************************************************************************/
#pragma once
#include <string>
#include "Options.h"
#include "Meta.h"
#include "Status.h"
#include "Types.h"
#include <string>
namespace zilliz {
namespace vecwise {
namespace engine {
@ -21,29 +22,22 @@ class DB {
public:
static void Open(const Options& options, DB** dbptr);
virtual Status add_group(meta::GroupSchema& group_info_) = 0;
virtual Status get_group(meta::GroupSchema& group_info_) = 0;
virtual Status delete_vectors(const std::string& group_id,
const meta::DatesT& dates) = 0;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
meta::GroupFilesSchema& group_files_info_) = 0;
virtual Status CreateTable(meta::TableSchema& table_schema_) = 0;
virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0;
virtual Status HasTable(const std::string& table_id_, bool& has_or_not_) = 0;
virtual Status add_vectors(const std::string& group_id_,
virtual Status InsertVectors(const std::string& table_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) = 0;
virtual Status search(const std::string& group_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, QueryResults& results) = 0;
virtual Status search(const std::string& group_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) = 0;
virtual Status size(long& result) = 0;
virtual Status Size(long& result) = 0;
virtual Status drop_all() = 0;
virtual Status count(const std::string& group_id, long& result) = 0;
virtual Status DropAll() = 0;
DB() = default;
DB(const DB&) = delete;

View File

@ -5,15 +5,15 @@
******************************************************************************/
#pragma once
#include "DB.h"
#include "MemManager.h"
#include "Types.h"
#include "Traits.h"
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
#include "DB.h"
#include "MemManager.h"
#include "Types.h"
#include "FaissExecutionEngine.h"
#include "Traits.h"
namespace zilliz {
namespace vecwise {
@ -28,63 +28,56 @@ namespace meta {
template <typename EngineT>
class DBImpl : public DB {
public:
typedef typename meta::Meta::Ptr MetaPtr;
typedef typename MemManager<EngineT>::Ptr MemManagerPtr;
using MetaPtr = meta::Meta::Ptr;
using MemManagerPtr = typename MemManager<EngineT>::Ptr;
DBImpl(const Options& options);
virtual Status add_group(meta::GroupSchema& group_info) override;
virtual Status get_group(meta::GroupSchema& group_info) override;
virtual Status delete_vectors(const std::string& group_id, const meta::DatesT& dates) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status CreateTable(meta::TableSchema& table_schema) override;
virtual Status DescribeTable(meta::TableSchema& table_schema) override;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
meta::GroupFilesSchema& group_files_info_) override;
virtual Status InsertVectors(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids) override;
virtual Status add_vectors(const std::string& group_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) override;
virtual Status search(const std::string& group_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, QueryResults& results) override;
virtual Status search(const std::string& group_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) override;
virtual Status drop_all() override;
virtual Status DropAll() override;
virtual Status count(const std::string& group_id, long& result) override;
virtual Status size(long& result) override;
virtual Status Size(long& result) override;
virtual ~DBImpl();
private:
void background_build_index();
Status build_index(const meta::GroupFileSchema&);
Status try_build_index();
Status merge_files(const std::string& group_id,
void BackgroundBuildIndex();
Status BuildIndex(const meta::TableFileSchema&);
Status TryBuildIndex();
Status MergeFiles(const std::string& table_id,
const meta::DateT& date,
const meta::GroupFilesSchema& files);
Status background_merge_files(const std::string& group_id);
const meta::TableFilesSchema& files);
Status BackgroundMergeFiles(const std::string& table_id);
void try_schedule_compaction();
void start_timer_task(int interval_);
void background_timer_task(int interval_);
void TrySchedule();
void StartTimerTasks(int interval);
void BackgroundTimerTask(int interval);
static void BGWork(void* db);
void background_call();
void background_compaction();
void BackgroundCall();
void BackgroundCompaction();
Env* const _env;
const Options _options;
Env* const env_;
const Options options_;
std::mutex _mutex;
std::condition_variable _bg_work_finish_signal;
bool _bg_compaction_scheduled;
Status _bg_error;
std::atomic<bool> _shutting_down;
std::mutex mutex_;
std::condition_variable bg_work_finish_signal_;
bool bg_compaction_scheduled_;
Status bg_error_;
std::atomic<bool> shutting_down_;
std::mutex build_index_mutex_;
bool bg_build_index_started_;
@ -92,8 +85,8 @@ private:
std::thread bg_timer_thread_;
MetaPtr _pMeta;
MemManagerPtr _pMemMgr;
MetaPtr pMeta_;
MemManagerPtr pMemMgr_;
}; // DBImpl
@ -102,4 +95,4 @@ private:
} // namespace vecwise
} // namespace zilliz
#include "DBImpl.cpp"
#include "DBImpl.inl"

View File

@ -3,8 +3,11 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#ifndef DBIMPL_CPP__
#define DBIMPL_CPP__
#pragma once
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include <assert.h>
#include <chrono>
@ -14,10 +17,6 @@
#include <easylogging++.h>
#include <cache/CpuCacheMgr.h>
#include "../utils/Log.h"
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include "metrics/Metrics.h"
namespace zilliz {
@ -27,83 +26,37 @@ namespace engine {
template<typename EngineT>
DBImpl<EngineT>::DBImpl(const Options& options)
: _env(options.env),
_options(options),
_bg_compaction_scheduled(false),
_shutting_down(false),
: env_(options.env),
options_(options),
bg_compaction_scheduled_(false),
shutting_down_(false),
bg_build_index_started_(false),
_pMeta(new meta::DBMetaImpl(_options.meta)),
_pMemMgr(new MemManager<EngineT>(_pMeta, _options)) {
start_timer_task(_options.memory_sync_interval);
pMeta_(new meta::DBMetaImpl(options_.meta)),
pMemMgr_(new MemManager<EngineT>(pMeta_, options_)) {
StartTimerTasks(options_.memory_sync_interval);
}
template<typename EngineT>
Status DBImpl<EngineT>::add_group(meta::GroupSchema& group_info) {
Status result = _pMeta->add_group(group_info);
if(result.ok()){
// SERVER_LOG_INFO << "add_group request successed";
// server::Metrics::GetInstance().add_group_success_total().Increment();
} else{
// SERVER_LOG_INFO << "add_group request failed";
// server::Metrics::GetInstance().add_group_fail_total().Increment();
}
return result;
Status DBImpl<EngineT>::CreateTable(meta::TableSchema& table_schema) {
return pMeta_->CreateTable(table_schema);
}
template<typename EngineT>
Status DBImpl<EngineT>::get_group(meta::GroupSchema& group_info) {
Status result = _pMeta->get_group(group_info);
if(result.ok()){
// SERVER_LOG_INFO << "get_group request successed";
// server::Metrics::GetInstance().get_group_success_total().Increment();
} else{
// SERVER_LOG_INFO << "get_group request failed";
// server::Metrics::GetInstance().get_group_fail_total().Increment();
}
return result;
Status DBImpl<EngineT>::DescribeTable(meta::TableSchema& table_schema) {
return pMeta_->DescribeTable(table_schema);
}
template<typename EngineT>
Status DBImpl<EngineT>::delete_vectors(const std::string& group_id,
const meta::DatesT& dates) {
return _pMeta->delete_group_partitions(group_id, dates);
Status DBImpl<EngineT>::HasTable(const std::string& table_id, bool& has_or_not) {
return pMeta_->HasTable(table_id, has_or_not);
}
template<typename EngineT>
Status DBImpl<EngineT>::has_group(const std::string& group_id_, bool& has_or_not_) {
Status result = _pMeta->has_group(group_id_, has_or_not_);
if(result.ok()){
// SERVER_LOG_INFO << "has_group request successed";
// server::Metrics::GetInstance().has_group_success_total().Increment();
} else{
// SERVER_LOG_INFO << "has_group request failed";
// server::Metrics::GetInstance().has_group_fail_total().Increment();
}
return result;
}
template<typename EngineT>
Status DBImpl<EngineT>::get_group_files(const std::string& group_id,
const int date_delta,
meta::GroupFilesSchema& group_files_info) {
Status result = _pMeta->get_group_files(group_id, date_delta, group_files_info);
if(result.ok()){
// SERVER_LOG_INFO << "get_group_files request successed";
// server::Metrics::GetInstance().get_group_files_success_total().Increment();
} else{
// SERVER_LOG_INFO << "get_group_files request failed";
// server::Metrics::GetInstance().get_group_files_fail_total().Increment();
}
return result;
}
template<typename EngineT>
Status DBImpl<EngineT>::add_vectors(const std::string& group_id_,
Status DBImpl<EngineT>::InsertVectors(const std::string& table_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) {
auto start_time = METRICS_NOW_TIME;
Status status = _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_);
Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
auto end_time = METRICS_NOW_TIME;
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
@ -116,7 +69,6 @@ Status DBImpl<EngineT>::add_vectors(const std::string& group_id_,
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (!status.ok()) {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
return status;
@ -125,29 +77,28 @@ Status DBImpl<EngineT>::add_vectors(const std::string& group_id_,
}
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string &group_id, size_t k, size_t nq,
Status DBImpl<EngineT>::Query(const std::string &table_id, size_t k, size_t nq,
const float *vectors, QueryResults &results) {
meta::DatesT dates = {meta::Meta::GetDate()};
return search(group_id, k, nq, vectors, dates, results);
return Query(table_id, k, nq, vectors, dates, results);
}
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
Status DBImpl<EngineT>::Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
meta::DatePartionedGroupFilesSchema files;
auto status = _pMeta->files_to_search(group_id, dates, files);
meta::DatePartionedTableFilesSchema files;
auto status = pMeta_->FilesToSearch(table_id, dates, files);
if (!status.ok()) { return status; }
LOG(DEBUG) << "Search DateT Size=" << files.size();
meta::GroupFilesSchema index_files;
meta::GroupFilesSchema raw_files;
meta::TableFilesSchema index_files;
meta::TableFilesSchema raw_files;
for (auto &day_files : files) {
for (auto &file : day_files.second) {
file.file_type == meta::GroupFileSchema::INDEX ?
file.file_type == meta::TableFileSchema::INDEX ?
index_files.push_back(file) : raw_files.push_back(file);
}
}
@ -186,7 +137,7 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
long search_set_size = 0;
auto search_in_index = [&](meta::GroupFilesSchema& file_vec) -> void {
auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
for (auto &file : file_vec) {
EngineT index(file.dimension, file.location);
index.Load();
@ -201,13 +152,14 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
index.Search(nq, vectors, inner_k, output_distence, output_ids);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
if(file.file_type == meta::GroupFileSchema::RAW) {
if(file.file_type == meta::TableFileSchema::RAW) {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size*1024*1024);
} else if(file.file_type == meta::GroupFileSchema::TO_INDEX) {
} else if(file.file_type == meta::TableFileSchema::TO_INDEX) {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size*1024*1024);
@ -280,74 +232,74 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
}
if (results.empty()) {
return Status::NotFound("Group " + group_id + ", search result not found!");
return Status::NotFound("Group " + table_id + ", search result not found!");
}
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::start_timer_task(int interval_) {
bg_timer_thread_ = std::thread(&DBImpl<EngineT>::background_timer_task, this, interval_);
void DBImpl<EngineT>::StartTimerTasks(int interval) {
bg_timer_thread_ = std::thread(&DBImpl<EngineT>::BackgroundTimerTask, this, interval);
}
template<typename EngineT>
void DBImpl<EngineT>::background_timer_task(int interval_) {
void DBImpl<EngineT>::BackgroundTimerTask(int interval) {
Status status;
while (true) {
if (!_bg_error.ok()) break;
if (_shutting_down.load(std::memory_order_acquire)) break;
if (!bg_error_.ok()) break;
if (shutting_down_.load(std::memory_order_acquire)) break;
std::this_thread::sleep_for(std::chrono::seconds(interval_));
std::this_thread::sleep_for(std::chrono::seconds(interval));
try_schedule_compaction();
TrySchedule();
}
}
template<typename EngineT>
void DBImpl<EngineT>::try_schedule_compaction() {
if (_bg_compaction_scheduled) return;
if (!_bg_error.ok()) return;
void DBImpl<EngineT>::TrySchedule() {
if (bg_compaction_scheduled_) return;
if (!bg_error_.ok()) return;
_bg_compaction_scheduled = true;
_env->schedule(&DBImpl<EngineT>::BGWork, this);
bg_compaction_scheduled_ = true;
env_->Schedule(&DBImpl<EngineT>::BGWork, this);
}
template<typename EngineT>
void DBImpl<EngineT>::BGWork(void* db_) {
reinterpret_cast<DBImpl*>(db_)->background_call();
reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
}
template<typename EngineT>
void DBImpl<EngineT>::background_call() {
std::lock_guard<std::mutex> lock(_mutex);
assert(_bg_compaction_scheduled);
void DBImpl<EngineT>::BackgroundCall() {
std::lock_guard<std::mutex> lock(mutex_);
assert(bg_compaction_scheduled_);
if (!_bg_error.ok() || _shutting_down.load(std::memory_order_acquire))
if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
return ;
background_compaction();
BackgroundCompaction();
_bg_compaction_scheduled = false;
_bg_work_finish_signal.notify_all();
bg_compaction_scheduled_ = false;
bg_work_finish_signal_.notify_all();
}
template<typename EngineT>
Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::DateT& date,
const meta::GroupFilesSchema& files) {
meta::GroupFileSchema group_file;
group_file.group_id = group_id;
group_file.date = date;
Status status = _pMeta->add_group_file(group_file);
Status DBImpl<EngineT>::MergeFiles(const std::string& table_id, const meta::DateT& date,
const meta::TableFilesSchema& files) {
meta::TableFileSchema table_file;
table_file.table_id = table_id;
table_file.date = date;
Status status = pMeta_->CreateTableFile(table_file);
if (!status.ok()) {
LOG(INFO) << status.ToString() << std::endl;
return status;
}
EngineT index(group_file.dimension, group_file.location);
EngineT index(table_file.dimension, table_file.location);
meta::GroupFilesSchema updated;
meta::TableFilesSchema updated;
long index_size = 0;
for (auto& file : files) {
@ -359,26 +311,26 @@ Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::Dat
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
file_schema.file_type = meta::TableFileSchema::TO_DELETE;
updated.push_back(file_schema);
LOG(DEBUG) << "Merging file " << file_schema.file_id;
index_size = index.Size();
if (index_size >= _options.index_trigger_size) break;
if (index_size >= options_.index_trigger_size) break;
}
index.Serialize();
if (index_size >= _options.index_trigger_size) {
group_file.file_type = meta::GroupFileSchema::TO_INDEX;
if (index_size >= options_.index_trigger_size) {
table_file.file_type = meta::TableFileSchema::TO_INDEX;
} else {
group_file.file_type = meta::GroupFileSchema::RAW;
table_file.file_type = meta::TableFileSchema::RAW;
}
group_file.size = index_size;
updated.push_back(group_file);
status = _pMeta->update_files(updated);
LOG(DEBUG) << "New merged file " << group_file.file_id <<
table_file.size = index_size;
updated.push_back(table_file);
status = pMeta_->UpdateTableFiles(updated);
LOG(DEBUG) << "New merged file " << table_file.file_id <<
" of size=" << index.PhysicalSize()/(1024*1024) << " M";
index.Cache();
@ -387,43 +339,39 @@ Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::Dat
}
template<typename EngineT>
Status DBImpl<EngineT>::background_merge_files(const std::string& group_id) {
meta::DatePartionedGroupFilesSchema raw_files;
auto status = _pMeta->files_to_merge(group_id, raw_files);
Status DBImpl<EngineT>::BackgroundMergeFiles(const std::string& table_id) {
meta::DatePartionedTableFilesSchema raw_files;
auto status = pMeta_->FilesToMerge(table_id, raw_files);
if (!status.ok()) {
return status;
}
/* if (raw_files.size() == 0) { */
/* return Status::OK(); */
/* } */
bool has_merge = false;
for (auto& kv : raw_files) {
auto files = kv.second;
if (files.size() <= _options.merge_trigger_number) {
if (files.size() <= options_.merge_trigger_number) {
continue;
}
has_merge = true;
merge_files(group_id, kv.first, kv.second);
MergeFiles(table_id, kv.first, kv.second);
}
_pMeta->archive_files();
pMeta_->Archive();
try_build_index();
TryBuildIndex();
_pMeta->cleanup_ttl_files(1);
pMeta_->CleanUpFilesWithTTL(1);
return Status::OK();
}
template<typename EngineT>
Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
meta::GroupFileSchema group_file;
group_file.group_id = file.group_id;
group_file.date = file.date;
Status status = _pMeta->add_group_file(group_file);
Status DBImpl<EngineT>::BuildIndex(const meta::TableFileSchema& file) {
meta::TableFileSchema table_file;
table_file.table_id = file.table_id;
table_file.date = file.date;
Status status = pMeta_->CreateTableFile(table_file);
if (!status.ok()) {
return status;
}
@ -432,42 +380,42 @@ Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
to_index.Load();
auto start_time = METRICS_NOW_TIME;
auto index = to_index.BuildIndex(group_file.location);
auto index = to_index.BuildIndex(table_file.location);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
group_file.file_type = meta::GroupFileSchema::INDEX;
group_file.size = index->Size();
table_file.file_type = meta::TableFileSchema::INDEX;
table_file.size = index->Size();
auto to_remove = file;
to_remove.file_type = meta::GroupFileSchema::TO_DELETE;
to_remove.file_type = meta::TableFileSchema::TO_DELETE;
meta::GroupFilesSchema update_files = {to_remove, group_file};
_pMeta->update_files(update_files);
meta::TableFilesSchema update_files = {to_remove, table_file};
pMeta_->UpdateTableFiles(update_files);
LOG(DEBUG) << "New index file " << group_file.file_id << " of size "
LOG(DEBUG) << "New index file " << table_file.file_id << " of size "
<< index->PhysicalSize()/(1024*1024) << " M"
<< " from file " << to_remove.file_id;
index->Cache();
_pMeta->archive_files();
pMeta_->Archive();
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::background_build_index() {
void DBImpl<EngineT>::BackgroundBuildIndex() {
std::lock_guard<std::mutex> lock(build_index_mutex_);
assert(bg_build_index_started_);
meta::GroupFilesSchema to_index_files;
_pMeta->files_to_index(to_index_files);
meta::TableFilesSchema to_index_files;
pMeta_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
/* LOG(DEBUG) << "Buiding index for " << file.location; */
status = build_index(file);
status = BuildIndex(file);
if (!status.ok()) {
_bg_error = status;
bg_error_ = status;
return;
}
}
@ -478,52 +426,47 @@ void DBImpl<EngineT>::background_build_index() {
}
template<typename EngineT>
Status DBImpl<EngineT>::try_build_index() {
Status DBImpl<EngineT>::TryBuildIndex() {
if (bg_build_index_started_) return Status::OK();
if (_shutting_down.load(std::memory_order_acquire)) return Status::OK();
if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
bg_build_index_started_ = true;
std::thread build_index_task(&DBImpl<EngineT>::background_build_index, this);
std::thread build_index_task(&DBImpl<EngineT>::BackgroundBuildIndex, this);
build_index_task.detach();
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::background_compaction() {
std::vector<std::string> group_ids;
_pMemMgr->serialize(group_ids);
void DBImpl<EngineT>::BackgroundCompaction() {
std::vector<std::string> table_ids;
pMemMgr_->Serialize(table_ids);
Status status;
for (auto group_id : group_ids) {
status = background_merge_files(group_id);
for (auto table_id : table_ids) {
status = BackgroundMergeFiles(table_id);
if (!status.ok()) {
_bg_error = status;
bg_error_ = status;
return;
}
}
}
template<typename EngineT>
Status DBImpl<EngineT>::drop_all() {
return _pMeta->drop_all();
Status DBImpl<EngineT>::DropAll() {
return pMeta_->DropAll();
}
template<typename EngineT>
Status DBImpl<EngineT>::count(const std::string& group_id, long& result) {
return _pMeta->count(group_id, result);
}
template<typename EngineT>
Status DBImpl<EngineT>::size(long& result) {
return _pMeta->size(result);
Status DBImpl<EngineT>::Size(long& result) {
return pMeta_->Size(result);
}
template<typename EngineT>
DBImpl<EngineT>::~DBImpl() {
{
std::unique_lock<std::mutex> lock(_mutex);
_shutting_down.store(true, std::memory_order_release);
while (_bg_compaction_scheduled) {
_bg_work_finish_signal.wait(lock);
std::unique_lock<std::mutex> lock(mutex_);
shutting_down_.store(true, std::memory_order_release);
while (bg_compaction_scheduled_) {
bg_work_finish_signal_.wait(lock);
}
}
{
@ -534,12 +477,10 @@ DBImpl<EngineT>::~DBImpl() {
}
bg_timer_thread_.join();
std::vector<std::string> ids;
_pMemMgr->serialize(ids);
_env->Stop();
pMemMgr_->Serialize(ids);
env_->Stop();
}
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#endif

File diff suppressed because it is too large Load Diff

View File

@ -19,62 +19,53 @@ class DBMetaImpl : public Meta {
public:
DBMetaImpl(const DBMetaOptions& options_);
virtual Status add_group(GroupSchema& group_info) override;
virtual Status get_group(GroupSchema& group_info_) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status CreateTable(TableSchema& table_schema) override;
virtual Status DescribeTable(TableSchema& group_info_) override;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
virtual Status add_group_file(GroupFileSchema& group_file_info) override;
virtual Status delete_group_partitions(const std::string& group_id,
const meta::DatesT& dates) override;
virtual Status CreateTableFile(TableFileSchema& file_schema) override;
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) override;
virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_,
bool& has_or_not_) override;
virtual Status get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) override;
virtual Status update_group_file(GroupFileSchema& group_file_) override;
virtual Status GetTableFile(TableFileSchema& file_schema) override;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) override;
virtual Status UpdateTableFile(TableFileSchema& file_schema) override;
virtual Status update_files(GroupFilesSchema& files) override;
virtual Status UpdateTableFiles(TableFilesSchema& files) override;
virtual Status files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) override;
virtual Status files_to_search(const std::string& group_id,
virtual Status FilesToSearch(const std::string& table_id,
const DatesT& partition,
DatePartionedGroupFilesSchema& files) override;
DatePartionedTableFilesSchema& files) override;
virtual Status files_to_index(GroupFilesSchema&) override;
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) override;
virtual Status archive_files() override;
virtual Status FilesToIndex(TableFilesSchema&) override;
virtual Status size(long& result) override;
virtual Status Archive() override;
virtual Status cleanup() override;
virtual Status Size(long& result) override;
virtual Status cleanup_ttl_files(uint16_t seconds) override;
virtual Status CleanUp() override;
virtual Status drop_all() override;
virtual Status CleanUpFilesWithTTL(uint16_t seconds) override;
virtual Status count(const std::string& group_id, long& result) override;
virtual Status DropAll() override;
virtual Status Count(const std::string& table_id, long& result) override;
virtual ~DBMetaImpl();
private:
Status NextFileId(std::string& file_id);
Status NextGroupId(std::string& group_id);
Status discard_files_of_size(long to_discard_size);
Status get_group_no_lock(GroupSchema& group_info);
std::string GetGroupPath(const std::string& group_id);
std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date);
void GetGroupFilePath(GroupFileSchema& group_file);
Status initialize();
Status NextTableId(std::string& table_id);
Status DiscardFiles(long to_discard_size);
std::string GetTablePath(const std::string& table_id);
std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date);
void GetTableFilePath(TableFileSchema& group_file);
Status Initialize();
const DBMetaOptions _options;
const DBMetaOptions options_;
}; // DBMetaImpl
} // namespace meta

View File

@ -13,66 +13,66 @@ namespace vecwise {
namespace engine {
Env::Env()
: _bg_work_started(false),
_shutting_down(false) {
: bg_work_started_(false),
shutting_down_(false) {
}
void Env::schedule(void (*function_)(void* arg_), void* arg_) {
std::unique_lock<std::mutex> lock(_bg_work_mutex);
if (_shutting_down) return;
void Env::Schedule(void (*function)(void* arg), void* arg) {
std::unique_lock<std::mutex> lock(bg_work_mutex_);
if (shutting_down_) return;
if (!_bg_work_started) {
_bg_work_started = true;
if (!bg_work_started_) {
bg_work_started_ = true;
std::thread bg_thread(Env::BackgroundThreadEntryPoint, this);
bg_thread.detach();
}
if (_bg_work_queue.empty()) {
_bg_work_cv.notify_one();
if (bg_work_queue_.empty()) {
bg_work_cv_.notify_one();
}
_bg_work_queue.emplace(function_, arg_);
bg_work_queue_.emplace(function, arg);
}
void Env::backgroud_thread_main() {
while (!_shutting_down) {
std::unique_lock<std::mutex> lock(_bg_work_mutex);
while (_bg_work_queue.empty() && !_shutting_down) {
_bg_work_cv.wait(lock);
void Env::BackgroundThreadMain() {
while (!shutting_down_) {
std::unique_lock<std::mutex> lock(bg_work_mutex_);
while (bg_work_queue_.empty() && !shutting_down_) {
bg_work_cv_.wait(lock);
}
if (_shutting_down) break;
if (shutting_down_) break;
assert(!_bg_work_queue.empty());
auto bg_function = _bg_work_queue.front()._function;
void* bg_arg = _bg_work_queue.front()._arg;
_bg_work_queue.pop();
assert(!bg_work_queue_.empty());
auto bg_function = bg_work_queue_.front().function_;
void* bg_arg = bg_work_queue_.front().arg_;
bg_work_queue_.pop();
lock.unlock();
bg_function(bg_arg);
}
std::unique_lock<std::mutex> lock(_bg_work_mutex);
_bg_work_started = false;
_bg_work_cv.notify_all();
std::unique_lock<std::mutex> lock(bg_work_mutex_);
bg_work_started_ = false;
bg_work_cv_.notify_all();
}
void Env::Stop() {
{
std::unique_lock<std::mutex> lock(_bg_work_mutex);
if (_shutting_down || !_bg_work_started) return;
std::unique_lock<std::mutex> lock(bg_work_mutex_);
if (shutting_down_ || !bg_work_started_) return;
}
_shutting_down = true;
shutting_down_ = true;
{
std::unique_lock<std::mutex> lock(_bg_work_mutex);
if (_bg_work_queue.empty()) {
_bg_work_cv.notify_one();
std::unique_lock<std::mutex> lock(bg_work_mutex_);
if (bg_work_queue_.empty()) {
bg_work_cv_.notify_one();
}
while (_bg_work_started) {
_bg_work_cv.wait(lock);
while (bg_work_started_) {
bg_work_cv_.wait(lock);
}
}
_shutting_down = false;
shutting_down_ = false;
}
Env::~Env() {}

View File

@ -22,7 +22,7 @@ public:
Env(const Env&) = delete;
Env& operator=(const Env&) = delete;
void schedule(void (*function_)(void* arg_), void* arg_);
void Schedule(void (*function)(void* arg), void* arg);
virtual void Stop();
@ -31,25 +31,24 @@ public:
static Env* Default();
protected:
void backgroud_thread_main();
void BackgroundThreadMain();
static void BackgroundThreadEntryPoint(Env* env) {
env->backgroud_thread_main();
env->BackgroundThreadMain();
}
struct BGWork {
explicit BGWork(void (*function_)(void*), void* arg_)
: _function(function_), _arg(arg_) {}
explicit BGWork(void (*function)(void*), void* arg)
: function_(function), arg_(arg) {}
void (* const _function)(void*);
void* const _arg;
void (* const function_)(void*);
void* const arg_;
};
std::mutex _bg_work_mutex;
std::condition_variable _bg_work_cv;
std::queue<BGWork> _bg_work_queue;
bool _bg_work_started;
std::atomic<bool> _shutting_down;
std::mutex bg_work_mutex_;
std::condition_variable bg_work_cv_;
std::queue<BGWork> bg_work_queue_;
bool bg_work_started_;
std::atomic<bool> shutting_down_;
}; // Env
} // namespace engine

View File

@ -3,9 +3,10 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <easylogging++.h>
#include "ExecutionEngine.h"
#include <easylogging++.h>
namespace zilliz {
namespace vecwise {
namespace engine {

View File

@ -5,11 +5,11 @@
******************************************************************************/
#pragma once
#include "Status.h"
#include <vector>
#include <memory>
#include "Status.h"
namespace zilliz {
namespace vecwise {
namespace engine {

View File

@ -3,6 +3,11 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "Factories.h"
#include "DBImpl.h"
#include "FaissExecutionEngine.h"
#include "Traits.h"
#include <stdlib.h>
#include <time.h>
#include <sstream>
@ -11,12 +16,6 @@
#include <assert.h>
#include <easylogging++.h>
#include "Factories.h"
#include "DBImpl.h"
#include "FaissExecutionEngine.h"
#include "Traits.h"
namespace zilliz {
namespace vecwise {
namespace engine {

View File

@ -3,15 +3,15 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include <string>
#include <memory>
#include "DB.h"
#include "DBMetaImpl.h"
#include "Options.h"
#include <string>
#include <memory>
namespace zilliz {
namespace vecwise {
namespace engine {

View File

@ -5,11 +5,11 @@
******************************************************************************/
#pragma once
#include "ExecutionEngine.h"
#include <memory>
#include <string>
#include "ExecutionEngine.h"
namespace faiss {
class Index;
}
@ -22,7 +22,7 @@ namespace engine {
template<class IndexTrait>
class FaissExecutionEngine : public ExecutionEngine<FaissExecutionEngine<IndexTrait>> {
public:
typedef std::shared_ptr<FaissExecutionEngine<IndexTrait>> Ptr;
using Ptr = std::shared_ptr<FaissExecutionEngine<IndexTrait>>;
FaissExecutionEngine(uint16_t dimension, const std::string& location);
FaissExecutionEngine(std::shared_ptr<faiss::Index> index, const std::string& location);
@ -53,7 +53,9 @@ public:
Ptr BuildIndex(const std::string&);
Status Cache();
protected:
std::shared_ptr<faiss::Index> pIndex_;
std::string location_;
};
@ -63,4 +65,4 @@ protected:
} // namespace vecwise
} // namespace zilliz
#include "FaissExecutionEngine.cpp"
#include "FaissExecutionEngine.inl"

View File

@ -3,8 +3,9 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#ifndef FAISSEXECUTIONENGINE_CPP__
#define FAISSEXECUTIONENGINE_CPP__
#pragma once
#include "FaissExecutionEngine.h"
#include <easylogging++.h>
#include <faiss/AutoTune.h>
@ -14,10 +15,9 @@
#include <wrapper/Index.h>
#include <wrapper/IndexBuilder.h>
#include <cache/CpuCacheMgr.h>
#include "FaissExecutionEngine.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
namespace engine {
@ -147,5 +147,3 @@ Status FaissExecutionEngine<IndexTrait>::Cache() {
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#endif

View File

@ -3,30 +3,29 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "IDGenerator.h"
#include <chrono>
#include <assert.h>
#include <iostream>
#include "IDGenerator.h"
namespace zilliz {
namespace vecwise {
namespace engine {
IDGenerator::~IDGenerator() {}
IDNumber SimpleIDGenerator::getNextIDNumber() {
IDNumber SimpleIDGenerator::GetNextIDNumber() {
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch()).count();
return micros * MAX_IDS_PER_MICRO;
}
void SimpleIDGenerator::nextIDNumbers(size_t n, IDNumbers& ids) {
void SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) {
if (n > MAX_IDS_PER_MICRO) {
nextIDNumbers(n-MAX_IDS_PER_MICRO, ids);
nextIDNumbers(MAX_IDS_PER_MICRO, ids);
NextIDNumbers(n-MAX_IDS_PER_MICRO, ids);
NextIDNumbers(MAX_IDS_PER_MICRO, ids);
return;
}
if (n <= 0) {
@ -41,12 +40,11 @@ void SimpleIDGenerator::nextIDNumbers(size_t n, IDNumbers& ids) {
for (int pos=0; pos<n; ++pos) {
ids.push_back(micros+pos);
}
}
void SimpleIDGenerator::getNextIDNumbers(size_t n, IDNumbers& ids) {
void SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) {
ids.clear();
nextIDNumbers(n, ids);
NextIDNumbers(n, ids);
}

View File

@ -5,17 +5,19 @@
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include <vector>
#include "Types.h"
#include <cstddef>
#include <vector>
namespace zilliz {
namespace vecwise {
namespace engine {
class IDGenerator {
public:
virtual IDNumber getNextIDNumber() = 0;
virtual void getNextIDNumbers(size_t n, IDNumbers& ids) = 0;
virtual IDNumber GetNextIDNumber() = 0;
virtual void GetNextIDNumbers(size_t n, IDNumbers& ids) = 0;
virtual ~IDGenerator();
@ -24,11 +26,11 @@ public:
class SimpleIDGenerator : public IDGenerator {
public:
virtual IDNumber getNextIDNumber() override;
virtual void getNextIDNumbers(size_t n, IDNumbers& ids) override;
virtual IDNumber GetNextIDNumber() override;
virtual void GetNextIDNumbers(size_t n, IDNumbers& ids) override;
private:
void nextIDNumbers(size_t n, IDNumbers& ids);
void NextIDNumbers(size_t n, IDNumbers& ids);
const size_t MAX_IDS_PER_MICRO = 1000;
}; // SimpleIDGenerator

View File

@ -1,277 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <sys/stat.h>
#include <unistd.h>
#include <sstream>
#include <iostream>
#include <boost/filesystem.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <fstream>
#include "LocalMetaImpl.h"
#include "IDGenerator.h"
namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {
long LocalMetaImpl::GetFileSize(const std::string& filename)
{
struct stat stat_buf;
int rc = stat(filename.c_str(), &stat_buf);
return rc == 0 ? stat_buf.st_size : -1;
}
std::string LocalMetaImpl::GetGroupPath(const std::string& group_id) {
return _options.path + "/" + group_id;
}
std::string LocalMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) {
std::stringstream ss;
ss << GetGroupPath(group_id) << "/" << date;
return ss.str();
}
std::string LocalMetaImpl::GetNextGroupFileLocationByPartition(const std::string& group_id, DateT& date,
GroupFileSchema::FILE_TYPE file_type) {
std::string suffix = (file_type == GroupFileSchema::RAW) ? ".raw" : ".index";
SimpleIDGenerator g;
std::stringstream ss;
ss << GetGroupPath(group_id) << "/" << date << "/" << g.getNextIDNumber() << suffix;
return ss.str();
}
std::string LocalMetaImpl::GetGroupMetaPathByGroupPath(const std::string& group_path) {
return group_path + "/" + "meta";
}
std::string LocalMetaImpl::GetGroupMetaPath(const std::string& group_id) {
return GetGroupMetaPathByGroupPath(GetGroupPath(group_id));
}
Status LocalMetaImpl::GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info) {
boost::property_tree::ptree ptree;
boost::property_tree::read_json(path, ptree);
auto files_cnt = ptree.get_child("files_cnt").data();
auto dimension = ptree.get_child("dimension").data();
/* std::cout << dimension << std::endl; */
/* std::cout << files_cnt << std::endl; */
group_info.id = std::stoi(group_info.group_id);
group_info.files_cnt = std::stoi(files_cnt);
group_info.dimension = std::stoi(dimension);
group_info.location = GetGroupPath(group_info.group_id);
return Status::OK();
}
Status LocalMetaImpl::GetGroupMetaInfo(const std::string& group_id, GroupSchema& group_info) {
group_info.group_id = group_id;
return GetGroupMetaInfoByPath(GetGroupMetaPath(group_id), group_info);
}
LocalMetaImpl::LocalMetaImpl(const DBMetaOptions& options_)
: _options(options_) {
initialize();
}
Status LocalMetaImpl::initialize() {
if (boost::filesystem::is_directory(_options.path)) {
}
else if (!boost::filesystem::create_directory(_options.path)) {
return Status::InvalidDBPath("Cannot Create " + _options.path);
}
return Status::OK();
}
Status LocalMetaImpl::add_group(GroupSchema& group_info) {
std::string real_gid;
size_t id = SimpleIDGenerator().getNextIDNumber();
if (group_info.group_id == "") {
std::stringstream ss;
ss << id;
real_gid = ss.str();
} else {
real_gid = group_info.group_id;
}
bool group_exist;
has_group(real_gid, group_exist);
if (group_exist) {
return Status::GroupError("Group Already Existed " + real_gid);
}
if (!boost::filesystem::create_directory(GetGroupPath(real_gid))) {
return Status::GroupError("Cannot Create Group " + real_gid);
}
group_info.group_id = real_gid;
group_info.files_cnt = 0;
group_info.id = 0;
group_info.location = GetGroupPath(real_gid);
boost::property_tree::ptree out;
out.put("files_cnt", group_info.files_cnt);
out.put("dimension", group_info.dimension);
boost::property_tree::write_json(GetGroupMetaPath(real_gid), out);
return Status::OK();
}
Status LocalMetaImpl::get_group(GroupSchema& group_info) {
bool group_exist;
has_group(group_info.group_id, group_exist);
if (!group_exist) {
return Status::NotFound("Group " + group_info.group_id + " Not Found");
}
return GetGroupMetaInfo(group_info.group_id, group_info);
}
Status LocalMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
has_or_not = boost::filesystem::is_directory(GetGroupPath(group_id));
return Status::OK();
}
Status LocalMetaImpl::add_group_file(GroupFileSchema& group_file_info) {
GroupSchema group_info;
/* auto status = get_group(group_info); */
/* if (!status.ok()) { */
/* return status; */
/* } */
/* auto location = GetNextGroupFileLocationByPartition(group_id, date, file_type); */
/* group_file_info.group_id = group_id; */
/* group_file_info.dimension = group_info.dimension; */
/* group_file_info.location = location; */
/* group_file_info.date = date; */
return Status::OK();
}
Status LocalMetaImpl::files_to_index(GroupFilesSchema& files) {
files.clear();
std::string suffix;
boost::filesystem::directory_iterator end_itr;
for (boost::filesystem::directory_iterator itr(_options.path); itr != end_itr; ++itr) {
auto group_path = itr->path().string();
GroupSchema group_info;
GetGroupMetaInfoByPath(GetGroupMetaPathByGroupPath(group_path), group_info);
for (boost::filesystem::directory_iterator innerItr(group_path); innerItr != end_itr; ++innerItr) {
auto partition_path = innerItr->path().string();
for (boost::filesystem::directory_iterator fItr(partition_path); fItr != end_itr; ++fItr) {
auto location = fItr->path().string();
suffix = location.substr(location.find_last_of('.') + 1);
if (suffix == "index") continue;
if (INDEX_TRIGGER_SIZE >= GetFileSize(location)) continue;
std::cout << "[About to index] " << location << std::endl;
GroupFileSchema f;
f.location = location;
/* f.group_id = group_id; */
f.dimension = group_info.dimension;
files.push_back(f);
}
}
}
return Status::OK();
}
Status LocalMetaImpl::files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) {
files.clear();
/* std::string suffix; */
/* boost::filesystem::directory_iterator end_itr; */
/* for (boost::filesystem::directory_iterator itr(_options.path); itr != end_itr; ++itr) { */
/* auto group_path = itr->path().string(); */
/* GroupSchema group_info; */
/* GetGroupMetaInfoByPath(GetGroupMetaPathByGroupPath(group_path), group_info); */
/* for (boost::filesystem::directory_iterator innerItr(group_path); innerItr != end_itr; ++innerItr) { */
/* auto partition_path = innerItr->path().string(); */
/* for (boost::filesystem::directory_iterator fItr(partition_path); fItr != end_itr; ++fItr) { */
/* auto location = fItr->path().string(); */
/* suffix = location.substr(location.find_last_of('.') + 1); */
/* if (suffix == "index") continue; */
/* if (INDEX_TRIGGER_SIZE < GetFileSize(location)) continue; */
/* std::cout << "[About to index] " << location << std::endl; */
/* GroupFileSchema f; */
/* f.location = location; */
/* f.group_id = group_id; */
/* f.dimension = group_info.dimension; */
/* files.push_back(f); */
/* } */
/* } */
/* } */
return Status::OK();
}
Status LocalMetaImpl::has_group_file(const std::string& group_id_,
const std::string& file_id_,
bool& has_or_not_) {
//PXU TODO
return Status::OK();
}
Status LocalMetaImpl::get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) {
//PXU TODO
return Status::OK();
}
Status LocalMetaImpl::get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) {
// PXU TODO
return Status::OK();
}
Status LocalMetaImpl::update_group_file(GroupFileSchema& group_file_) {
//PXU TODO
return Status::OK();
}
Status LocalMetaImpl::update_files(GroupFilesSchema& files) {
//PXU TODO
return Status::OK();
}
Status LocalMetaImpl::archive_files() {
//PXU TODO
return Status::OK();
}
Status LocalMetaImpl::cleanup() {
//PXU TODO
return Status::OK();
}
Status LocalMetaImpl::cleanup_ttl_files(uint16_t seconds) {
// PXU TODO
return Status::OK();
}
Status LocalMetaImpl::drop_all() {
// PXU TODO
return Status::OK();
}
Status LocalMetaImpl::size(long& result) {
// PXU TODO
return Status::OK();
}
Status LocalMetaImpl::count(const std::string& group_id, long& result) {
// PXU TODO
return Status::OK();
}
} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz

View File

@ -1,83 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include "Meta.h"
#include "Options.h"
namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {
class LocalMetaImpl : public Meta {
public:
const size_t INDEX_TRIGGER_SIZE = 1024*1024*500;
LocalMetaImpl(const DBMetaOptions& options_);
virtual Status add_group(GroupSchema& group_info_) override;
virtual Status get_group(GroupSchema& group_info_) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status add_group_file(GroupFileSchema& group_file_info) override;
/* virtual Status delete_group_partitions(const std::string& group_id, */
/* const meta::DatesT& dates) override; */
virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_,
bool& has_or_not_) override;
virtual Status get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) override;
virtual Status update_group_file(GroupFileSchema& group_file_) override;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) override;
virtual Status update_files(GroupFilesSchema& files) override;
virtual Status cleanup() override;
virtual Status files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) override;
virtual Status files_to_index(GroupFilesSchema&) override;
virtual Status archive_files() override;
virtual Status cleanup_ttl_files(uint16_t seconds) override;
virtual Status count(const std::string& group_id, long& result) override;
virtual Status drop_all() override;
virtual Status size(long& result) override;
private:
Status GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info);
std::string GetGroupMetaPathByGroupPath(const std::string& group_path);
Status GetGroupMetaInfo(const std::string& group_id, GroupSchema& group_info);
std::string GetNextGroupFileLocationByPartition(const std::string& group_id, DateT& date,
GroupFileSchema::FILE_TYPE file_type);
std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date);
std::string GetGroupPath(const std::string& group_id);
std::string GetGroupMetaPath(const std::string& group_id);
Status CreateGroupMeta(const GroupSchema& group_schema);
long GetFileSize(const std::string& filename);
Status initialize();
const DBMetaOptions _options;
}; // LocalMetaImpl
} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz

View File

@ -5,15 +5,15 @@
******************************************************************************/
#pragma once
#include "IDGenerator.h"
#include "Status.h"
#include "Meta.h"
#include <map>
#include <string>
#include <ctime>
#include <memory>
#include <mutex>
#include "IDGenerator.h"
#include "Status.h"
#include "Meta.h"
namespace zilliz {
namespace vecwise {
@ -26,24 +26,24 @@ namespace meta {
template <typename EngineT>
class MemVectors {
public:
typedef typename EngineT::Ptr EnginePtr;
typedef typename meta::Meta::Ptr MetaPtr;
typedef std::shared_ptr<MemVectors<EngineT>> Ptr;
using EnginePtr = typename EngineT::Ptr;
using MetaPtr = meta::Meta::Ptr;
using Ptr = std::shared_ptr<MemVectors<EngineT>>;
explicit MemVectors(const std::shared_ptr<meta::Meta>&,
const meta::GroupFileSchema&, const Options&);
const meta::TableFileSchema&, const Options&);
void add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
void Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
size_t total() const;
size_t Total() const;
size_t approximate_size() const;
size_t ApproximateSize() const;
Status serialize(std::string& group_id);
Status Serialize(std::string& table_id);
~MemVectors();
const std::string& location() const { return schema_.location; }
const std::string& Location() const { return schema_.location; }
private:
MemVectors() = delete;
@ -52,8 +52,8 @@ private:
MetaPtr pMeta_;
Options options_;
meta::GroupFileSchema schema_;
IDGenerator* _pIdGenerator;
meta::TableFileSchema schema_;
IDGenerator* pIdGenerator_;
EnginePtr pEE_;
}; // MemVectors
@ -63,32 +63,32 @@ private:
template<typename EngineT>
class MemManager {
public:
typedef typename meta::Meta::Ptr MetaPtr;
typedef typename MemVectors<EngineT>::Ptr MemVectorsPtr;
typedef std::shared_ptr<MemManager<EngineT>> Ptr;
using MetaPtr = meta::Meta::Ptr;
using MemVectorsPtr = typename MemVectors<EngineT>::Ptr;
using Ptr = std::shared_ptr<MemManager<EngineT>>;
MemManager(const std::shared_ptr<meta::Meta>& meta_, const Options& options)
: _pMeta(meta_), options_(options) {}
MemManager(const std::shared_ptr<meta::Meta>& meta, const Options& options)
: pMeta_(meta), options_(options) {}
MemVectorsPtr get_mem_by_group(const std::string& group_id_);
MemVectorsPtr GetMemByTable(const std::string& table_id);
Status add_vectors(const std::string& group_id_,
size_t n_, const float* vectors_, IDNumbers& vector_ids_);
Status InsertVectors(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);
Status serialize(std::vector<std::string>& group_ids);
Status Serialize(std::vector<std::string>& table_ids);
private:
Status add_vectors_no_lock(const std::string& group_id_,
size_t n_, const float* vectors_, IDNumbers& vector_ids_);
Status mark_memory_as_immutable();
Status InsertVectorsNoLock(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);
Status ToImmutable();
typedef std::map<std::string, MemVectorsPtr> MemMap;
typedef std::vector<MemVectorsPtr> ImmMemPool;
MemMap _memMap;
ImmMemPool _immMems;
MetaPtr _pMeta;
using MemMap = std::map<std::string, MemVectorsPtr>;
using ImmMemPool = std::vector<MemVectorsPtr>;
MemMap memMap_;
ImmMemPool immMems_;
MetaPtr pMeta_;
Options options_;
std::mutex _mutex;
std::mutex mutex_;
std::mutex serialization_mtx_;
}; // MemManager
@ -96,4 +96,4 @@ private:
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#include "MemManager.cpp"
#include "MemManager.inl"

View File

@ -3,18 +3,16 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#ifndef MEMMANGE_CPP__
#define MEMMANGE_CPP__
#include <iostream>
#include <sstream>
#include <thread>
#include <easylogging++.h>
#pragma once
#include "MemManager.h"
#include "Meta.h"
#include "MetaConsts.h"
#include <iostream>
#include <sstream>
#include <thread>
#include <easylogging++.h>
namespace zilliz {
namespace vecwise {
@ -22,42 +20,42 @@ namespace engine {
template<typename EngineT>
MemVectors<EngineT>::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
const meta::GroupFileSchema& schema, const Options& options)
const meta::TableFileSchema& schema, const Options& options)
: pMeta_(meta_ptr),
options_(options),
schema_(schema),
_pIdGenerator(new SimpleIDGenerator()),
pIdGenerator_(new SimpleIDGenerator()),
pEE_(new EngineT(schema_.dimension, schema_.location)) {
}
template<typename EngineT>
void MemVectors<EngineT>::add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
_pIdGenerator->getNextIDNumbers(n_, vector_ids_);
void MemVectors<EngineT>::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
pIdGenerator_->GetNextIDNumbers(n_, vector_ids_);
pEE_->AddWithIds(n_, vectors_, vector_ids_.data());
}
template<typename EngineT>
size_t MemVectors<EngineT>::total() const {
size_t MemVectors<EngineT>::Total() const {
return pEE_->Count();
}
template<typename EngineT>
size_t MemVectors<EngineT>::approximate_size() const {
size_t MemVectors<EngineT>::ApproximateSize() const {
return pEE_->Size();
}
template<typename EngineT>
Status MemVectors<EngineT>::serialize(std::string& group_id) {
group_id = schema_.group_id;
auto size = approximate_size();
Status MemVectors<EngineT>::Serialize(std::string& table_id) {
table_id = schema_.table_id;
auto size = ApproximateSize();
pEE_->Serialize();
schema_.size = size;
schema_.file_type = (size >= options_.index_trigger_size) ?
meta::GroupFileSchema::TO_INDEX : meta::GroupFileSchema::RAW;
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
auto status = pMeta_->update_group_file(schema_);
auto status = pMeta_->UpdateTableFile(schema_);
LOG(DEBUG) << "New " << ((schema_.file_type == meta::GroupFileSchema::RAW) ? "raw" : "to_index")
LOG(DEBUG) << "New " << ((schema_.file_type == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << schema_.file_id << " of size " << pEE_->Size() / meta::M << " M";
pEE_->Cache();
@ -67,9 +65,9 @@ Status MemVectors<EngineT>::serialize(std::string& group_id) {
template<typename EngineT>
MemVectors<EngineT>::~MemVectors() {
if (_pIdGenerator != nullptr) {
delete _pIdGenerator;
_pIdGenerator = nullptr;
if (pIdGenerator_ != nullptr) {
delete pIdGenerator_;
pIdGenerator_ = nullptr;
}
}
@ -78,69 +76,69 @@ MemVectors<EngineT>::~MemVectors() {
*/
template<typename EngineT>
typename MemManager<EngineT>::MemVectorsPtr MemManager<EngineT>::get_mem_by_group(
const std::string& group_id) {
auto memIt = _memMap.find(group_id);
if (memIt != _memMap.end()) {
typename MemManager<EngineT>::MemVectorsPtr MemManager<EngineT>::GetMemByTable(
const std::string& table_id) {
auto memIt = memMap_.find(table_id);
if (memIt != memMap_.end()) {
return memIt->second;
}
meta::GroupFileSchema group_file;
group_file.group_id = group_id;
auto status = _pMeta->add_group_file(group_file);
meta::TableFileSchema table_file;
table_file.table_id = table_id;
auto status = pMeta_->CreateTableFile(table_file);
if (!status.ok()) {
return nullptr;
}
_memMap[group_id] = MemVectorsPtr(new MemVectors<EngineT>(_pMeta, group_file, options_));
return _memMap[group_id];
memMap_[table_id] = MemVectorsPtr(new MemVectors<EngineT>(pMeta_, table_file, options_));
return memMap_[table_id];
}
template<typename EngineT>
Status MemManager<EngineT>::add_vectors(const std::string& group_id_,
Status MemManager<EngineT>::InsertVectors(const std::string& table_id_,
size_t n_,
const float* vectors_,
IDNumbers& vector_ids_) {
std::unique_lock<std::mutex> lock(_mutex);
return add_vectors_no_lock(group_id_, n_, vectors_, vector_ids_);
std::unique_lock<std::mutex> lock(mutex_);
return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
}
template<typename EngineT>
Status MemManager<EngineT>::add_vectors_no_lock(const std::string& group_id,
Status MemManager<EngineT>::InsertVectorsNoLock(const std::string& table_id,
size_t n,
const float* vectors,
IDNumbers& vector_ids) {
MemVectorsPtr mem = get_mem_by_group(group_id);
MemVectorsPtr mem = GetMemByTable(table_id);
if (mem == nullptr) {
return Status::NotFound("Group " + group_id + " not found!");
return Status::NotFound("Group " + table_id + " not found!");
}
mem->add(n, vectors, vector_ids);
mem->Add(n, vectors, vector_ids);
return Status::OK();
}
template<typename EngineT>
Status MemManager<EngineT>::mark_memory_as_immutable() {
std::unique_lock<std::mutex> lock(_mutex);
for (auto& kv: _memMap) {
_immMems.push_back(kv.second);
Status MemManager<EngineT>::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_);
for (auto& kv: memMap_) {
immMems_.push_back(kv.second);
}
_memMap.clear();
memMap_.clear();
return Status::OK();
}
template<typename EngineT>
Status MemManager<EngineT>::serialize(std::vector<std::string>& group_ids) {
mark_memory_as_immutable();
Status MemManager<EngineT>::Serialize(std::vector<std::string>& table_ids) {
ToImmutable();
std::unique_lock<std::mutex> lock(serialization_mtx_);
std::string group_id;
group_ids.clear();
for (auto& mem : _immMems) {
mem->serialize(group_id);
group_ids.push_back(group_id);
std::string table_id;
table_ids.clear();
for (auto& mem : immMems_) {
mem->Serialize(table_id);
table_ids.push_back(table_id);
}
_immMems.clear();
immMems_.clear();
return Status::OK();
}
@ -148,5 +146,3 @@ Status MemManager<EngineT>::serialize(std::vector<std::string>& group_ids) {
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#endif

View File

@ -3,9 +3,10 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Meta.h"
#include <ctime>
#include <stdio.h>
#include "Meta.h"
namespace zilliz {
namespace vecwise {

View File

@ -4,14 +4,15 @@
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <cstddef>
#include <ctime>
#include <memory>
#include "MetaTypes.h"
#include "Options.h"
#include "Status.h"
#include <cstddef>
#include <ctime>
#include <memory>
namespace zilliz {
namespace vecwise {
namespace engine {
@ -20,49 +21,40 @@ namespace meta {
class Meta {
public:
typedef std::shared_ptr<Meta> Ptr;
using Ptr = std::shared_ptr<Meta>;
virtual Status add_group(GroupSchema& group_info) = 0;
virtual Status get_group(GroupSchema& group_info) = 0;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
virtual Status CreateTable(TableSchema& table_schema) = 0;
virtual Status DescribeTable(TableSchema& table_schema) = 0;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) = 0;
virtual Status add_group_file(GroupFileSchema& group_file_info) = 0;
virtual Status delete_group_partitions(const std::string& group_id,
const meta::DatesT& dates) = 0;
virtual Status CreateTableFile(TableFileSchema& file_schema) = 0;
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) = 0;
virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_,
bool& has_or_not_) = 0;
virtual Status get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) = 0;
virtual Status update_group_file(GroupFileSchema& group_file_) = 0;
virtual Status GetTableFile(TableFileSchema& file_schema) = 0;
virtual Status UpdateTableFile(TableFileSchema& file_schema) = 0;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) = 0;
virtual Status UpdateTableFiles(TableFilesSchema& files) = 0;
virtual Status update_files(GroupFilesSchema& files) = 0;
virtual Status files_to_search(const std::string& group_id,
virtual Status FilesToSearch(const std::string& table_id,
const DatesT& partition,
DatePartionedGroupFilesSchema& files) = 0;
DatePartionedTableFilesSchema& files) = 0;
virtual Status files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) = 0;
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) = 0;
virtual Status size(long& result) = 0;
virtual Status Size(long& result) = 0;
virtual Status archive_files() = 0;
virtual Status Archive() = 0;
virtual Status files_to_index(GroupFilesSchema&) = 0;
virtual Status FilesToIndex(TableFilesSchema&) = 0;
virtual Status cleanup() = 0;
virtual Status cleanup_ttl_files(uint16_t) = 0;
virtual Status CleanUp() = 0;
virtual Status CleanUpFilesWithTTL(uint16_t) = 0;
virtual Status drop_all() = 0;
virtual Status DropAll() = 0;
virtual Status count(const std::string& group_id, long& result) = 0;
virtual Status Count(const std::string& table_id, long& result) = 0;
static DateT GetDate(const std::time_t& t, int day_delta = 0);
static DateT GetDate();

View File

@ -18,16 +18,16 @@ typedef int DateT;
const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;
struct GroupSchema {
struct TableSchema {
size_t id;
std::string group_id;
std::string table_id;
size_t files_cnt = 0;
uint16_t dimension;
std::string location = "";
std::string location;
long created_on;
}; // GroupSchema
}; // TableSchema
struct GroupFileSchema {
struct TableFileSchema {
typedef enum {
NEW,
RAW,
@ -37,19 +37,19 @@ struct GroupFileSchema {
} FILE_TYPE;
size_t id;
std::string group_id;
std::string table_id;
std::string file_id;
int file_type = NEW;
size_t size;
DateT date = EmptyDate;
uint16_t dimension;
std::string location = "";
std::string location;
long updated_time;
long created_on;
}; // GroupFileSchema
}; // TableFileSchema
typedef std::vector<GroupFileSchema> GroupFilesSchema;
typedef std::map<DateT, GroupFilesSchema> DatePartionedGroupFilesSchema;
typedef std::vector<TableFileSchema> TableFilesSchema;
typedef std::map<DateT, TableFilesSchema> DatePartionedTableFilesSchema;
} // namespace meta
} // namespace engine

View File

@ -48,12 +48,6 @@ struct Options {
}; // Options
struct GroupOptions {
size_t dimension;
bool has_id = false;
}; // GroupOptions
} // namespace engine
} // namespace vecwise
} // namespace zilliz

View File

@ -3,9 +3,9 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Utils.h"
#include <chrono>
#include "Utils.h"
namespace zilliz {
namespace vecwise {

View File

@ -1,31 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "db_connection.h"
namespace zilliz {
namespace vecwise {
namespace engine {
using std::string;
using namespace sqlite_orm;
string storage_file_name = "default.sqlite";
SqliteDBPtr connect() {
SqliteDBPtr temp = std::make_shared<SqliteDB>(initStorage(storage_file_name));
temp->sync_schema();
temp->open_forever(); // thread safe option
//temp->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log
return temp;
}
/* SqliteDBPtr Connection::connect_ = connect(); */
}
}
}

View File

@ -1,64 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include <string>
#include <memory>
#include <sqlite_orm.h>
namespace zilliz {
namespace vecwise {
namespace engine {
struct GroupSchema {
size_t id;
std::string group_id;
size_t files_cnt = 0;
uint16_t dimension;
std::string location = "";
std::string next_file_location = "";
}; // GroupSchema
// Metadata describing a single data file belonging to a group.
// NOTE(review): unlike GroupSchema, this struct is not registered in
// initStorage() below, so it is not persisted by that storage — confirm.
struct GroupFileSchema {
typedef enum {
RAW,        // unindexed raw vector data
INDEX       // built index file
} FILE_TYPE;
size_t id;                  // row id
std::string group_id;       // owning group's group_id
std::string file_id;        // identifier of this file within the group
int files_type = RAW;       // one of FILE_TYPE; defaults to RAW
size_t rows;                // number of vectors stored in the file
std::string location = "";  // on-disk path; empty until assigned
}; // GroupFileSchema
// Builds the sqlite_orm storage object for the database at `path`, declaring
// the table-to-struct mappings. Only GroupSchema is mapped here; columns
// `location` and `next_file_location` are intentionally not persisted.
inline auto initStorage(const std::string &path) {
using namespace sqlite_orm;
return make_storage(path,
// Add table below
make_table("Groups",
make_column("id", &GroupSchema::id, primary_key()),
make_column("group_id", &GroupSchema::group_id, unique()),
make_column("dimension", &GroupSchema::dimension),
make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))));
}
using SqliteDB = decltype(initStorage(""));
using SqliteDBPtr= std::shared_ptr<SqliteDB>;
// Base class holding the shared database handle for derived connection
// users. The static member's definition is commented out in the .cpp
// (/* SqliteDBPtr Connection::connect_ = connect(); */), so connect_ has no
// out-of-line definition here — odr-using it would fail to link.
class Connection {
protected:
static SqliteDBPtr connect_;    // shared handle, populated via connect()
};
}
}
}

View File

@ -324,7 +324,7 @@ LicenseLibrary::GPUinfoFileDeserialization(const std::string &path,
}
ServerError
LicenseLibrary::GetDateTime(char *cha, time_t &data_time) {
LicenseLibrary::GetDateTime(const char *cha, time_t &data_time) {
tm tm_;
int year, month, day;
sscanf(cha, "%d-%d-%d", &year, &month, &day);

View File

@ -92,7 +92,7 @@ class LicenseLibrary {
std::map<int, std::string> &uuid_encrption_map);
static ServerError
GetDateTime(char *cha, time_t &data_time);
GetDateTime(const char *cha, time_t &data_time);
private:

View File

@ -78,17 +78,17 @@ BaseTaskPtr CreateTableTask::Create(const thrift::TableSchema& schema) {
ServerError CreateTableTask::OnExecute() {
TimeRecorder rc("CreateTableTask");
try {
if(schema_.vector_column_array.empty()) {
return SERVER_INVALID_ARGUMENT;
}
IVecIdMapper::GetInstance()->AddGroup(schema_.table_name);
engine::meta::GroupSchema group_info;
group_info.dimension = (uint16_t)schema_.vector_column_array[0].dimension;
group_info.group_id = schema_.table_name;
engine::Status stat = DB()->add_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.dimension = (uint16_t)schema_.vector_column_array[0].dimension;
table_schema.table_id = schema_.table_name;
engine::Status stat = DB()->CreateTable(table_schema);
if(!stat.ok()) {//could exist
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
@ -123,9 +123,9 @@ ServerError DescribeTableTask::OnExecute() {
TimeRecorder rc("DescribeTableTask");
try {
engine::meta::GroupSchema group_info;
group_info.group_id = table_name_;
engine::Status stat = DB()->get_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.table_id = table_name_;
engine::Status stat = DB()->DescribeTable(table_schema);
if(!stat.ok()) {
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
@ -154,8 +154,8 @@ DeleteTableTask::DeleteTableTask(const std::string& table_name)
}
BaseTaskPtr DeleteTableTask::Create(const std::string& group_id) {
return std::shared_ptr<BaseTask>(new DeleteTableTask(group_id));
BaseTaskPtr DeleteTableTask::Create(const std::string& table_id) {
return std::shared_ptr<BaseTask>(new DeleteTableTask(table_id));
}
ServerError DeleteTableTask::OnExecute() {
@ -195,9 +195,9 @@ ServerError AddVectorTask::OnExecute() {
return SERVER_SUCCESS;
}
engine::meta::GroupSchema group_info;
group_info.group_id = table_name_;
engine::Status stat = DB()->get_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.table_id = table_name_;
engine::Status stat = DB()->DescribeTable(table_schema);
if(!stat.ok()) {
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
@ -208,7 +208,7 @@ ServerError AddVectorTask::OnExecute() {
rc.Record("get group info");
uint64_t vec_count = (uint64_t)record_array_.size();
uint64_t group_dim = group_info.dimension;
uint64_t group_dim = table_schema.dimension;
std::vector<float> vec_f;
vec_f.resize(vec_count*group_dim);//allocate enough memory
for(uint64_t i = 0; i < vec_count; i++) {
@ -236,7 +236,7 @@ ServerError AddVectorTask::OnExecute() {
rc.Record("prepare vectors data");
stat = DB()->add_vectors(table_name_, vec_count, vec_f.data(), record_ids_);
stat = DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
rc.Record("add vectors to engine");
if(!stat.ok()) {
error_code_ = SERVER_UNEXPECTED_ERROR;
@ -293,9 +293,9 @@ ServerError SearchVectorTask::OnExecute() {
return error_code_;
}
engine::meta::GroupSchema group_info;
group_info.group_id = table_name_;
engine::Status stat = DB()->get_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.table_id = table_name_;
engine::Status stat = DB()->DescribeTable(table_schema);
if(!stat.ok()) {
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
@ -305,7 +305,7 @@ ServerError SearchVectorTask::OnExecute() {
std::vector<float> vec_f;
uint64_t record_count = (uint64_t)record_array_.size();
vec_f.resize(record_count*group_info.dimension);
vec_f.resize(record_count*table_schema.dimension);
for(uint64_t i = 0; i < record_array_.size(); i++) {
const auto& record = record_array_[i];
@ -317,9 +317,9 @@ ServerError SearchVectorTask::OnExecute() {
}
uint64_t vec_dim = record.vector_map.begin()->second.size() / sizeof(double);//how many double value?
if (vec_dim != group_info.dimension) {
if (vec_dim != table_schema.dimension) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_info.dimension;
<< " vs. group dimension:" << table_schema.dimension;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
return error_code_;
@ -335,7 +335,7 @@ ServerError SearchVectorTask::OnExecute() {
std::vector<DB_DATE> dates;
engine::QueryResults results;
stat = DB()->search(table_name_, (size_t)top_k_, record_count, vec_f.data(), dates, results);
stat = DB()->Query(table_name_, (size_t)top_k_, record_count, vec_f.data(), dates, results);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;

View File

@ -66,21 +66,21 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
static const int group_dim = 256;
long size;
engine::meta::GroupSchema group_info;
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.group_id = group_name;
engine::Status stat = db_->add_group(group_info);
group_info.table_id = group_name;
engine::Status stat = db_->CreateTable(group_info);
engine::meta::GroupSchema group_info_get;
group_info_get.group_id = group_name;
stat = db_->get_group(group_info_get);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->DescribeTable(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
db_->size(size);
db_->Size(size);
int d = 256;
int nb = 20;
float *xb = new float[d * nb];
@ -92,13 +92,13 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
int loop = 100000;
for (auto i=0; i<loop; ++i) {
db_->add_vectors(group_name, nb, xb, vector_ids);
db_->InsertVectors(group_name, nb, xb, vector_ids);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
db_->size(size);
db_->Size(size);
LOG(DEBUG) << "size=" << size;
ASSERT_TRUE(size < 1 * engine::meta::G);
@ -113,14 +113,14 @@ TEST_F(DBTest, DB_TEST) {
static const std::string group_name = "test_group";
static const int group_dim = 256;
engine::meta::GroupSchema group_info;
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.group_id = group_name;
engine::Status stat = db_->add_group(group_info);
group_info.table_id = group_name;
engine::Status stat = db_->CreateTable(group_info);
engine::meta::GroupSchema group_info_get;
group_info_get.group_id = group_name;
stat = db_->get_group(group_info_get);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->DescribeTable(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
@ -154,12 +154,12 @@ TEST_F(DBTest, DB_TEST) {
for (auto j=0; j<10; ++j) {
ss.str("");
db_->count(group_name, count);
db_->Size(count);
prev_count = count;
START_TIMER;
stat = db_->search(group_name, k, qb, qxb, results);
ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/engine::meta::M << " M";
stat = db_->Query(group_name, k, qb, qxb, results);
ss << "Search " << j << " With Size " << count/engine::meta::M << " M";
STOP_TIMER(ss.str());
ASSERT_STATS(stat);
@ -181,10 +181,10 @@ TEST_F(DBTest, DB_TEST) {
for (auto i=0; i<loop; ++i) {
if (i==40) {
db_->add_vectors(group_name, qb, qxb, target_ids);
db_->InsertVectors(group_name, qb, qxb, target_ids);
ASSERT_EQ(target_ids.size(), qb);
} else {
db_->add_vectors(group_name, nb, xb, vector_ids);
db_->InsertVectors(group_name, nb, xb, vector_ids);
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
@ -199,14 +199,14 @@ TEST_F(DBTest, SEARCH_TEST) {
static const std::string group_name = "test_group";
static const int group_dim = 256;
engine::meta::GroupSchema group_info;
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.group_id = group_name;
engine::Status stat = db_->add_group(group_info);
group_info.table_id = group_name;
engine::Status stat = db_->CreateTable(group_info);
engine::meta::GroupSchema group_info_get;
group_info_get.group_id = group_name;
stat = db_->get_group(group_info_get);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->DescribeTable(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
@ -240,7 +240,7 @@ TEST_F(DBTest, SEARCH_TEST) {
// insert data
const int batch_size = 100;
for (int j = 0; j < nb / batch_size; ++j) {
stat = db_->add_vectors(group_name, batch_size, xb.data()+batch_size*j*group_dim, ids);
stat = db_->InsertVectors(group_name, batch_size, xb.data()+batch_size*j*group_dim, ids);
if (j == 200){ sleep(1);}
ASSERT_STATS(stat);
}
@ -248,7 +248,7 @@ TEST_F(DBTest, SEARCH_TEST) {
sleep(2); // wait until build index finish
engine::QueryResults results;
stat = db_->search(group_name, k, nq, xq.data(), results);
stat = db_->Query(group_name, k, nq, xq.data(), results);
ASSERT_STATS(stat);
// TODO(linxj): add groundTruth assert

View File

@ -18,76 +18,76 @@
using namespace zilliz::vecwise::engine;
TEST_F(MetaTest, GROUP_TEST) {
auto group_id = "meta_test_group";
auto table_id = "meta_test_group";
meta::GroupSchema group;
group.group_id = group_id;
auto status = impl_->add_group(group);
meta::TableSchema group;
group.table_id = table_id;
auto status = impl_->CreateTable(group);
ASSERT_TRUE(status.ok());
auto gid = group.id;
group.id = -1;
status = impl_->get_group(group);
status = impl_->DescribeTable(group);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group.id, gid);
ASSERT_EQ(group.group_id, group_id);
ASSERT_EQ(group.table_id, table_id);
group.group_id = "not_found";
status = impl_->get_group(group);
group.table_id = "not_found";
status = impl_->DescribeTable(group);
ASSERT_TRUE(!status.ok());
group.group_id = group_id;
status = impl_->add_group(group);
group.table_id = table_id;
status = impl_->CreateTable(group);
ASSERT_TRUE(!status.ok());
}
TEST_F(MetaTest, GROUP_FILE_TEST) {
auto group_id = "meta_test_group";
TEST_F(MetaTest, table_file_TEST) {
auto table_id = "meta_test_group";
meta::GroupSchema group;
group.group_id = group_id;
auto status = impl_->add_group(group);
meta::TableSchema group;
group.table_id = table_id;
auto status = impl_->CreateTable(group);
meta::GroupFileSchema group_file;
group_file.group_id = group.group_id;
status = impl_->add_group_file(group_file);
meta::TableFileSchema table_file;
table_file.table_id = group.table_id;
status = impl_->CreateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group_file.file_type, meta::GroupFileSchema::NEW);
ASSERT_EQ(table_file.file_type, meta::TableFileSchema::NEW);
auto file_id = group_file.file_id;
auto file_id = table_file.file_id;
auto new_file_type = meta::GroupFileSchema::INDEX;
group_file.file_type = new_file_type;
auto new_file_type = meta::TableFileSchema::INDEX;
table_file.file_type = new_file_type;
status = impl_->update_group_file(group_file);
status = impl_->UpdateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group_file.file_type, new_file_type);
ASSERT_EQ(table_file.file_type, new_file_type);
meta::DatesT dates;
dates.push_back(meta::Meta::GetDate());
status = impl_->delete_group_partitions(group_file.group_id, dates);
status = impl_->DropPartitionsByDates(table_file.table_id, dates);
ASSERT_FALSE(status.ok());
dates.clear();
for (auto i=2; i < 10; ++i) {
dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
}
status = impl_->delete_group_partitions(group_file.group_id, dates);
status = impl_->DropPartitionsByDates(table_file.table_id, dates);
ASSERT_TRUE(status.ok());
group_file.date = meta::Meta::GetDateWithDelta(-2);
status = impl_->update_group_file(group_file);
table_file.date = meta::Meta::GetDateWithDelta(-2);
status = impl_->UpdateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group_file.date, meta::Meta::GetDateWithDelta(-2));
ASSERT_FALSE(group_file.file_type == meta::GroupFileSchema::TO_DELETE);
ASSERT_EQ(table_file.date, meta::Meta::GetDateWithDelta(-2));
ASSERT_FALSE(table_file.file_type == meta::TableFileSchema::TO_DELETE);
dates.clear();
dates.push_back(group_file.date);
status = impl_->delete_group_partitions(group_file.group_id, dates);
dates.push_back(table_file.date);
status = impl_->DropPartitionsByDates(table_file.table_id, dates);
ASSERT_TRUE(status.ok());
status = impl_->get_group_file(group_file.group_id, group_file.file_id, group_file);
status = impl_->GetTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(group_file.file_type == meta::GroupFileSchema::TO_DELETE);
ASSERT_TRUE(table_file.file_type == meta::TableFileSchema::TO_DELETE);
}
TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
@ -100,44 +100,44 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
options.archive_conf = ArchiveConf("delete", ss.str());
auto impl = meta::DBMetaImpl(options);
auto group_id = "meta_test_group";
auto table_id = "meta_test_group";
meta::GroupSchema group;
group.group_id = group_id;
auto status = impl.add_group(group);
meta::TableSchema group;
group.table_id = table_id;
auto status = impl.CreateTable(group);
meta::GroupFilesSchema files;
meta::GroupFileSchema group_file;
group_file.group_id = group.group_id;
meta::TableFilesSchema files;
meta::TableFileSchema table_file;
table_file.table_id = group.table_id;
auto cnt = 100;
long ts = utils::GetMicroSecTimeStamp();
std::vector<int> days;
for (auto i=0; i<cnt; ++i) {
status = impl.add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::NEW;
status = impl.CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::NEW;
int day = rand() % (days_num*2);
group_file.created_on = ts - day*meta::D_SEC*meta::US_PS - 10000;
status = impl.update_group_file(group_file);
files.push_back(group_file);
table_file.created_on = ts - day*meta::D_SEC*meta::US_PS - 10000;
status = impl.UpdateTableFile(table_file);
files.push_back(table_file);
days.push_back(day);
}
impl.archive_files();
impl.Archive();
int i = 0;
for (auto file : files) {
status = impl.get_group_file(file.group_id, file.file_id, file);
status = impl.GetTableFile(file);
ASSERT_TRUE(status.ok());
if (days[i] < days_num) {
ASSERT_EQ(file.file_type, meta::GroupFileSchema::NEW);
ASSERT_EQ(file.file_type, meta::TableFileSchema::NEW);
} else {
ASSERT_EQ(file.file_type, meta::GroupFileSchema::TO_DELETE);
ASSERT_EQ(file.file_type, meta::TableFileSchema::TO_DELETE);
}
i++;
}
impl.drop_all();
impl.DropAll();
}
TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
@ -146,100 +146,100 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
options.archive_conf = ArchiveConf("delete", "disk:11");
auto impl = meta::DBMetaImpl(options);
auto group_id = "meta_test_group";
auto table_id = "meta_test_group";
meta::GroupSchema group;
group.group_id = group_id;
auto status = impl.add_group(group);
meta::TableSchema group;
group.table_id = table_id;
auto status = impl.CreateTable(group);
meta::GroupFilesSchema files;
meta::GroupFileSchema group_file;
group_file.group_id = group.group_id;
meta::TableFilesSchema files;
meta::TableFileSchema table_file;
table_file.table_id = group.table_id;
auto cnt = 10;
auto each_size = 2UL;
for (auto i=0; i<cnt; ++i) {
status = impl.add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::NEW;
group_file.size = each_size * meta::G;
status = impl.update_group_file(group_file);
files.push_back(group_file);
status = impl.CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::NEW;
table_file.size = each_size * meta::G;
status = impl.UpdateTableFile(table_file);
files.push_back(table_file);
}
impl.archive_files();
impl.Archive();
int i = 0;
for (auto file : files) {
status = impl.get_group_file(file.group_id, file.file_id, file);
status = impl.GetTableFile(file);
ASSERT_TRUE(status.ok());
if (i < 5) {
ASSERT_TRUE(file.file_type == meta::GroupFileSchema::TO_DELETE);
ASSERT_TRUE(file.file_type == meta::TableFileSchema::TO_DELETE);
} else {
ASSERT_EQ(file.file_type, meta::GroupFileSchema::NEW);
ASSERT_EQ(file.file_type, meta::TableFileSchema::NEW);
}
++i;
}
impl.drop_all();
impl.DropAll();
}
TEST_F(MetaTest, GROUP_FILES_TEST) {
auto group_id = "meta_test_group";
TEST_F(MetaTest, TABLE_FILES_TEST) {
auto table_id = "meta_test_group";
meta::GroupSchema group;
group.group_id = group_id;
auto status = impl_->add_group(group);
meta::TableSchema group;
group.table_id = table_id;
auto status = impl_->CreateTable(group);
int new_files_cnt = 4;
int raw_files_cnt = 5;
int to_index_files_cnt = 6;
int index_files_cnt = 7;
meta::GroupFileSchema group_file;
group_file.group_id = group.group_id;
meta::TableFileSchema table_file;
table_file.table_id = group.table_id;
for (auto i=0; i<new_files_cnt; ++i) {
status = impl_->add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::NEW;
status = impl_->update_group_file(group_file);
status = impl_->CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::NEW;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<raw_files_cnt; ++i) {
status = impl_->add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::RAW;
status = impl_->update_group_file(group_file);
status = impl_->CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::RAW;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<to_index_files_cnt; ++i) {
status = impl_->add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::TO_INDEX;
status = impl_->update_group_file(group_file);
status = impl_->CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::TO_INDEX;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<index_files_cnt; ++i) {
status = impl_->add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::INDEX;
status = impl_->update_group_file(group_file);
status = impl_->CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::INDEX;
status = impl_->UpdateTableFile(table_file);
}
meta::GroupFilesSchema files;
meta::TableFilesSchema files;
status = impl_->files_to_index(files);
status = impl_->FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatePartionedGroupFilesSchema dated_files;
status = impl_->files_to_merge(group.group_id, dated_files);
meta::DatePartionedTableFilesSchema dated_files;
status = impl_->FilesToMerge(group.table_id, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[group_file.date].size(), raw_files_cnt);
ASSERT_EQ(dated_files[table_file.date].size(), raw_files_cnt);
status = impl_->files_to_index(files);
status = impl_->FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatesT dates = {group_file.date};
status = impl_->files_to_search(group_id, dates, dated_files);
meta::DatesT dates = {table_file.date};
status = impl_->FilesToSearch(table_id, dates, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[group_file.date].size(),
ASSERT_EQ(dated_files[table_file.date].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt);
}

View File

@ -59,5 +59,5 @@ void MetaTest::SetUp() {
}
void MetaTest::TearDown() {
impl_->drop_all();
impl_->DropAll();
}

View File

@ -23,73 +23,6 @@
using namespace zilliz::vecwise;
TEST_F(DBTest, Metric_Test) {
using namespace zilliz::vecwise;
// server::Metrics::GetInstance().Init();
server::Metrics::GetInstance().Init();
// server::PrometheusMetrics::GetInstance().exposer_ptr()->RegisterCollectable(server::PrometheusMetrics::GetInstance().registry_ptr());
// server::Metrics::GetInstance().exposer_ptr()->RegisterCollectable(server::Metrics::GetInstance().registry_ptr());
static const std::string group_name = "test_group";
static const int group_dim = 256;
engine::meta::GroupSchema group_info;
group_info.dimension = group_dim;
group_info.group_id = group_name;
engine::Status stat = db_->add_group(group_info);
engine::meta::GroupSchema group_info_get;
group_info_get.group_id = group_name;
// int iter = 600000;
// for (int i = 0; i < iter; ++i) {
// db_->get_group(group_info);
// bool b = i % 2;
// db_->has_group(std::to_string(i),b);
// db_->add_group(group_info);
//
// std::this_thread::sleep_for(std::chrono::microseconds(1));
// }
stat = db_->get_group(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
int d = 256;
int nb = 50;
float *xb = new float[d * nb];
for(int i = 0; i < nb; i++) {
for(int j = 0; j < d; j++) xb[d * i + j] = drand48();
xb[d * i] += i / 2000.;
}
int qb = 5;
float *qxb = new float[d * qb];
for(int i = 0; i < qb; i++) {
for(int j = 0; j < d; j++) qxb[d * i + j] = drand48();
qxb[d * i] += i / 2000.;
}
int loop =100000;
for (auto i=0; i<loop; ++i) {
if (i==40) {
db_->add_vectors(group_name, qb, qxb, target_ids);
ASSERT_EQ(target_ids.size(), qb);
} else {
db_->add_vectors(group_name, nb, xb, vector_ids);
}
std::this_thread::sleep_for(std::chrono::microseconds(5));
}
// search.join();
delete [] xb;
delete [] qxb;
};
TEST_F(DBTest, Metric_Tes) {
@ -104,16 +37,15 @@ TEST_F(DBTest, Metric_Tes) {
static const std::string group_name = "test_group";
static const int group_dim = 256;
engine::meta::GroupSchema group_info;
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.group_id = group_name;
engine::Status stat = db_->add_group(group_info);
group_info.table_id = group_name;
engine::Status stat = db_->CreateTable(group_info);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->DescribeTable(group_info_get);
engine::meta::GroupSchema group_info_get;
group_info_get.group_id = group_name;
stat = db_->get_group(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
@ -145,11 +77,11 @@ TEST_F(DBTest, Metric_Tes) {
for (auto j=0; j<10; ++j) {
ss.str("");
db_->count(group_name, count);
db_->Size(count);
prev_count = count;
START_TIMER;
stat = db_->search(group_name, k, qb, qxb, results);
stat = db_->Query(group_name, k, qb, qxb, results);
ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/(1024*1024) << " M";
// STOP_TIMER(ss.str());
@ -172,10 +104,10 @@ TEST_F(DBTest, Metric_Tes) {
for (auto i=0; i<loop; ++i) {
if (i==40) {
db_->add_vectors(group_name, qb, qxb, target_ids);
db_->InsertVectors(group_name, qb, qxb, target_ids);
ASSERT_EQ(target_ids.size(), qb);
} else {
db_->add_vectors(group_name, nb, xb, vector_ids);
db_->InsertVectors(group_name, nb, xb, vector_ids);
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
}