From e96c97c8f70d11f430833b8f88dedab58a28c48d Mon Sep 17 00:00:00 2001 From: groot Date: Fri, 22 Nov 2019 11:28:31 +0800 Subject: [PATCH] #470 raw files should not be build index --- CHANGELOG.md | 1 + core/src/db/DBImpl.cpp | 27 +++- core/src/db/DBImpl.h | 4 + core/src/db/meta/Meta.h | 3 +- core/src/db/meta/MetaConsts.h | 2 + core/src/db/meta/MySQLMetaImpl.cpp | 27 +++- core/src/db/meta/MySQLMetaImpl.h | 2 +- core/src/db/meta/SqliteMetaImpl.cpp | 226 +++++++++++++++------------ core/src/db/meta/SqliteMetaImpl.h | 2 +- core/unittest/db/test_meta.cpp | 10 +- core/unittest/db/test_meta_mysql.cpp | 16 +- 11 files changed, 188 insertions(+), 132 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7c52eb5bb..8830180941 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#409 - Add a Fallback pass in optimizer - \#433 - C++ SDK query result is not easy to use - \#449 - Add ShowPartitions example for C++ SDK +- \#470 - Small raw files should not be build index ## Task diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index dd230ce0d1..51ea665064 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -838,6 +838,25 @@ DBImpl::BackgroundBuildIndex() { // ENGINE_LOG_TRACE << "Background build index thread exit"; } +Status +DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector& file_types, + meta::TableFilesSchema& files) { + files.clear(); + auto status = meta_ptr_->FilesByType(table_id, file_types, files); + + // only build index for files that row count greater than certain threshold + for (auto it = files.begin(); it != files.end();) { + if ((*it).file_type_ == static_cast(meta::TableFileSchema::RAW) && + (*it).row_count_ < meta::BUILD_INDEX_THRESHOLD) { + it = files.erase(it); + } else { + it++; + } + } + + return Status::OK(); +} + Status DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector& file_ids, const meta::DatesT& dates, meta::TableFilesSchema& files) { @@ -946,18 +965,18 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex } // get files to build index - std::vector file_ids; - auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids); + meta::TableFilesSchema table_files; + auto status = GetFilesToBuildIndex(table_id, file_types, table_files); int times = 1; - while (!file_ids.empty()) { + while (!table_files.empty()) { ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times; if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) { status = meta_ptr_->UpdateTableFilesToIndex(table_id); } std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100))); - status = meta_ptr_->FilesByType(table_id, file_types, file_ids); + GetFilesToBuildIndex(table_id, file_types, table_files); times++; } diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index a0c5cc356d..bff56efded 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -152,6 +152,10 @@ class DBImpl : public DB { Status MemSerialize(); + Status + GetFilesToBuildIndex(const std::string& table_id, const std::vector& file_types, + meta::TableFilesSchema& files); + Status GetFilesToSearch(const std::string& table_id, const std::vector& file_ids, const meta::DatesT& dates, meta::TableFilesSchema& files); diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index f538bebce6..52fe86fe69 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -109,8 +109,7 @@ class Meta { FilesToIndex(TableFilesSchema&) = 0; virtual Status - FilesByType(const std::string& table_id, const std::vector& file_types, - std::vector& file_ids) = 0; + FilesByType(const std::string& table_id, const std::vector& file_types, TableFilesSchema& table_files) = 0; virtual Status Size(uint64_t& result) = 0; diff --git a/core/src/db/meta/MetaConsts.h b/core/src/db/meta/MetaConsts.h index 4e40ff7731..c21a749fc8 100644 --- a/core/src/db/meta/MetaConsts.h +++ b/core/src/db/meta/MetaConsts.h @@ -32,6 +32,8 @@ const size_t H_SEC = 60 * M_SEC; const size_t D_SEC = 24 * H_SEC; const size_t W_SEC = 7 * D_SEC; +const size_t BUILD_INDEX_THRESHOLD = 1000; + } // namespace meta } // namespace engine } // namespace milvus diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index 4406b87f7e..6d13cad248 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -959,6 +959,7 @@ MySQLMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) { updateTableFilesToIndexQuery << "UPDATE " << META_TABLEFILES << " SET file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " WHERE table_id = " << mysqlpp::quote << table_id + << " AND row_count >= " << std::to_string(meta::BUILD_INDEX_THRESHOLD) << " AND file_type = " << std::to_string(TableFileSchema::RAW) << ";"; ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << updateTableFilesToIndexQuery.str(); @@ -1527,13 +1528,13 @@ MySQLMetaImpl::FilesToIndex(TableFilesSchema& files) { Status MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector& file_types, - std::vector& file_ids) { + TableFilesSchema& table_files) { if (file_types.empty()) { return Status(DB_ERROR, "file types array is empty"); } try { - file_ids.clear(); + table_files.clear(); mysqlpp::StoreQueryResult res; { @@ -1553,9 +1554,10 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector& mysqlpp::Query hasNonIndexFilesQuery = connectionPtr->query(); // since table_id is a unique column we just need to check whether it exists or not - hasNonIndexFilesQuery << "SELECT file_id, file_type" - << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id - << " AND file_type in (" << types << ");"; + hasNonIndexFilesQuery + << "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on" + << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id + << " AND file_type in (" << types << ");"; ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str(); @@ -1566,9 +1568,18 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector& int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0; int to_index_count = 0, index_count = 0, backup_count = 0; for (auto& resRow : res) { - std::string file_id; - resRow["file_id"].to_string(file_id); - file_ids.push_back(file_id); + TableFileSchema file_schema; + file_schema.id_ = resRow["id"]; + file_schema.table_id_ = table_id; + file_schema.engine_type_ = resRow["engine_type"]; + resRow["file_id"].to_string(file_schema.file_id_); + file_schema.file_type_ = resRow["file_type"]; + file_schema.file_size_ = resRow["file_size"]; + file_schema.row_count_ = resRow["row_count"]; + file_schema.date_ = resRow["date"]; + file_schema.created_on_ = resRow["created_on"]; + + table_files.emplace_back(file_schema); int32_t file_type = resRow["file_type"]; switch (file_type) { diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index 00b7627548..dd882fca2e 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -108,7 +108,7 @@ class MySQLMetaImpl : public Meta { Status FilesByType(const std::string& table_id, const std::vector& file_types, - std::vector& file_ids) override; + TableFilesSchema& table_files) override; Status Archive() override; diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 12128c074d..19ec684728 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -58,7 +58,7 @@ HandleException(const std::string& desc, const char* what = nullptr) { } // namespace inline auto -StoragePrototype(const std::string &path) { +StoragePrototype(const std::string& path) { return make_storage(path, make_table(META_TABLES, make_column("id", &TableSchema::id_, primary_key()), @@ -160,7 +160,7 @@ SqliteMetaImpl::Initialize() { } Status -SqliteMetaImpl::CreateTable(TableSchema &table_schema) { +SqliteMetaImpl::CreateTable(TableSchema& table_schema) { try { server::MetricCollector metric; @@ -188,20 +188,20 @@ SqliteMetaImpl::CreateTable(TableSchema &table_schema) { try { auto id = ConnectorPtr->insert(table_schema); table_schema.id_ = id; - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when create table", e.what()); } ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_; return utils::CreateTablePath(options_, table_schema.table_id_); - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when create table", e.what()); } } Status -SqliteMetaImpl::DescribeTable(TableSchema &table_schema) { +SqliteMetaImpl::DescribeTable(TableSchema& table_schema) { try { server::MetricCollector metric; @@ -218,7 +218,7 @@ SqliteMetaImpl::DescribeTable(TableSchema &table_schema) { &TableSchema::partition_tag_, &TableSchema::version_), where(c(&TableSchema::table_id_) == table_schema.table_id_ - and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE)); + and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE)); if (groups.size() == 1) { table_schema.id_ = std::get<0>(groups[0]); @@ -236,7 +236,7 @@ SqliteMetaImpl::DescribeTable(TableSchema &table_schema) { } else { return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found"); } - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when describe table", e.what()); } @@ -244,20 +244,20 @@ SqliteMetaImpl::DescribeTable(TableSchema &table_schema) { } Status -SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { +SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) { has_or_not = false; try { server::MetricCollector metric; auto tables = ConnectorPtr->select(columns(&TableSchema::id_), where(c(&TableSchema::table_id_) == table_id - and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE)); + and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE)); if (tables.size() == 1) { has_or_not = true; } else { has_or_not = false; } - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when lookup table", e.what()); } @@ -265,7 +265,7 @@ SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { } Status -SqliteMetaImpl::AllTables(std::vector &table_schema_array) { +SqliteMetaImpl::AllTables(std::vector& table_schema_array) { try { server::MetricCollector metric; @@ -281,8 +281,8 @@ SqliteMetaImpl::AllTables(std::vector &table_schema_array) { &TableSchema::owner_table_, &TableSchema::partition_tag_, &TableSchema::version_), - where(c(&TableSchema::state_) != (int) TableSchema::TO_DELETE)); - for (auto &table : selected) { + where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE)); + for (auto& table : selected) { TableSchema schema; schema.id_ = std::get<0>(table); schema.table_id_ = std::get<1>(table); @@ -299,7 +299,7 @@ SqliteMetaImpl::AllTables(std::vector &table_schema_array) { table_schema_array.emplace_back(schema); } - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when lookup all tables", e.what()); } @@ -307,7 +307,7 @@ SqliteMetaImpl::AllTables(std::vector &table_schema_array) { } Status -SqliteMetaImpl::DropTable(const std::string &table_id) { +SqliteMetaImpl::DropTable(const std::string& table_id) { try { server::MetricCollector metric; @@ -317,13 +317,13 @@ SqliteMetaImpl::DropTable(const std::string &table_id) { //soft delete table ConnectorPtr->update_all( set( - c(&TableSchema::state_) = (int) TableSchema::TO_DELETE), + c(&TableSchema::state_) = (int)TableSchema::TO_DELETE), where( c(&TableSchema::table_id_) == table_id and - c(&TableSchema::state_) != (int) TableSchema::TO_DELETE)); + c(&TableSchema::state_) != (int)TableSchema::TO_DELETE)); ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id; - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when delete table", e.what()); } @@ -331,7 +331,7 @@ SqliteMetaImpl::DropTable(const std::string &table_id) { } Status -SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) { +SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) { try { server::MetricCollector metric; @@ -341,14 +341,14 @@ SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) { //soft delete table files ConnectorPtr->update_all( set( - c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE, + c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE, c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()), where( c(&TableFileSchema::table_id_) == table_id and - c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE)); + c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE)); ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id; - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when delete table files", e.what()); } @@ -356,7 +356,7 @@ SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) { } Status -SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) { +SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) { if (file_schema.date_ == EmptyDate) { file_schema.date_ = utils::GetDate(); } @@ -389,7 +389,7 @@ SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) { ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_; return utils::CreateTableFilePath(options_, file_schema); - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when create table file", e.what()); } @@ -398,8 +398,8 @@ SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) { // TODO(myh): Delete single vecotor by id Status -SqliteMetaImpl::DropDataByDate(const std::string &table_id, - const DatesT &dates) { +SqliteMetaImpl::DropDataByDate(const std::string& table_id, + const DatesT& dates) { if (dates.empty()) { return Status::OK(); } @@ -440,7 +440,7 @@ SqliteMetaImpl::DropDataByDate(const std::string &table_id, } ENGINE_LOG_DEBUG << "Successfully drop data by date, table id = " << table_schema.table_id_; - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when drop partition", e.what()); } @@ -448,9 +448,9 @@ SqliteMetaImpl::DropDataByDate(const std::string &table_id, } Status -SqliteMetaImpl::GetTableFiles(const std::string &table_id, - const std::vector &ids, - TableFilesSchema &table_files) { +SqliteMetaImpl::GetTableFiles(const std::string& table_id, + const std::vector& ids, + TableFilesSchema& table_files) { try { table_files.clear(); auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, @@ -463,7 +463,7 @@ SqliteMetaImpl::GetTableFiles(const std::string &table_id, &TableFileSchema::created_on_), where(c(&TableFileSchema::table_id_) == table_id and in(&TableFileSchema::id_, ids) and - c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE)); + c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE)); TableSchema table_schema; table_schema.table_id_ = table_id; auto status = DescribeTable(table_schema); @@ -472,7 +472,7 @@ SqliteMetaImpl::GetTableFiles(const std::string &table_id, } Status result; - for (auto &file : files) { + for (auto& file : files) { TableFileSchema file_schema; file_schema.table_id_ = table_id; file_schema.id_ = std::get<0>(file); @@ -495,13 +495,13 @@ SqliteMetaImpl::GetTableFiles(const std::string &table_id, ENGINE_LOG_DEBUG << "Get table files by id"; return result; - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when lookup table files", e.what()); } } Status -SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) { +SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) { try { server::MetricCollector metric; @@ -512,7 +512,7 @@ SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) { where( c(&TableSchema::table_id_) == table_id)); ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id; - } catch (std::exception &e) { + } catch (std::exception& e) { std::string msg = "Encounter exception when update table flag: table_id = " + table_id; return HandleException(msg, e.what()); } @@ -521,7 +521,7 @@ SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) { } Status -SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { +SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) { file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); try { server::MetricCollector metric; @@ -534,14 +534,14 @@ SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { //if the table has been deleted, just mark the table file as TO_DELETE //clean thread will delete the file later - if (tables.size() < 1 || std::get<0>(tables[0]) == (int) TableSchema::TO_DELETE) { + if (tables.size() < 1 || std::get<0>(tables[0]) == (int)TableSchema::TO_DELETE) { file_schema.file_type_ = TableFileSchema::TO_DELETE; } ConnectorPtr->update(file_schema); ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_; - } catch (std::exception &e) { + } catch (std::exception& e) { std::string msg = "Exception update table file: table_id = " + file_schema.table_id_ + " file_id = " + file_schema.file_id_; return HandleException(msg, e.what()); @@ -550,7 +550,7 @@ SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { } Status -SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) { +SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) { try { server::MetricCollector metric; @@ -558,13 +558,13 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) { std::lock_guard meta_lock(meta_mutex_); std::map has_tables; - for (auto &file : files) { + for (auto& file : files) { if (has_tables.find(file.table_id_) != has_tables.end()) { continue; } auto tables = ConnectorPtr->select(columns(&TableSchema::id_), where(c(&TableSchema::table_id_) == file.table_id_ - and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE)); + and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE)); if (tables.size() >= 1) { has_tables[file.table_id_] = true; } else { @@ -573,7 +573,7 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) { } auto commited = ConnectorPtr->transaction([&]() mutable { - for (auto &file : files) { + for (auto& file : files) { if (!has_tables[file.table_id_]) { file.file_type_ = TableFileSchema::TO_DELETE; } @@ -589,7 +589,7 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) { } ENGINE_LOG_DEBUG << "Update " << files.size() << " table files"; - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when update table files", e.what()); } return Status::OK(); @@ -613,7 +613,7 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& &TableSchema::partition_tag_, &TableSchema::version_), where(c(&TableSchema::table_id_) == table_id - and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE)); + and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE)); if (tables.size() > 0) { meta::TableSchema table_schema; @@ -639,11 +639,11 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& //set all backup file to raw ConnectorPtr->update_all( set( - c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW, + c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW, c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()), where( c(&TableFileSchema::table_id_) == table_id and - c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP)); + c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP)); ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id; } catch (std::exception& e) { @@ -655,7 +655,7 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& } Status -SqliteMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) { +SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) { try { server::MetricCollector metric; @@ -664,13 +664,14 @@ SqliteMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) { ConnectorPtr->update_all( set( - c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX), + c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_INDEX), where( c(&TableFileSchema::table_id_) == table_id and - c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW)); + c(&TableFileSchema::row_count_) >= meta::BUILD_INDEX_THRESHOLD and + c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW)); ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id; - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when update table files to to_index", e.what()); } @@ -686,7 +687,7 @@ SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& inde &TableSchema::nlist_, &TableSchema::metric_type_), where(c(&TableSchema::table_id_) == table_id - and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE)); + and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE)); if (groups.size() == 1) { index.engine_type_ = std::get<0>(groups[0]); @@ -713,20 +714,20 @@ SqliteMetaImpl::DropTableIndex(const std::string& table_id) { //soft delete index files ConnectorPtr->update_all( set( - c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE, + c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE, c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()), where( c(&TableFileSchema::table_id_) == table_id and - c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX)); + c(&TableFileSchema::file_type_) == (int)TableFileSchema::INDEX)); //set all backup file to raw ConnectorPtr->update_all( set( - c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW, + c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW, c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()), where( c(&TableFileSchema::table_id_) == table_id and - c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP)); + c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP)); //set table index type to raw ConnectorPtr->update_all( @@ -738,7 +739,7 @@ SqliteMetaImpl::DropTableIndex(const std::string& table_id) { c(&TableSchema::table_id_) == table_id)); ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id; - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when delete table index files", e.what()); } @@ -746,7 +747,9 @@ SqliteMetaImpl::DropTableIndex(const std::string& table_id) { } Status -SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& tag) { +SqliteMetaImpl::CreatePartition(const std::string& table_id, + const std::string& partition_name, + const std::string& tag) { server::MetricCollector metric; TableSchema table_schema; @@ -757,7 +760,7 @@ SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string& } // not allow create partition under partition - if(!table_schema.owner_table_.empty()) { + if (!table_schema.owner_table_.empty()) { return Status(DB_ERROR, "Nested partition is not allowed"); } @@ -769,7 +772,7 @@ SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string& // not allow duplicated partition std::string exist_partition; GetPartitionName(table_id, valid_tag, exist_partition); - if(!exist_partition.empty()) { + if (!exist_partition.empty()) { return Status(DB_ERROR, "Duplicate partition is not allowed"); } @@ -805,16 +808,16 @@ SqliteMetaImpl::ShowPartitions(const std::string& table_id, std::vectorselect(columns(&TableSchema::table_id_), - where(c(&TableSchema::owner_table_) == table_id - and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE)); - for(size_t i = 0; i < partitions.size(); i++) { + where(c(&TableSchema::owner_table_) == table_id + and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE)); + for (size_t i = 0; i < partitions.size(); i++) { std::string partition_name = std::get<0>(partitions[i]); meta::TableSchema partition_schema; partition_schema.table_id_ = partition_name; DescribeTable(partition_schema); partiton_schema_array.emplace_back(partition_schema); } - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when show partitions", e.what()); } @@ -832,14 +835,14 @@ SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string& server::StringHelpFunctions::TrimStringBlank(valid_tag); auto name = ConnectorPtr->select(columns(&TableSchema::table_id_), - where(c(&TableSchema::owner_table_) == table_id - and c(&TableSchema::partition_tag_) == valid_tag)); + where(c(&TableSchema::owner_table_) == table_id + and c(&TableSchema::partition_tag_) == valid_tag)); if (name.size() > 0) { partition_name = std::get<0>(name[0]); } else { return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found"); } - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when get partition name", e.what()); } @@ -1032,7 +1035,7 @@ SqliteMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFile } Status -SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) { +SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) { files.clear(); try { @@ -1048,13 +1051,13 @@ SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) { &TableFileSchema::engine_type_, &TableFileSchema::created_on_), where(c(&TableFileSchema::file_type_) - == (int) TableFileSchema::TO_INDEX)); + == (int)TableFileSchema::TO_INDEX)); std::map groups; TableFileSchema table_file; Status ret; - for (auto &file : selected) { + for (auto& file : selected) { table_file.id_ = std::get<0>(file); table_file.table_id_ = std::get<1>(file); table_file.file_id_ = std::get<2>(file); @@ -1090,48 +1093,66 @@ SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) { ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files"; } return ret; - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when iterate raw files", e.what()); } } Status -SqliteMetaImpl::FilesByType(const std::string &table_id, - const std::vector &file_types, - std::vector &file_ids) { +SqliteMetaImpl::FilesByType(const std::string& table_id, + const std::vector& file_types, + TableFilesSchema& table_files) { if (file_types.empty()) { return Status(DB_ERROR, "file types array is empty"); } try { - file_ids.clear(); - auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_, - &TableFileSchema::file_type_), + table_files.clear(); + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, + &TableFileSchema::file_id_, + &TableFileSchema::file_type_, + &TableFileSchema::file_size_, + &TableFileSchema::row_count_, + &TableFileSchema::date_, + &TableFileSchema::engine_type_, + &TableFileSchema::created_on_), where(in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::table_id_) == table_id)); if (selected.size() >= 1) { int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0; int to_index_count = 0, index_count = 0, backup_count = 0; - for (auto &file : selected) { - file_ids.push_back(std::get<0>(file)); - switch (std::get<1>(file)) { - case (int) TableFileSchema::RAW:raw_count++; + for (auto& file : selected) { + TableFileSchema file_schema; + file_schema.table_id_ = table_id; + file_schema.id_ = std::get<0>(file); + file_schema.file_id_ = std::get<1>(file); + file_schema.file_type_ = std::get<2>(file); + file_schema.file_size_ = std::get<3>(file); + file_schema.row_count_ = std::get<4>(file); + file_schema.date_ = std::get<5>(file); + file_schema.engine_type_ = std::get<6>(file); + file_schema.created_on_ = std::get<7>(file); + + switch (file_schema.file_type_) { + case (int)TableFileSchema::RAW:raw_count++; break; - case (int) TableFileSchema::NEW:new_count++; + case (int)TableFileSchema::NEW:new_count++; break; - case (int) TableFileSchema::NEW_MERGE:new_merge_count++; + case (int)TableFileSchema::NEW_MERGE:new_merge_count++; break; - case (int) TableFileSchema::NEW_INDEX:new_index_count++; + case (int)TableFileSchema::NEW_INDEX:new_index_count++; break; - case (int) TableFileSchema::TO_INDEX:to_index_count++; + case (int)TableFileSchema::TO_INDEX:to_index_count++; break; - case (int) TableFileSchema::INDEX:index_count++; + case (int)TableFileSchema::INDEX:index_count++; break; - case (int) TableFileSchema::BACKUP:backup_count++; + case (int)TableFileSchema::BACKUP:backup_count++; break; default:break; } + + table_files.emplace_back(file_schema); } ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count @@ -1139,13 +1160,12 @@ SqliteMetaImpl::FilesByType(const std::string &table_id, << " new_index files:" << new_index_count << " to_index files:" << to_index_count << " index files:" << index_count << " backup files:" << backup_count; } - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when check non index files", e.what()); } return Status::OK(); } - // TODO(myh): Support swap to cloud storage Status SqliteMetaImpl::Archive() { @@ -1166,11 +1186,11 @@ SqliteMetaImpl::Archive() { ConnectorPtr->update_all( set( - c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE), + c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE), where( - c(&TableFileSchema::created_on_) < (int64_t) (now - usecs) and - c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE)); - } catch (std::exception &e) { + c(&TableFileSchema::created_on_) < (int64_t)(now - usecs) and + c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE)); + } catch (std::exception& e) { return HandleException("Encounter exception when update table files", e.what()); } @@ -1218,15 +1238,15 @@ SqliteMetaImpl::CleanUp() { std::lock_guard meta_lock(meta_mutex_); std::vector file_types = { - (int) TableFileSchema::NEW, - (int) TableFileSchema::NEW_INDEX, - (int) TableFileSchema::NEW_MERGE + (int)TableFileSchema::NEW, + (int)TableFileSchema::NEW_INDEX, + (int)TableFileSchema::NEW_MERGE }; auto files = ConnectorPtr->select(columns(&TableFileSchema::id_), where(in(&TableFileSchema::file_type_, file_types))); auto commited = ConnectorPtr->transaction([&]() mutable { - for (auto &file : files) { + for (auto& file : files) { ENGINE_LOG_DEBUG << "Remove table file type as NEW"; ConnectorPtr->remove(std::get<0>(file)); } @@ -1240,7 +1260,7 @@ SqliteMetaImpl::CleanUp() { if (files.size() > 0) { ENGINE_LOG_DEBUG << "Clean " << files.size() << " files"; } - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when clean table file", e.what()); } @@ -1265,7 +1285,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { &TableFileSchema::date_), where( c(&TableFileSchema::file_type_) == - (int) TableFileSchema::TO_DELETE + (int)TableFileSchema::TO_DELETE and c(&TableFileSchema::updated_time_) < now - seconds * US_PS)); @@ -1354,7 +1374,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { } Status -SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) { +SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) { try { server::MetricCollector metric; @@ -1414,14 +1434,14 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) { auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::file_size_), where(c(&TableFileSchema::file_type_) - != (int) TableFileSchema::TO_DELETE), + != (int)TableFileSchema::TO_DELETE), order_by(&TableFileSchema::id_), limit(10)); std::vector ids; TableFileSchema table_file; - for (auto &file : selected) { + for (auto& file : selected) { if (to_discard_size <= 0) break; table_file.id_ = std::get<0>(file); table_file.file_size_ = std::get<1>(file); @@ -1437,7 +1457,7 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) { ConnectorPtr->update_all( set( - c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE, + c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE, c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()), where( in(&TableFileSchema::id_, ids))); @@ -1448,7 +1468,7 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) { if (!commited) { return HandleException("DiscardFiles error: sqlite transaction failed"); } - } catch (std::exception &e) { + } catch (std::exception& e) { return HandleException("Encounter exception when discard table file", e.what()); } diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index 84d97ed49d..8e821d81de 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -108,7 +108,7 @@ class SqliteMetaImpl : public Meta { Status FilesByType(const std::string& table_id, const std::vector& file_types, - std::vector& file_ids) override; + TableFilesSchema& table_files) override; Status Size(uint64_t& result) override; diff --git a/core/unittest/db/test_meta.cpp b/core/unittest/db/test_meta.cpp index 097f004bd1..143bf39383 100644 --- a/core/unittest/db/test_meta.cpp +++ b/core/unittest/db/test_meta.cpp @@ -306,9 +306,9 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { ASSERT_EQ(dated_files[table_file.date_].size(), 0); std::vector file_types; - std::vector file_ids; - status = impl_->FilesByType(table.table_id_, file_types, file_ids); - ASSERT_TRUE(file_ids.empty()); + milvus::engine::meta::TableFilesSchema table_files; + status = impl_->FilesByType(table.table_id_, file_types, table_files); + ASSERT_TRUE(table_files.empty()); ASSERT_FALSE(status.ok()); file_types = { @@ -317,11 +317,11 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { milvus::engine::meta::TableFileSchema::INDEX, milvus::engine::meta::TableFileSchema::RAW, milvus::engine::meta::TableFileSchema::BACKUP, }; - status = impl_->FilesByType(table.table_id_, file_types, file_ids); + status = impl_->FilesByType(table.table_id_, file_types, table_files); ASSERT_TRUE(status.ok()); uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt + raw_files_cnt + to_index_files_cnt + index_files_cnt; - ASSERT_EQ(file_ids.size(), total_cnt); + ASSERT_EQ(table_files.size(), total_cnt); status = impl_->DeleteTableFiles(table_id); ASSERT_TRUE(status.ok()); diff --git a/core/unittest/db/test_meta_mysql.cpp b/core/unittest/db/test_meta_mysql.cpp index b9a82c0748..9a52a01b7b 100644 --- a/core/unittest/db/test_meta_mysql.cpp +++ b/core/unittest/db/test_meta_mysql.cpp @@ -169,9 +169,9 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) { std::vector file_types = { (int)milvus::engine::meta::TableFileSchema::NEW, }; - std::vector file_ids; - status = impl.FilesByType(table_id, file_types, file_ids); - ASSERT_FALSE(file_ids.empty()); + milvus::engine::meta::TableFilesSchema table_files; + status = impl.FilesByType(table_id, file_types, table_files); + ASSERT_FALSE(table_files.empty()); status = impl.UpdateTableFilesToIndex(table_id); ASSERT_TRUE(status.ok()); @@ -326,9 +326,9 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) { ASSERT_EQ(dated_files[table_file.date_].size(), 0); std::vector file_types; - std::vector file_ids; - status = impl_->FilesByType(table.table_id_, file_types, file_ids); - ASSERT_TRUE(file_ids.empty()); + milvus::engine::meta::TableFilesSchema table_files; + status = impl_->FilesByType(table.table_id_, file_types, table_files); + ASSERT_TRUE(table_files.empty()); ASSERT_FALSE(status.ok()); file_types = { @@ -337,11 +337,11 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) { milvus::engine::meta::TableFileSchema::INDEX, milvus::engine::meta::TableFileSchema::RAW, milvus::engine::meta::TableFileSchema::BACKUP, }; - status = impl_->FilesByType(table.table_id_, file_types, file_ids); + status = impl_->FilesByType(table.table_id_, file_types, table_files); ASSERT_TRUE(status.ok()); uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt + raw_files_cnt + to_index_files_cnt + index_files_cnt; - ASSERT_EQ(file_ids.size(), total_cnt); + ASSERT_EQ(table_files.size(), total_cnt); status = impl_->DeleteTableFiles(table_id); ASSERT_TRUE(status.ok());