mirror of https://github.com/milvus-io/milvus.git
Merge pull request #555 from yhmo/clean_cache
#530 BuildIndex stop when do build index and search simultaneouslypull/565/head
commit
48bd34b00b
|
@ -24,6 +24,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
- \#486 - gpu no usage during index building
|
||||
- \#509 - IVF_PQ index build trapped into dead loop caused by invalid params
|
||||
- \#513 - Unittest DELETE_BY_RANGE sometimes failed
|
||||
- \#523 - Erase file data from cache once the file is marked as deleted
|
||||
- \#527 - faiss benchmark not compatible with faiss 1.6.0
|
||||
- \#530 - BuildIndex stop when do build index and search simultaneously
|
||||
- \#532 - assigin value to `table_name` from confest shell
|
||||
|
|
|
@ -99,8 +99,8 @@ Cache<ItemObj>::insert(const std::string& key, const ItemObj& item) {
|
|||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
lru_.put(key, item);
|
||||
SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->Size() << " bytes into cache, usage: " << usage_
|
||||
<< " bytes";
|
||||
SERVER_LOG_DEBUG << "Insert " << key << " size: " << item->Size() << " bytes into cache, usage: " << usage_
|
||||
<< " bytes," << " capacity: " << capacity_ << " bytes";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,7 +115,8 @@ Cache<ItemObj>::erase(const std::string& key) {
|
|||
const ItemObj& old_item = lru_.get(key);
|
||||
usage_ -= old_item->Size();
|
||||
|
||||
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size();
|
||||
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size() << " bytes from cache, usage: " << usage_
|
||||
<< " bytes," << " capacity: " << capacity_ << " bytes";
|
||||
|
||||
lru_.erase(key);
|
||||
}
|
||||
|
|
|
@ -112,7 +112,7 @@ DBImpl::Stop() {
|
|||
bg_timer_thread_.join();
|
||||
|
||||
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
|
||||
meta_ptr_->CleanUp();
|
||||
meta_ptr_->CleanUpShadowFiles();
|
||||
}
|
||||
|
||||
// ENGINE_LOG_TRACE << "DB service stop";
|
||||
|
@ -777,11 +777,18 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
|
|||
|
||||
meta_ptr_->Archive();
|
||||
|
||||
int ttl = 5 * meta::M_SEC; // default: file will be deleted after 5 minutes
|
||||
if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
|
||||
ttl = meta::D_SEC;
|
||||
{
|
||||
uint64_t ttl = 10 * meta::SECOND; // default: file data will be erase from cache after few seconds
|
||||
meta_ptr_->CleanUpCacheWithTTL(ttl);
|
||||
}
|
||||
|
||||
{
|
||||
uint64_t ttl = 5 * meta::M_SEC; // default: file will be deleted after few minutes
|
||||
if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
|
||||
ttl = meta::D_SEC;
|
||||
}
|
||||
meta_ptr_->CleanUpFilesWithTTL(ttl);
|
||||
}
|
||||
meta_ptr_->CleanUpFilesWithTTL(ttl);
|
||||
|
||||
// ENGINE_LOG_TRACE << " Background compaction thread exit";
|
||||
}
|
||||
|
|
|
@ -257,6 +257,11 @@ ExecutionEngineImpl::PhysicalSize() const {
|
|||
Status
|
||||
ExecutionEngineImpl::Serialize() {
|
||||
auto status = write_index(index_, location_);
|
||||
|
||||
// here we reset index size by file size,
|
||||
// since some index type(such as SQ8) data size become smaller after serialized
|
||||
index_->set_size(PhysicalSize());
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
|
@ -118,9 +118,13 @@ class Meta {
|
|||
Archive() = 0;
|
||||
|
||||
virtual Status
|
||||
CleanUp() = 0;
|
||||
CleanUpShadowFiles() = 0;
|
||||
|
||||
virtual Status CleanUpFilesWithTTL(uint16_t) = 0;
|
||||
virtual Status
|
||||
CleanUpCacheWithTTL(uint64_t seconds) = 0;
|
||||
|
||||
virtual Status
|
||||
CleanUpFilesWithTTL(uint64_t seconds) = 0;
|
||||
|
||||
virtual Status
|
||||
DropAll() = 0;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "db/IDGenerator.h"
|
||||
#include "db/Utils.h"
|
||||
#include "metrics/Metrics.h"
|
||||
#include "utils/CommonUtil.h"
|
||||
#include "utils/Exception.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/StringHelpFunctions.h"
|
||||
|
@ -292,7 +293,7 @@ MySQLMetaImpl::Initialize() {
|
|||
// step 5: create meta tables
|
||||
try {
|
||||
if (mode_ != DBOptions::MODE::CLUSTER_READONLY) {
|
||||
CleanUp();
|
||||
CleanUpShadowFiles();
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -1710,7 +1711,7 @@ MySQLMetaImpl::Size(uint64_t& result) {
|
|||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::CleanUp() {
|
||||
MySQLMetaImpl::CleanUpShadowFiles() {
|
||||
try {
|
||||
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
|
||||
|
||||
|
@ -1752,7 +1753,49 @@ MySQLMetaImpl::CleanUp() {
|
|||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
|
||||
auto now = utils::GetMicroSecTimeStamp();
|
||||
|
||||
// erase deleted/backup files from cache
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
|
||||
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
|
||||
}
|
||||
|
||||
mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
|
||||
cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date"
|
||||
<< " FROM " << META_TABLEFILES << " WHERE file_type IN ("
|
||||
<< std::to_string(TableFileSchema::TO_DELETE) << ","
|
||||
<< std::to_string(TableFileSchema::BACKUP) << ")"
|
||||
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
|
||||
|
||||
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
|
||||
|
||||
TableFileSchema table_file;
|
||||
std::vector<std::string> idsToDelete;
|
||||
|
||||
for (auto& resRow : res) {
|
||||
table_file.id_ = resRow["id"]; // implicit conversion
|
||||
resRow["table_id"].to_string(table_file.table_id_);
|
||||
resRow["file_id"].to_string(table_file.file_id_);
|
||||
table_file.date_ = resRow["date"];
|
||||
|
||||
utils::GetTableFilePath(options_, table_file);
|
||||
server::CommonUtil::EraseFromCache(table_file.location_);
|
||||
}
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
|
||||
auto now = utils::GetMicroSecTimeStamp();
|
||||
std::set<std::string> table_ids;
|
||||
|
||||
|
|
|
@ -117,10 +117,13 @@ class MySQLMetaImpl : public Meta {
|
|||
Size(uint64_t& result) override;
|
||||
|
||||
Status
|
||||
CleanUp() override;
|
||||
CleanUpShadowFiles() override;
|
||||
|
||||
Status
|
||||
CleanUpFilesWithTTL(uint16_t seconds) override;
|
||||
CleanUpCacheWithTTL(uint64_t seconds) override;
|
||||
|
||||
Status
|
||||
CleanUpFilesWithTTL(uint64_t seconds) override;
|
||||
|
||||
Status
|
||||
DropAll() override;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "db/IDGenerator.h"
|
||||
#include "db/Utils.h"
|
||||
#include "metrics/Metrics.h"
|
||||
#include "utils/CommonUtil.h"
|
||||
#include "utils/Exception.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/StringHelpFunctions.h"
|
||||
|
@ -154,7 +155,7 @@ SqliteMetaImpl::Initialize() {
|
|||
ConnectorPtr->open_forever(); // thread safe option
|
||||
ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log
|
||||
|
||||
CleanUp();
|
||||
CleanUpShadowFiles();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
@ -1231,7 +1232,7 @@ SqliteMetaImpl::Size(uint64_t& result) {
|
|||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::CleanUp() {
|
||||
SqliteMetaImpl::CleanUpShadowFiles() {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
|
@ -1269,7 +1270,51 @@ SqliteMetaImpl::CleanUp() {
|
|||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
|
||||
auto now = utils::GetMicroSecTimeStamp();
|
||||
|
||||
// erase deleted/backup files from cache
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
|
||||
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
||||
|
||||
std::vector<int> file_types = {
|
||||
(int)TableFileSchema::TO_DELETE,
|
||||
(int)TableFileSchema::BACKUP,
|
||||
};
|
||||
|
||||
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::table_id_,
|
||||
&TableFileSchema::file_id_,
|
||||
&TableFileSchema::date_),
|
||||
where(
|
||||
in(&TableFileSchema::file_type_, file_types)
|
||||
and
|
||||
c(&TableFileSchema::updated_time_)
|
||||
< now - seconds * US_PS));
|
||||
|
||||
for (auto& file : files) {
|
||||
TableFileSchema table_file;
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.table_id_ = std::get<1>(file);
|
||||
table_file.file_id_ = std::get<2>(file);
|
||||
table_file.date_ = std::get<3>(file);
|
||||
|
||||
utils::GetTableFilePath(options_, table_file);
|
||||
server::CommonUtil::EraseFromCache(table_file.location_);
|
||||
}
|
||||
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when clean cache", e.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
|
||||
auto now = utils::GetMicroSecTimeStamp();
|
||||
std::set<std::string> table_ids;
|
||||
|
||||
|
|
|
@ -117,10 +117,13 @@ class SqliteMetaImpl : public Meta {
|
|||
Archive() override;
|
||||
|
||||
Status
|
||||
CleanUp() override;
|
||||
CleanUpShadowFiles() override;
|
||||
|
||||
Status
|
||||
CleanUpFilesWithTTL(uint16_t seconds) override;
|
||||
CleanUpCacheWithTTL(uint64_t seconds) override;
|
||||
|
||||
Status
|
||||
CleanUpFilesWithTTL(uint64_t seconds) override;
|
||||
|
||||
Status
|
||||
DropAll() override;
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
// under the License.
|
||||
|
||||
#include "utils/CommonUtil.h"
|
||||
#include "cache/CpuCacheMgr.h"
|
||||
#include "cache/GpuCacheMgr.h"
|
||||
#include "server/Config.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
#include <dirent.h>
|
||||
|
@ -27,6 +30,7 @@
|
|||
#include <unistd.h>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "boost/filesystem.hpp"
|
||||
|
||||
|
@ -222,5 +226,24 @@ CommonUtil::ConvertTime(tm time_struct, time_t& time_integer) {
|
|||
time_integer = mktime(&time_struct);
|
||||
}
|
||||
|
||||
void
|
||||
CommonUtil::EraseFromCache(const std::string& item_key) {
|
||||
if (item_key.empty()) {
|
||||
SERVER_LOG_ERROR << "Empty key cannot be erased from cache";
|
||||
return;
|
||||
}
|
||||
|
||||
cache::CpuCacheMgr::GetInstance()->EraseItem(item_key);
|
||||
|
||||
#ifdef MILVUS_GPU_VERSION
|
||||
server::Config& config = server::Config::GetInstance();
|
||||
std::vector<int64_t> gpus;
|
||||
Status s = config.GetGpuResourceConfigSearchResources(gpus);
|
||||
for (auto& gpu : gpus) {
|
||||
cache::GpuCacheMgr::GetInstance(gpu)->EraseItem(item_key);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
} // namespace server
|
||||
} // namespace milvus
|
||||
|
|
|
@ -56,6 +56,9 @@ class CommonUtil {
|
|||
ConvertTime(time_t time_integer, tm& time_struct);
|
||||
static void
|
||||
ConvertTime(tm time_struct, time_t& time_integer);
|
||||
|
||||
static void
|
||||
EraseFromCache(const std::string& item_key);
|
||||
};
|
||||
|
||||
} // namespace server
|
||||
|
|
|
@ -329,7 +329,7 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
|
|||
status = impl_->CreateTableFile(table_file);
|
||||
table_file.file_type_ = milvus::engine::meta::TableFileSchema::NEW;
|
||||
status = impl_->UpdateTableFile(table_file);
|
||||
status = impl_->CleanUp();
|
||||
status = impl_->CleanUpShadowFiles();
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
status = impl_->DropTable(table_id);
|
||||
|
|
Loading…
Reference in New Issue