mirror of https://github.com/milvus-io/milvus.git
commit 8510104066
@@ -10,6 +10,10 @@ Please mark all change in change log and use the ticket from JIRA.

### New Feature

- MS-5 - Implement Auto Archive Feature

### Task

- MS-1 - Add CHANGELOG.md
- MS-4 - Refactor the vecwise_engine code structure
@@ -23,6 +23,8 @@ public:

    virtual Status add_group(meta::GroupSchema& group_info_) = 0;
    virtual Status get_group(meta::GroupSchema& group_info_) = 0;
    virtual Status delete_vectors(const std::string& group_id,
            const meta::DatesT& dates) = 0;
    virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
    virtual Status get_group_files(const std::string& group_id_,
            const int date_delta_,

@@ -37,6 +39,8 @@ public:
    virtual Status search(const std::string& group_id, size_t k, size_t nq,
            const float* vectors, const meta::DatesT& dates, QueryResults& results) = 0;

    virtual Status size(long& result) = 0;

    virtual Status drop_all() = 0;

    virtual Status count(const std::string& group_id, long& result) = 0;
@@ -63,6 +63,12 @@ Status DBImpl<EngineT>::get_group(meta::GroupSchema& group_info) {
    return result;
}

template<typename EngineT>
Status DBImpl<EngineT>::delete_vectors(const std::string& group_id,
        const meta::DatesT& dates) {
    return _pMeta->delete_group_partitions(group_id, dates);
}

template<typename EngineT>
Status DBImpl<EngineT>::has_group(const std::string& group_id_, bool& has_or_not_) {
    Status result = _pMeta->has_group(group_id_, has_or_not_);

@@ -369,7 +375,7 @@ Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::Dat
    } else {
        group_file.file_type = meta::GroupFileSchema::RAW;
    }
    group_file.rows = index_size;
    group_file.size = index_size;
    updated.push_back(group_file);
    status = _pMeta->update_files(updated);
    LOG(DEBUG) << "New merged file " << group_file.file_id <<

@@ -403,6 +409,8 @@ Status DBImpl<EngineT>::background_merge_files(const std::string& group_id) {
        merge_files(group_id, kv.first, kv.second);
    }

    _pMeta->archive_files();

    try_build_index();

    _pMeta->cleanup_ttl_files(1);

@@ -430,7 +438,7 @@ Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
    METRICS_INSTANCE.BuildIndexDurationSecondsHistogramObserve(total_time);

    group_file.file_type = meta::GroupFileSchema::INDEX;
    group_file.rows = index->Size();
    group_file.size = index->Size();

    auto to_remove = file;
    to_remove.file_type = meta::GroupFileSchema::TO_DELETE;

@@ -443,6 +451,7 @@ Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
               << " from file " << to_remove.file_id;

    index->Cache();
    _pMeta->archive_files();

    return Status::OK();
}

@@ -503,6 +512,11 @@ Status DBImpl<EngineT>::count(const std::string& group_id, long& result) {
    return _pMeta->count(group_id, result);
}

template<typename EngineT>
Status DBImpl<EngineT>::size(long& result) {
    return _pMeta->size(result);
}

template<typename EngineT>
DBImpl<EngineT>::~DBImpl() {
    {
@@ -35,6 +35,7 @@ public:

    virtual Status add_group(meta::GroupSchema& group_info) override;
    virtual Status get_group(meta::GroupSchema& group_info) override;
    virtual Status delete_vectors(const std::string& group_id, const meta::DatesT& dates) override;
    virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;

    virtual Status get_group_files(const std::string& group_id_,

@@ -54,6 +55,8 @@ public:

    virtual Status count(const std::string& group_id, long& result) override;

    virtual Status size(long& result) override;

    virtual ~DBImpl();

private:
@@ -12,8 +12,11 @@
#include <fstream>
#include <sqlite_orm.h>
#include <easylogging++.h>

#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
#include "MetaConsts.h"
#include "metrics/Metrics.h"

namespace zilliz {

@@ -29,39 +32,26 @@ inline auto StoragePrototype(const std::string& path) {
                          make_column("id", &GroupSchema::id, primary_key()),
                          make_column("group_id", &GroupSchema::group_id, unique()),
                          make_column("dimension", &GroupSchema::dimension),
                          make_column("created_on", &GroupSchema::created_on),
                          make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))),
               make_table("GroupFile",
                          make_column("id", &GroupFileSchema::id, primary_key()),
                          make_column("group_id", &GroupFileSchema::group_id),
                          make_column("file_id", &GroupFileSchema::file_id),
                          make_column("file_type", &GroupFileSchema::file_type),
                          make_column("rows", &GroupFileSchema::rows, default_value(0)),
                          make_column("size", &GroupFileSchema::size, default_value(0)),
                          make_column("updated_time", &GroupFileSchema::updated_time),
                          make_column("created_on", &GroupFileSchema::created_on),
                          make_column("date", &GroupFileSchema::date))
    );

}

using ConnectorT = decltype(StoragePrototype("/tmp/dummy.sqlite3"));
using ConnectorT = decltype(StoragePrototype(""));
static std::unique_ptr<ConnectorT> ConnectorPtr;

long GetFileSize(const std::string& filename)
{
    struct stat stat_buf;
    int rc = stat(filename.c_str(), &stat_buf);
    return rc == 0 ? stat_buf.st_size : -1;
}

std::string DBMetaImpl::GetGroupPath(const std::string& group_id) {
    return _options.path + "/" + group_id;
}

long DBMetaImpl::GetMicroSecTimeStamp() {
    auto now = std::chrono::system_clock::now();
    auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
            now.time_since_epoch()).count();

    return micros;
    return _options.path + "/tables/" + group_id;
}

std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) {
@@ -80,6 +70,22 @@ void DBMetaImpl::GetGroupFilePath(GroupFileSchema& group_file) {
    group_file.location = ss.str();
}

Status DBMetaImpl::NextGroupId(std::string& group_id) {
    std::stringstream ss;
    SimpleIDGenerator g;
    ss << g.getNextIDNumber();
    group_id = ss.str();
    return Status::OK();
}

Status DBMetaImpl::NextFileId(std::string& file_id) {
    std::stringstream ss;
    SimpleIDGenerator g;
    ss << g.getNextIDNumber();
    file_id = ss.str();
    return Status::OK();
}

DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_)
    : _options(options_) {
    initialize();
@@ -105,22 +111,57 @@ Status DBMetaImpl::initialize() {
    return Status::OK();
}

// PXU TODO: Temp solution. Will fix later
Status DBMetaImpl::delete_group_partitions(const std::string& group_id,
        const meta::DatesT& dates) {
    if (dates.size() == 0) {
        return Status::OK();
    }

    GroupSchema group_info;
    group_info.group_id = group_id;
    auto status = get_group(group_info);
    if (!status.ok()) {
        return status;
    }

    auto yesterday = GetDateWithDelta(-1);

    for (auto& date : dates) {
        if (date >= yesterday) {
            return Status::Error("Could not delete partitions with 2 days");
        }
    }

    try {
        ConnectorPtr->update_all(
            set(
                c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
            ),
            where(
                c(&GroupFileSchema::group_id) == group_id and
                in(&GroupFileSchema::date, dates)
            ));
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        throw e;
    }
    return Status::OK();
}

Status DBMetaImpl::add_group(GroupSchema& group_info) {
    METRICS_INSTANCE.MetaAccessTotalIncrement();
    if (group_info.group_id == "") {
        std::stringstream ss;
        SimpleIDGenerator g;
        ss << g.getNextIDNumber();
        group_info.group_id = ss.str();
        NextGroupId(group_info.group_id);
    }
    group_info.files_cnt = 0;
    group_info.id = -1;
    auto start_time = METRICS_NOW_TIME;
    group_info.created_on = utils::GetMicroSecTimeStamp();
    auto start_time = METRICS_NOW_TIME;
    {
        try {
            auto id = ConnectorPtr->insert(group_info);
            group_info.id = id;
            /* LOG(DEBUG) << "Add group " << id; */
        } catch (...) {
            return Status::DBTransactionError("Add Group Error");
        }
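For illustration, assuming the DBMetaImpl and Meta interfaces shown in this diff, the partition-deletion rule above plays out as follows. This is a sketch, not part of the commit, and "group_1" is a hypothetical group id:

    meta::DatesT dates;
    dates.push_back(meta::Meta::GetDate());              // today: rejected, dates must be older than yesterday
    auto status = impl.delete_group_partitions("group_1", dates);
    // !status.ok()

    dates.clear();
    dates.push_back(meta::Meta::GetDateWithDelta(-2));   // two days ago: accepted
    status = impl.delete_group_partitions("group_1", dates);
    // files for that date are marked GroupFileSchema::TO_DELETE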
@@ -132,7 +173,7 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) {
    auto group_path = GetGroupPath(group_info.group_id);

    if (!boost::filesystem::is_directory(group_path)) {
        auto ret = boost::filesystem::create_directory(group_path);
        auto ret = boost::filesystem::create_directories(group_path);
        if (!ret) {
            LOG(ERROR) << "Create directory " << group_path << " Error";
        }

@@ -207,14 +248,12 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
        return status;
    }

    SimpleIDGenerator g;
    std::stringstream ss;
    ss << g.getNextIDNumber();
    NextFileId(group_file.file_id);
    group_file.file_type = GroupFileSchema::NEW;
    group_file.file_id = ss.str();
    group_file.dimension = group_info.dimension;
    group_file.rows = 0;
    group_file.updated_time = GetMicroSecTimeStamp(); //ConnectorPtr->select(datetime("now", "localtime +1 hour")).front();
    group_file.size = 0;
    group_file.created_on = utils::GetMicroSecTimeStamp();
    group_file.updated_time = group_file.created_on;
    GetGroupFilePath(group_file);

    {

@@ -226,7 +265,6 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
            auto total_time = METRICS_MICROSECONDS(start_time,end_time);
            METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
            group_file.id = id;
            /* LOG(DEBUG) << "Add group_file of file_id=" << group_file.file_id; */
        } catch (...) {
            return Status::DBTransactionError("Add file Error");
        }

@@ -255,7 +293,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
                                                 &GroupFileSchema::group_id,
                                                 &GroupFileSchema::file_id,
                                                 &GroupFileSchema::file_type,
                                                 &GroupFileSchema::rows,
                                                 &GroupFileSchema::size,
                                                 &GroupFileSchema::date),
                                         where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX));
    auto end_time = METRICS_NOW_TIME;

@@ -270,7 +308,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
        group_file.group_id = std::get<1>(file);
        group_file.file_id = std::get<2>(file);
        group_file.file_type = std::get<3>(file);
        group_file.rows = std::get<4>(file);
        group_file.size = std::get<4>(file);
        group_file.date = std::get<5>(file);
        GetGroupFilePath(group_file);
        auto groupItr = groups.find(group_file.group_id);

@@ -308,7 +346,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
                                                 &GroupFileSchema::group_id,
                                                 &GroupFileSchema::file_id,
                                                 &GroupFileSchema::file_type,
                                                 &GroupFileSchema::rows,
                                                 &GroupFileSchema::size,
                                                 &GroupFileSchema::date),
                                         where(c(&GroupFileSchema::group_id) == group_id and
                                               in(&GroupFileSchema::date, dates) and

@@ -331,7 +369,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
        group_file.group_id = std::get<1>(file);
        group_file.file_id = std::get<2>(file);
        group_file.file_type = std::get<3>(file);
        group_file.rows = std::get<4>(file);
        group_file.size = std::get<4>(file);
        group_file.date = std::get<5>(file);
        group_file.dimension = group_info.dimension;
        GetGroupFilePath(group_file);

@@ -360,7 +398,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
                                                 &GroupFileSchema::group_id,
                                                 &GroupFileSchema::file_id,
                                                 &GroupFileSchema::file_type,
                                                 &GroupFileSchema::rows,
                                                 &GroupFileSchema::size,
                                                 &GroupFileSchema::date),
                                         where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and
                                               c(&GroupFileSchema::group_id) == group_id));

@@ -380,7 +418,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
        group_file.group_id = std::get<1>(file);
        group_file.file_id = std::get<2>(file);
        group_file.file_type = std::get<3>(file);
        group_file.rows = std::get<4>(file);
        group_file.size = std::get<4>(file);
        group_file.date = std::get<5>(file);
        group_file.dimension = group_info.dimension;
        GetGroupFilePath(group_file);

@@ -408,7 +446,32 @@ Status DBMetaImpl::has_group_file(const std::string& group_id_,
Status DBMetaImpl::get_group_file(const std::string& group_id_,
                                  const std::string& file_id_,
                                  GroupFileSchema& group_file_info_) {
    //PXU TODO
    try {
        auto files = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                  &GroupFileSchema::group_id,
                                                  &GroupFileSchema::file_id,
                                                  &GroupFileSchema::file_type,
                                                  &GroupFileSchema::size,
                                                  &GroupFileSchema::date),
                                          where(c(&GroupFileSchema::file_id) == file_id_ and
                                                c(&GroupFileSchema::group_id) == group_id_
                                          ));
        assert(files.size() <= 1);
        if (files.size() == 1) {
            group_file_info_.id = std::get<0>(files[0]);
            group_file_info_.group_id = std::get<1>(files[0]);
            group_file_info_.file_id = std::get<2>(files[0]);
            group_file_info_.file_type = std::get<3>(files[0]);
            group_file_info_.size = std::get<4>(files[0]);
            group_file_info_.date = std::get<5>(files[0]);
        } else {
            return Status::NotFound("GroupFile " + file_id_ + " not found");
        }
    } catch (std::exception &e) {
        LOG(DEBUG) << e.what();
        throw e;
    }

    return Status::OK();
}
@@ -419,8 +482,115 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_,
    return Status::OK();
}

// PXU TODO: Support Swap
Status DBMetaImpl::archive_files() {
    auto& criterias = _options.archive_conf.GetCriterias();
    if (criterias.size() == 0) {
        return Status::OK();
    }

    for (auto kv : criterias) {
        auto& criteria = kv.first;
        auto& limit = kv.second;
        if (criteria == "days") {
            long usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();
            try
            {
                ConnectorPtr->update_all(
                    set(
                        c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
                    ),
                    where(
                        c(&GroupFileSchema::created_on) < (long)(now - usecs) and
                        c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
                    ));
            } catch (std::exception & e) {
                LOG(DEBUG) << e.what();
                throw e;
            }
        }
        if (criteria == "disk") {
            long sum = 0;
            size(sum);

            // PXU TODO: refactor size
            auto to_delete = (sum - limit*G);
            discard_files_of_size(to_delete);
        }
    }

    return Status::OK();
}

Status DBMetaImpl::size(long& result) {
    result = 0;
    try {
        auto selected = ConnectorPtr->select(columns(sum(&GroupFileSchema::size)),
                                             where(
                                                 c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
                                             ));

        for (auto& sub_query : selected) {
            if(!std::get<0>(sub_query)) {
                continue;
            }
            result += (long)(*std::get<0>(sub_query));
        }
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        throw e;
    }

    return Status::OK();
}

Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
    LOG(DEBUG) << "About to discard size=" << to_discard_size;
    if (to_discard_size <= 0) {
        return Status::OK();
    }
    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                     &GroupFileSchema::size),
                                             where(c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE),
                                             order_by(&GroupFileSchema::id),
                                             limit(10));
        std::vector<int> ids;

        for (auto& file : selected) {
            if (to_discard_size <= 0) break;
            GroupFileSchema group_file;
            group_file.id = std::get<0>(file);
            group_file.size = std::get<1>(file);
            ids.push_back(group_file.id);
            LOG(DEBUG) << "Discard group_file.id=" << group_file.id << " group_file.size=" << group_file.size;
            to_discard_size -= group_file.size;
        }

        if (ids.size() == 0) {
            return Status::OK();
        }

        ConnectorPtr->update_all(
            set(
                c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
            ),
            where(
                in(&GroupFileSchema::id, ids)
            ));

    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        throw e;
    }

    return discard_files_of_size(to_discard_size);
}

Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
    group_file.updated_time = GetMicroSecTimeStamp();
    group_file.updated_time = utils::GetMicroSecTimeStamp();
    try {
        METRICS_INSTANCE.MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
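A note on the "disk" criterion above: archive_files asks size() for the total bytes of all non-deleted files, computes the excess over limit*G, and hands that to discard_files_of_size, which marks the lowest-id (oldest) files TO_DELETE ten at a time and recurses until the excess is covered. A rough worked example under the constants from MetaConsts.h in this commit; the numeric values here are illustrative, not from the commit:

    long limit = 512;                      // ArchiveConf("delete", "disk:512")
    long sum = 600L * G;                   // total reported by size()
    auto to_delete = (sum - limit*G);      // 88 * G bytes still to discard
    discard_files_of_size(to_delete);      // oldest files get file_type = TO_DELETE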
@@ -428,13 +598,6 @@ Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
        METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
        /* auto commited = ConnectorPtr->transaction([&] () mutable { */
        /*     ConnectorPtr->update(group_file); */
        /*     return true; */
        /* }); */
        /* if (!commited) { */
        /*     return Status::DBTransactionError("Update file Error"); */
        /* } */
    } catch (std::exception & e) {
        LOG(DEBUG) << e.what();
        LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id;

@@ -449,7 +612,7 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) {
        auto start_time = METRICS_NOW_TIME;
        auto commited = ConnectorPtr->transaction([&] () mutable {
            for (auto& file : files) {
                file.updated_time = GetMicroSecTimeStamp();
                file.updated_time = utils::GetMicroSecTimeStamp();
                ConnectorPtr->update(file);
            }
            auto end_time = METRICS_NOW_TIME;

@@ -468,16 +631,16 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) {
}

Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
    auto now = GetMicroSecTimeStamp();
    auto now = utils::GetMicroSecTimeStamp();
    try {
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
                                                     &GroupFileSchema::group_id,
                                                     &GroupFileSchema::file_id,
                                                     &GroupFileSchema::file_type,
                                                     &GroupFileSchema::rows,
                                                     &GroupFileSchema::size,
                                                     &GroupFileSchema::date),
                                             where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and
                                                   c(&GroupFileSchema::updated_time) > now - 1000000*seconds));
                                                   c(&GroupFileSchema::updated_time) > now - seconds*US_PS));

        GroupFilesSchema updated;

@@ -487,7 +650,7 @@ Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
        group_file.group_id = std::get<1>(file);
        group_file.file_id = std::get<2>(file);
        group_file.file_type = std::get<3>(file);
        group_file.rows = std::get<4>(file);
        group_file.size = std::get<4>(file);
        group_file.date = std::get<5>(file);
        GetGroupFilePath(group_file);
        if (group_file.file_type == GroupFileSchema::TO_DELETE) {

@@ -510,7 +673,7 @@ Status DBMetaImpl::cleanup() {
                                                     &GroupFileSchema::group_id,
                                                     &GroupFileSchema::file_id,
                                                     &GroupFileSchema::file_type,
                                                     &GroupFileSchema::rows,
                                                     &GroupFileSchema::size,
                                                     &GroupFileSchema::date),
                                             where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or
                                                   c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW));

@@ -523,7 +686,7 @@ Status DBMetaImpl::cleanup() {
        group_file.group_id = std::get<1>(file);
        group_file.file_id = std::get<2>(file);
        group_file.file_type = std::get<3>(file);
        group_file.rows = std::get<4>(file);
        group_file.size = std::get<4>(file);
        group_file.date = std::get<5>(file);
        GetGroupFilePath(group_file);
        if (group_file.file_type == GroupFileSchema::TO_DELETE) {

@@ -545,7 +708,7 @@ Status DBMetaImpl::count(const std::string& group_id, long& result) {
    try {
        METRICS_INSTANCE.MetaAccessTotalIncrement();
        auto start_time = METRICS_NOW_TIME;
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::rows,
        auto selected = ConnectorPtr->select(columns(&GroupFileSchema::size,
                                                     &GroupFileSchema::date),
                                             where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or
                                                    c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or
@@ -24,6 +24,8 @@ public:
    virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;

    virtual Status add_group_file(GroupFileSchema& group_file_info) override;
    virtual Status delete_group_partitions(const std::string& group_id,
            const meta::DatesT& dates) override;

    virtual Status has_group_file(const std::string& group_id_,
            const std::string& file_id_,

@@ -48,6 +50,10 @@ public:

    virtual Status files_to_index(GroupFilesSchema&) override;

    virtual Status archive_files() override;

    virtual Status size(long& result) override;

    virtual Status cleanup() override;

    virtual Status cleanup_ttl_files(uint16_t seconds) override;

@@ -59,8 +65,9 @@ public:
    virtual ~DBMetaImpl();

private:

    long GetMicroSecTimeStamp();
    Status NextFileId(std::string& file_id);
    Status NextGroupId(std::string& group_id);
    Status discard_files_of_size(long to_discard_size);
    Status get_group_no_lock(GroupSchema& group_info);
    std::string GetGroupPath(const std::string& group_id);
    std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date);
@@ -0,0 +1,54 @@
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#pragma once

#include <exception>
#include <string>

namespace zilliz {
namespace vecwise {
namespace engine {

class Exception : public std::exception {
public:
    Exception(const std::string& message)
        : message_(message) {
    }

    Exception()
        : message_() {
    }

    virtual const char* what() const throw() {
        if (message_.empty()) {
            return "Default Exception.";
        } else {
            return message_.c_str();
        }
    }

    virtual ~Exception() throw() {};

protected:

    std::string message_;
};

class InvalidArgumentException : public Exception {
public:
    InvalidArgumentException() : Exception("Invalid Argument"){};
    InvalidArgumentException(const std::string& message) : Exception(message) {};
};

class OutOfRangeException : public Exception {
public:
    OutOfRangeException() : Exception("Out Of Range"){};
    OutOfRangeException(const std::string& message) : Exception(message) {};
};

} // namespace engine
} // namespace vecwise
} // namespace zilliz
@@ -48,12 +48,12 @@ size_t FaissExecutionEngine<IndexTrait>::Count() const {

template<class IndexTrait>
size_t FaissExecutionEngine<IndexTrait>::Size() const {
    return (size_t)(Count() * pIndex_->d);
    return (size_t)(Count() * pIndex_->d)*sizeof(float);
}

template<class IndexTrait>
size_t FaissExecutionEngine<IndexTrait>::PhysicalSize() const {
    return (size_t)(Size()*sizeof(float));
    return (size_t)(Count() * pIndex_->d)*sizeof(float);
}

template<class IndexTrait>
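With the change above, Size() reports bytes rather than element count — Count() * dimension * sizeof(float) — so Size() and PhysicalSize() now return the same value. As a rough illustration with numbers chosen here, not from the commit: one million 256-dimensional float vectors come to 1,000,000 * 256 * 4 = 1,024,000,000 bytes, roughly 976 MB.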
@@ -241,6 +241,11 @@ Status LocalMetaImpl::update_files(GroupFilesSchema& files) {
    return Status::OK();
}

Status LocalMetaImpl::archive_files() {
    //PXU TODO
    return Status::OK();
}

Status LocalMetaImpl::cleanup() {
    //PXU TODO
    return Status::OK();

@@ -256,6 +261,11 @@ Status LocalMetaImpl::drop_all() {
    return Status::OK();
}

Status LocalMetaImpl::size(long& result) {
    // PXU TODO
    return Status::OK();
}

Status LocalMetaImpl::count(const std::string& group_id, long& result) {
    // PXU TODO
    return Status::OK();
@@ -22,7 +22,9 @@ public:
    virtual Status get_group(GroupSchema& group_info_) override;
    virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;

    virtual Status add_group_file(GroupFileSchema& group_file_info) = 0;
    virtual Status add_group_file(GroupFileSchema& group_file_info) override;
    /* virtual Status delete_group_partitions(const std::string& group_id, */
    /*         const meta::DatesT& dates) override; */

    virtual Status has_group_file(const std::string& group_id_,
            const std::string& file_id_,

@@ -45,12 +47,16 @@ public:

    virtual Status files_to_index(GroupFilesSchema&) override;

    virtual Status archive_files() override;

    virtual Status cleanup_ttl_files(uint16_t seconds) override;

    virtual Status count(const std::string& group_id, long& result) override;

    virtual Status drop_all() override;

    virtual Status size(long& result) override;

private:

    Status GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info);
@@ -13,6 +13,7 @@

#include "MemManager.h"
#include "Meta.h"
#include "MetaConsts.h"


namespace zilliz {

@@ -48,16 +49,16 @@ size_t MemVectors<EngineT>::approximate_size() const {
template<typename EngineT>
Status MemVectors<EngineT>::serialize(std::string& group_id) {
    group_id = schema_.group_id;
    auto rows = approximate_size();
    auto size = approximate_size();
    pEE_->Serialize();
    schema_.rows = rows;
    schema_.file_type = (rows >= options_.index_trigger_size) ?
    schema_.size = size;
    schema_.file_type = (size >= options_.index_trigger_size) ?
        meta::GroupFileSchema::TO_INDEX : meta::GroupFileSchema::RAW;

    auto status = pMeta_->update_group_file(schema_);

    LOG(DEBUG) << "New " << ((schema_.file_type == meta::GroupFileSchema::RAW) ? "raw" : "to_index")
               << " file " << schema_.file_id << " of size " << pEE_->PhysicalSize() / (1024*1024) << " M";
               << " file " << schema_.file_id << " of size " << pEE_->Size() / meta::M << " M";

    pEE_->Cache();
@@ -4,6 +4,7 @@
 * Proprietary and confidential.
 ******************************************************************************/
#include <ctime>
#include <stdio.h>
#include "Meta.h"

namespace zilliz {

@@ -11,13 +12,33 @@ namespace vecwise {
namespace engine {
namespace meta {

DateT Meta::GetDate(const std::time_t& t) {
    tm *ltm = std::localtime(&t);
    return ltm->tm_year*10000 + ltm->tm_mon*100 + ltm->tm_mday;
DateT Meta::GetDate(const std::time_t& t, int day_delta) {
    struct tm ltm;
    localtime_r(&t, &ltm);
    if (day_delta > 0) {
        do {
            ++ltm.tm_mday;
            --day_delta;
        } while(day_delta > 0);
        mktime(&ltm);
    } else if (day_delta < 0) {
        do {
            --ltm.tm_mday;
            ++day_delta;
        } while(day_delta < 0);
        mktime(&ltm);
    } else {
        ltm.tm_mday;
    }
    return ltm.tm_year*10000 + ltm.tm_mon*100 + ltm.tm_mday;
}

DateT Meta::GetDateWithDelta(int day_delta) {
    return GetDate(std::time(nullptr), day_delta);
}

DateT Meta::GetDate() {
    return GetDate(std::time(nullptr));
    return GetDate(std::time(nullptr), 0);
}

} // namespace meta
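A note on the encoding used by GetDate above: the return value is built directly from struct tm fields, where tm_year counts years since 1900 and tm_mon runs from 0 to 11. For 18 June 2019, for instance, tm_year = 119, tm_mon = 5 and tm_mday = 18, so DateT = 119*10000 + 5*100 + 18 = 1190518. The dates compared in this diff (for example the date >= yesterday check in delete_group_partitions) are all produced through GetDate or GetDateWithDelta, so the comparisons stay consistent under this encoding.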
@@ -4,14 +4,11 @@
 * Proprietary and confidential.
 ******************************************************************************/
#pragma once

#include <string>
#include <cstddef>
#include <vector>
#include <map>
#include <ctime>
#include <memory>

#include "MetaTypes.h"
#include "Options.h"
#include "Status.h"

@@ -20,44 +17,7 @@ namespace vecwise {
namespace engine {
namespace meta {

typedef int DateT;
const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;

struct GroupSchema {
    size_t id;
    std::string group_id;
    size_t files_cnt = 0;
    uint16_t dimension;
    std::string location = "";
}; // GroupSchema


struct GroupFileSchema {
    typedef enum {
        NEW,
        RAW,
        TO_INDEX,
        INDEX,
        TO_DELETE,
    } FILE_TYPE;

    size_t id;
    std::string group_id;
    std::string file_id;
    int file_type = NEW;
    size_t rows;
    DateT date = EmptyDate;
    uint16_t dimension;
    std::string location = "";
    long updated_time;
}; // GroupFileSchema

typedef std::vector<GroupFileSchema> GroupFilesSchema;
typedef std::map<DateT, GroupFilesSchema> DatePartionedGroupFilesSchema;


class Meta;
class Meta {
public:
    typedef std::shared_ptr<Meta> Ptr;

@@ -67,6 +27,8 @@ public:
    virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;

    virtual Status add_group_file(GroupFileSchema& group_file_info) = 0;
    virtual Status delete_group_partitions(const std::string& group_id,
            const meta::DatesT& dates) = 0;

    virtual Status has_group_file(const std::string& group_id_,
            const std::string& file_id_,

@@ -89,6 +51,10 @@ public:
    virtual Status files_to_merge(const std::string& group_id,
            DatePartionedGroupFilesSchema& files) = 0;

    virtual Status size(long& result) = 0;

    virtual Status archive_files() = 0;

    virtual Status files_to_index(GroupFilesSchema&) = 0;

    virtual Status cleanup() = 0;

@@ -98,8 +64,9 @@ public:

    virtual Status count(const std::string& group_id, long& result) = 0;

    static DateT GetDate(const std::time_t& t);
    static DateT GetDate(const std::time_t& t, int day_delta = 0);
    static DateT GetDate();
    static DateT GetDateWithDelta(int day_delta);

}; // MetaData
@@ -0,0 +1,32 @@
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#pragma once

namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {

const size_t K = 1024UL;
const size_t M = K*K;
const size_t G = K*M;
const size_t T = K*G;

const size_t S_PS = 1UL;
const size_t MS_PS = 1000*S_PS;
const size_t US_PS = 1000*MS_PS;
const size_t NS_PS = 1000*US_PS;

const size_t SECOND = 1UL;
const size_t M_SEC = 60*SECOND;
const size_t H_SEC = 60*M_SEC;
const size_t D_SEC = 24*H_SEC;
const size_t W_SEC = 7*D_SEC;

} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz
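These constants are what archive_files and cleanup_ttl_files earlier in this diff use to turn configured limits into microseconds and bytes. A minimal sketch of the conversions, assuming only the names defined above; the numeric limits are illustrative, not from the commit:

    size_t days_limit = 30;                                      // ArchiveConf criteria "days:30"
    long usecs = days_limit * meta::D_SEC * meta::US_PS;         // 30 * 86400 s * 1,000,000 us/s
    size_t disk_limit = 512;                                     // ArchiveConf criteria "disk:512"
    long bytes = disk_limit * meta::G;                           // 512 GiB expressed in bytes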
@@ -0,0 +1,57 @@
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#pragma once

#include <vector>
#include <map>
#include <string>

namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {

typedef int DateT;
const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;

struct GroupSchema {
    size_t id;
    std::string group_id;
    size_t files_cnt = 0;
    uint16_t dimension;
    std::string location = "";
    long created_on;
}; // GroupSchema

struct GroupFileSchema {
    typedef enum {
        NEW,
        RAW,
        TO_INDEX,
        INDEX,
        TO_DELETE,
    } FILE_TYPE;

    size_t id;
    std::string group_id;
    std::string file_id;
    int file_type = NEW;
    size_t size;
    DateT date = EmptyDate;
    uint16_t dimension;
    std::string location = "";
    long updated_time;
    long created_on;
}; // GroupFileSchema

typedef std::vector<GroupFileSchema> GroupFilesSchema;
typedef std::map<DateT, GroupFilesSchema> DatePartionedGroupFilesSchema;

} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz
@@ -3,9 +3,15 @@
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#include <stdlib.h>
#include <assert.h>
#include <easylogging++.h>
#include <boost/algorithm/string.hpp>

#include "Options.h"
#include "Env.h"
#include "DBMetaImpl.h"
#include "Exception.h"

namespace zilliz {
namespace vecwise {

@@ -15,10 +21,54 @@ Options::Options()
    : env(Env::Default()) {
}

/* DBMetaOptions::DBMetaOptions(const std::string& dbpath, */
/*         const std::string& uri) */
/*     : path(dbpath), backend_uri(uri) { */
/* } */
ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias) {
    ParseType(type);
    ParseCritirias(criterias);
}

void ArchiveConf::ParseCritirias(const std::string& criterias) {
    std::stringstream ss(criterias);
    std::vector<std::string> tokens;

    boost::algorithm::split(tokens, criterias, boost::is_any_of(";"));

    if (tokens.size() == 0) {
        return;
    }

    for (auto& token : tokens) {
        std::vector<std::string> kv;
        boost::algorithm::split(kv, token, boost::is_any_of(":"));
        if (kv.size() != 2) {
            LOG(WARNING) << "Invalid ArchiveConf Criterias: " << token << " Ignore!";
            continue;
        }
        if (kv[0] != "disk" && kv[0] != "days") {
            LOG(WARNING) << "Invalid ArchiveConf Criterias: " << token << " Ignore!";
            continue;
        }
        try {
            auto value = std::stoi(kv[1]);
            criterias_[kv[0]] = value;
        }
        catch (std::out_of_range&){
            LOG(ERROR) << "Out of range: '" << kv[1] << "'";
            throw OutOfRangeException();
        }
        catch (...){
            LOG(ERROR) << "Invalid argument: '" << kv[1] << "'";
            throw InvalidArgumentException();
        }
    }
}

void ArchiveConf::ParseType(const std::string& type) {
    if (type != "delete" && type != "swap") {
        LOG(ERROR) << "Invalid argument: type='" << type << "'";
        throw InvalidArgumentException();
    }
    type_ = type;
}

} // namespace engine
} // namespace vecwise
@@ -7,6 +7,7 @@

#include <string>
#include <memory>
#include <map>

namespace zilliz {
namespace vecwise {

@@ -14,10 +15,26 @@ namespace engine {

class Env;

struct ArchiveConf {
    using CriteriaT = std::map<std::string, int>;

    ArchiveConf(const std::string& type, const std::string& criterias = "disk:512");

    const std::string& GetType() const { return type_; }
    const CriteriaT GetCriterias() const { return criterias_; }

private:
    void ParseCritirias(const std::string& type);
    void ParseType(const std::string& criterias);

    std::string type_;
    CriteriaT criterias_;
};

struct DBMetaOptions {
    /* DBMetaOptions(const std::string&, const std::string&); */
    std::string path;
    std::string backend_uri;
    ArchiveConf archive_conf = ArchiveConf("delete");
}; // DBMetaOptions


@@ -25,7 +42,7 @@ struct Options {
    Options();
    uint16_t memory_sync_interval = 1;
    uint16_t merge_trigger_number = 2;
    size_t index_trigger_size = 1024*1024*256;
    size_t index_trigger_size = 1024*1024*1024;
    Env* env;
    DBMetaOptions meta;
}; // Options
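For illustration, assuming the ArchiveConf and DBMetaOptions declarations above, an archive policy is a type plus a semicolon-separated criteria string, mirroring what the unit tests later in this diff exercise. A hypothetical configuration sketch, not part of the commit:

    engine::DBMetaOptions meta_options;
    meta_options.path = "/tmp/vecwise_test";  // same path the tests in this commit use
    meta_options.archive_conf = engine::ArchiveConf("delete", "days:100;disk:200");
    auto criterias = meta_options.archive_conf.GetCriterias();
    // criterias["days"] == 100, criterias["disk"] == 200; unknown keys are ignored with a warning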
@@ -0,0 +1,26 @@
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/

#include <chrono>
#include "Utils.h"

namespace zilliz {
namespace vecwise {
namespace engine {
namespace utils {

long GetMicroSecTimeStamp() {
    auto now = std::chrono::system_clock::now();
    auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
            now.time_since_epoch()).count();

    return micros;
}

} // namespace utils
} // namespace engine
} // namespace vecwise
} // namespace zilliz
@@ -0,0 +1,19 @@
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#pragma once


namespace zilliz {
namespace vecwise {
namespace engine {
namespace utils {

long GetMicroSecTimeStamp();

} // namespace utils
} // namespace engine
} // namespace vecwise
} // namespace zilliz
@@ -9,10 +9,107 @@

#include "utils.h"
#include "db/DB.h"
<<<<<<< HEAD
#include "metrics/Metrics.h"
=======
#include "db/DBImpl.h"
#include "db/MetaConsts.h"
>>>>>>> main/branch-1.2

using namespace zilliz::vecwise;

TEST_F(DBTest, CONFIG_TEST) {
    {
        ASSERT_ANY_THROW(engine::ArchiveConf conf("wrong"));
        /* EXPECT_DEATH(engine::ArchiveConf conf("wrong"), ""); */
    }
    {
        engine::ArchiveConf conf("delete");
        ASSERT_EQ(conf.GetType(), "delete");
        auto criterias = conf.GetCriterias();
        ASSERT_TRUE(criterias.size() == 1);
        ASSERT_TRUE(criterias["disk"] == 512);
    }
    {
        engine::ArchiveConf conf("swap");
        ASSERT_EQ(conf.GetType(), "swap");
        auto criterias = conf.GetCriterias();
        ASSERT_TRUE(criterias.size() == 1);
        ASSERT_TRUE(criterias["disk"] == 512);
    }
    {
        ASSERT_ANY_THROW(engine::ArchiveConf conf1("swap", "disk:"));
        ASSERT_ANY_THROW(engine::ArchiveConf conf2("swap", "disk:a"));
        engine::ArchiveConf conf("swap", "disk:1024");
        auto criterias = conf.GetCriterias();
        ASSERT_TRUE(criterias.size() == 1);
        ASSERT_TRUE(criterias["disk"] == 1024);
    }
    {
        ASSERT_ANY_THROW(engine::ArchiveConf conf1("swap", "days:"));
        ASSERT_ANY_THROW(engine::ArchiveConf conf2("swap", "days:a"));
        engine::ArchiveConf conf("swap", "days:100");
        auto criterias = conf.GetCriterias();
        ASSERT_TRUE(criterias.size() == 1);
        ASSERT_TRUE(criterias["days"] == 100);
    }
    {
        ASSERT_ANY_THROW(engine::ArchiveConf conf1("swap", "days:"));
        ASSERT_ANY_THROW(engine::ArchiveConf conf2("swap", "days:a"));
        engine::ArchiveConf conf("swap", "days:100;disk:200");
        auto criterias = conf.GetCriterias();
        ASSERT_TRUE(criterias.size() == 2);
        ASSERT_TRUE(criterias["days"] == 100);
        ASSERT_TRUE(criterias["disk"] == 200);
    }
}

TEST_F(DBTest2, ARHIVE_DISK_CHECK) {

    static const std::string group_name = "test_group";
    static const int group_dim = 256;
    long size;

    engine::meta::GroupSchema group_info;
    group_info.dimension = group_dim;
    group_info.group_id = group_name;
    engine::Status stat = db_->add_group(group_info);

    engine::meta::GroupSchema group_info_get;
    group_info_get.group_id = group_name;
    stat = db_->get_group(group_info_get);
    ASSERT_STATS(stat);
    ASSERT_EQ(group_info_get.dimension, group_dim);

    engine::IDNumbers vector_ids;
    engine::IDNumbers target_ids;

    db_->size(size);
    int d = 256;
    int nb = 20;
    float *xb = new float[d * nb];
    for(int i = 0; i < nb; i++) {
        for(int j = 0; j < d; j++) xb[d * i + j] = drand48();
        xb[d * i] += i / 2000.;
    }

    int loop = 100000;

    for (auto i=0; i<loop; ++i) {
        db_->add_vectors(group_name, nb, xb, vector_ids);
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));

    db_->size(size);
    LOG(DEBUG) << "size=" << size;
    ASSERT_TRUE(size < 1 * engine::meta::G);

    delete [] xb;
};


TEST_F(DBTest, DB_TEST) {


@@ -68,7 +165,7 @@ TEST_F(DBTest, DB_TEST) {

    START_TIMER;
    stat = db_->search(group_name, k, qb, qxb, results);
    ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/(1024*1024) << " M";
    ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/engine::meta::M << " M";
    STOP_TIMER(ss.str());

    ASSERT_STATS(stat);
@@ -6,10 +6,14 @@
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include <stdlib.h>
#include <time.h>

#include "utils.h"
#include "db/DBMetaImpl.h"
#include "db/Factories.h"
#include "db/Utils.h"
#include "db/MetaConsts.h"

using namespace zilliz::vecwise::engine;


@@ -59,10 +63,124 @@ TEST_F(MetaTest, GROUP_FILE_TEST) {
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(group_file.file_type, new_file_type);

    /* group_file.file_type = meta::GroupFileSchema::NEW; */
    /* status = impl_->get_group_file(group_file.group_id, group_file.file_id, group_file); */
    /* ASSERT_TRUE(status.ok()); */
    /* ASSERT_EQ(group_file.file_type, new_file_type); */
    meta::DatesT dates;
    dates.push_back(meta::Meta::GetDate());
    status = impl_->delete_group_partitions(group_file.group_id, dates);
    ASSERT_FALSE(status.ok());

    dates.clear();
    for (auto i=2; i < 10; ++i) {
        dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
    }
    status = impl_->delete_group_partitions(group_file.group_id, dates);
    ASSERT_TRUE(status.ok());

    group_file.date = meta::Meta::GetDateWithDelta(-2);
    status = impl_->update_group_file(group_file);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(group_file.date, meta::Meta::GetDateWithDelta(-2));
    ASSERT_FALSE(group_file.file_type == meta::GroupFileSchema::TO_DELETE);

    dates.clear();
    dates.push_back(group_file.date);
    status = impl_->delete_group_partitions(group_file.group_id, dates);
    ASSERT_TRUE(status.ok());
    status = impl_->get_group_file(group_file.group_id, group_file.file_id, group_file);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(group_file.file_type == meta::GroupFileSchema::TO_DELETE);
}

TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
    srand(time(0));
    DBMetaOptions options;
    options.path = "/tmp/vecwise_test";
    int days_num = rand() % 100;
    std::stringstream ss;
    ss << "days:" << days_num;
    options.archive_conf = ArchiveConf("delete", ss.str());

    auto impl = meta::DBMetaImpl(options);
    auto group_id = "meta_test_group";

    meta::GroupSchema group;
    group.group_id = group_id;
    auto status = impl.add_group(group);

    meta::GroupFilesSchema files;
    meta::GroupFileSchema group_file;
    group_file.group_id = group.group_id;

    auto cnt = 100;
    long ts = utils::GetMicroSecTimeStamp();
    std::vector<int> days;
    for (auto i=0; i<cnt; ++i) {
        status = impl.add_group_file(group_file);
        group_file.file_type = meta::GroupFileSchema::NEW;
        int day = rand() % (days_num*2);
        group_file.created_on = ts - day*meta::D_SEC*meta::US_PS - 10000;
        status = impl.update_group_file(group_file);
        files.push_back(group_file);
        days.push_back(day);
    }

    impl.archive_files();
    int i = 0;

    for (auto file : files) {
        status = impl.get_group_file(file.group_id, file.file_id, file);
        ASSERT_TRUE(status.ok());
        if (days[i] < days_num) {
            ASSERT_EQ(file.file_type, meta::GroupFileSchema::NEW);
        } else {
            ASSERT_EQ(file.file_type, meta::GroupFileSchema::TO_DELETE);
        }
        i++;
    }

    impl.drop_all();
}

TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
    DBMetaOptions options;
    options.path = "/tmp/vecwise_test";
    options.archive_conf = ArchiveConf("delete", "disk:11");

    auto impl = meta::DBMetaImpl(options);
    auto group_id = "meta_test_group";

    meta::GroupSchema group;
    group.group_id = group_id;
    auto status = impl.add_group(group);

    meta::GroupFilesSchema files;
    meta::GroupFileSchema group_file;
    group_file.group_id = group.group_id;

    auto cnt = 10;
    auto each_size = 2UL;
    for (auto i=0; i<cnt; ++i) {
        status = impl.add_group_file(group_file);
        group_file.file_type = meta::GroupFileSchema::NEW;
        group_file.size = each_size * meta::G;
        status = impl.update_group_file(group_file);
        files.push_back(group_file);
    }

    impl.archive_files();
    int i = 0;

    for (auto file : files) {
        status = impl.get_group_file(file.group_id, file.file_id, file);
        ASSERT_TRUE(status.ok());
        if (i < 5) {
            ASSERT_TRUE(file.file_type == meta::GroupFileSchema::TO_DELETE);
        } else {
            ASSERT_EQ(file.file_type, meta::GroupFileSchema::NEW);
        }
        ++i;
    }

    impl.drop_all();
}

TEST_F(MetaTest, GROUP_FILES_TEST) {
@@ -29,19 +29,30 @@ void DBTest::InitLog() {
    el::Loggers::reconfigureLogger("default", defaultConf);
}

void DBTest::SetUp() {
    InitLog();
engine::Options DBTest::GetOptions() {
    auto options = engine::OptionsFactory::Build();
    options.meta.path = "/tmp/vecwise_test";
    return options;
}

void DBTest::SetUp() {
    InitLog();
    auto options = GetOptions();
    db_ = engine::DBFactory::Build(options, "Faiss,IDMap");
}

void DBTest::TearDown() {
    delete db_;
    auto options = engine::OptionsFactory::Build();
    boost::filesystem::remove_all("/tmp/vecwise_test");
}

engine::Options DBTest2::GetOptions() {
    auto options = engine::OptionsFactory::Build();
    options.meta.path = "/tmp/vecwise_test";
    options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1");
    return options;
}

void MetaTest::SetUp() {
    InitLog();
    impl_ = engine::DBMetaImplFactory::Build();
@@ -39,6 +39,12 @@ protected:
    void InitLog();
    virtual void SetUp() override;
    virtual void TearDown() override;
    virtual zilliz::vecwise::engine::Options GetOptions();
};

class DBTest2 : public DBTest {
protected:
    virtual zilliz::vecwise::engine::Options GetOptions() override;
};