mirror of https://github.com/milvus-io/milvus.git
feat(db): update for merege and index
Former-commit-id: b06f36ce5a8ebca0f1f4554437b19233f8f6f723pull/191/head
parent
08a1527b66
commit
a6e92dc9ab
|
@ -124,8 +124,15 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
|
|||
updated.push_back(file_schema);
|
||||
}
|
||||
|
||||
auto index_size = group_file.dimension * index->ntotal;
|
||||
faiss::write_index(index.get(), group_file.location.c_str());
|
||||
group_file.file_type = meta::GroupFileSchema::RAW;
|
||||
|
||||
std::cout << "Merged size=" << index_size << std::endl;
|
||||
if (index_size >= _options.index_trigger_size) {
|
||||
group_file.file_type = meta::GroupFileSchema::TO_INDEX;
|
||||
} else {
|
||||
group_file.file_type = meta::GroupFileSchema::RAW;
|
||||
}
|
||||
updated.push_back(group_file);
|
||||
status = _pMeta->update_files(updated);
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ inline auto StoragePrototype(const std::string& path) {
|
|||
|
||||
}
|
||||
|
||||
using ConnectorT = decltype(StoragePrototype(""));
|
||||
using ConnectorT = decltype(StoragePrototype("/tmp/dummy.sqlite3"));
|
||||
static std::unique_ptr<ConnectorT> ConnectorPtr;
|
||||
|
||||
long GetFileSize(const std::string& filename)
|
||||
|
@ -77,6 +77,8 @@ Status DBMetaImpl::initialize() {
|
|||
|
||||
ConnectorPtr->sync_schema();
|
||||
|
||||
cleanup();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
@ -211,6 +213,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
|
|||
group_file.file_type = std::get<3>(file);
|
||||
group_file.rows = std::get<4>(file);
|
||||
group_file.date = std::get<5>(file);
|
||||
GetGroupFilePath(group_file);
|
||||
auto groupItr = groups.find(group_file.group_id);
|
||||
if (groupItr == groups.end()) {
|
||||
GroupSchema group_info;
|
||||
|
@ -316,6 +319,42 @@ Status DBMetaImpl::update_files(const GroupFilesSchema& files) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBMetaImpl::cleanup() {
|
||||
std::unique_lock<std::mutex> lk(mutex_);
|
||||
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
|
||||
&GroupFileSchema::group_id,
|
||||
&GroupFileSchema::file_id,
|
||||
&GroupFileSchema::file_type,
|
||||
&GroupFileSchema::rows,
|
||||
&GroupFileSchema::date),
|
||||
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or
|
||||
c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW));
|
||||
|
||||
GroupFilesSchema updated;
|
||||
|
||||
for (auto& file : selected) {
|
||||
GroupFileSchema group_file;
|
||||
group_file.id = std::get<0>(file);
|
||||
group_file.group_id = std::get<1>(file);
|
||||
group_file.file_id = std::get<2>(file);
|
||||
group_file.file_type = std::get<3>(file);
|
||||
group_file.rows = std::get<4>(file);
|
||||
group_file.date = std::get<5>(file);
|
||||
GetGroupFilePath(group_file);
|
||||
if (group_file.file_type == GroupFileSchema::TO_DELETE) {
|
||||
boost::filesystem::remove(group_file.location);
|
||||
}
|
||||
ConnectorPtr->remove<GroupFileSchema>(group_file.id);
|
||||
std::cout << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl;
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
DBMetaImpl::~DBMetaImpl() {
|
||||
cleanup();
|
||||
}
|
||||
|
||||
} // namespace meta
|
||||
} // namespace engine
|
||||
} // namespace vecwise
|
||||
|
|
|
@ -42,6 +42,10 @@ public:
|
|||
|
||||
virtual Status files_to_index(GroupFilesSchema&) override;
|
||||
|
||||
virtual Status cleanup() override;
|
||||
|
||||
virtual ~DBMetaImpl();
|
||||
|
||||
private:
|
||||
|
||||
Status get_group_no_lock(GroupSchema& group_info);
|
||||
|
|
|
@ -236,6 +236,11 @@ Status LocalMetaImpl::update_files(const GroupFilesSchema& files) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::cleanup() {
|
||||
//PXU TODO
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace meta
|
||||
} // namespace engine
|
||||
} // namespace vecwise
|
||||
|
|
|
@ -34,6 +34,8 @@ public:
|
|||
|
||||
virtual Status update_files(const GroupFilesSchema& files) override;
|
||||
|
||||
virtual Status cleanup() override;
|
||||
|
||||
virtual Status files_to_merge(const std::string& group_id,
|
||||
DatePartionedGroupFilesSchema& files) override;
|
||||
|
||||
|
|
|
@ -75,6 +75,8 @@ public:
|
|||
|
||||
virtual Status files_to_index(GroupFilesSchema&) = 0;
|
||||
|
||||
virtual Status cleanup() = 0;
|
||||
|
||||
static DateT GetDate(const std::time_t& t);
|
||||
static DateT GetDate();
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ struct Options {
|
|||
Options();
|
||||
uint16_t memory_sync_interval = 10;
|
||||
uint16_t merge_trigger_number = 100;
|
||||
size_t index_trigger_size = 1024*1024*1024;
|
||||
size_t index_trigger_size = 1024*1024*256;
|
||||
Env* env;
|
||||
DBMetaOptions meta;
|
||||
}; // Options
|
||||
|
|
Loading…
Reference in New Issue