mirror of https://github.com/milvus-io/milvus.git
#470 raw files should not be build index
parent
3b7ad649f6
commit
e96c97c8f7
|
@ -42,6 +42,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||||
- \#409 - Add a Fallback pass in optimizer
|
- \#409 - Add a Fallback pass in optimizer
|
||||||
- \#433 - C++ SDK query result is not easy to use
|
- \#433 - C++ SDK query result is not easy to use
|
||||||
- \#449 - Add ShowPartitions example for C++ SDK
|
- \#449 - Add ShowPartitions example for C++ SDK
|
||||||
|
- \#470 - Small raw files should not be build index
|
||||||
|
|
||||||
## Task
|
## Task
|
||||||
|
|
||||||
|
|
|
@ -838,6 +838,25 @@ DBImpl::BackgroundBuildIndex() {
|
||||||
// ENGINE_LOG_TRACE << "Background build index thread exit";
|
// ENGINE_LOG_TRACE << "Background build index thread exit";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status
|
||||||
|
DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>& file_types,
|
||||||
|
meta::TableFilesSchema& files) {
|
||||||
|
files.clear();
|
||||||
|
auto status = meta_ptr_->FilesByType(table_id, file_types, files);
|
||||||
|
|
||||||
|
// only build index for files that row count greater than certain threshold
|
||||||
|
for (auto it = files.begin(); it != files.end();) {
|
||||||
|
if ((*it).file_type_ == static_cast<int>(meta::TableFileSchema::RAW) &&
|
||||||
|
(*it).row_count_ < meta::BUILD_INDEX_THRESHOLD) {
|
||||||
|
it = files.erase(it);
|
||||||
|
} else {
|
||||||
|
it++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
|
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
|
||||||
meta::TableFilesSchema& files) {
|
meta::TableFilesSchema& files) {
|
||||||
|
@ -946,18 +965,18 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
// get files to build index
|
// get files to build index
|
||||||
std::vector<std::string> file_ids;
|
meta::TableFilesSchema table_files;
|
||||||
auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
|
auto status = GetFilesToBuildIndex(table_id, file_types, table_files);
|
||||||
int times = 1;
|
int times = 1;
|
||||||
|
|
||||||
while (!file_ids.empty()) {
|
while (!table_files.empty()) {
|
||||||
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
|
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
|
||||||
if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
|
if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
|
||||||
status = meta_ptr_->UpdateTableFilesToIndex(table_id);
|
status = meta_ptr_->UpdateTableFilesToIndex(table_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));
|
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));
|
||||||
status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
|
GetFilesToBuildIndex(table_id, file_types, table_files);
|
||||||
times++;
|
times++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -152,6 +152,10 @@ class DBImpl : public DB {
|
||||||
Status
|
Status
|
||||||
MemSerialize();
|
MemSerialize();
|
||||||
|
|
||||||
|
Status
|
||||||
|
GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>& file_types,
|
||||||
|
meta::TableFilesSchema& files);
|
||||||
|
|
||||||
Status
|
Status
|
||||||
GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
|
GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
|
||||||
meta::TableFilesSchema& files);
|
meta::TableFilesSchema& files);
|
||||||
|
|
|
@ -109,8 +109,7 @@ class Meta {
|
||||||
FilesToIndex(TableFilesSchema&) = 0;
|
FilesToIndex(TableFilesSchema&) = 0;
|
||||||
|
|
||||||
virtual Status
|
virtual Status
|
||||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
FilesByType(const std::string& table_id, const std::vector<int>& file_types, TableFilesSchema& table_files) = 0;
|
||||||
std::vector<std::string>& file_ids) = 0;
|
|
||||||
|
|
||||||
virtual Status
|
virtual Status
|
||||||
Size(uint64_t& result) = 0;
|
Size(uint64_t& result) = 0;
|
||||||
|
|
|
@ -32,6 +32,8 @@ const size_t H_SEC = 60 * M_SEC;
|
||||||
const size_t D_SEC = 24 * H_SEC;
|
const size_t D_SEC = 24 * H_SEC;
|
||||||
const size_t W_SEC = 7 * D_SEC;
|
const size_t W_SEC = 7 * D_SEC;
|
||||||
|
|
||||||
|
const size_t BUILD_INDEX_THRESHOLD = 1000;
|
||||||
|
|
||||||
} // namespace meta
|
} // namespace meta
|
||||||
} // namespace engine
|
} // namespace engine
|
||||||
} // namespace milvus
|
} // namespace milvus
|
||||||
|
|
|
@ -959,6 +959,7 @@ MySQLMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
|
||||||
updateTableFilesToIndexQuery << "UPDATE " << META_TABLEFILES
|
updateTableFilesToIndexQuery << "UPDATE " << META_TABLEFILES
|
||||||
<< " SET file_type = " << std::to_string(TableFileSchema::TO_INDEX)
|
<< " SET file_type = " << std::to_string(TableFileSchema::TO_INDEX)
|
||||||
<< " WHERE table_id = " << mysqlpp::quote << table_id
|
<< " WHERE table_id = " << mysqlpp::quote << table_id
|
||||||
|
<< " AND row_count >= " << std::to_string(meta::BUILD_INDEX_THRESHOLD)
|
||||||
<< " AND file_type = " << std::to_string(TableFileSchema::RAW) << ";";
|
<< " AND file_type = " << std::to_string(TableFileSchema::RAW) << ";";
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << updateTableFilesToIndexQuery.str();
|
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << updateTableFilesToIndexQuery.str();
|
||||||
|
@ -1527,13 +1528,13 @@ MySQLMetaImpl::FilesToIndex(TableFilesSchema& files) {
|
||||||
|
|
||||||
Status
|
Status
|
||||||
MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||||
std::vector<std::string>& file_ids) {
|
TableFilesSchema& table_files) {
|
||||||
if (file_types.empty()) {
|
if (file_types.empty()) {
|
||||||
return Status(DB_ERROR, "file types array is empty");
|
return Status(DB_ERROR, "file types array is empty");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
file_ids.clear();
|
table_files.clear();
|
||||||
|
|
||||||
mysqlpp::StoreQueryResult res;
|
mysqlpp::StoreQueryResult res;
|
||||||
{
|
{
|
||||||
|
@ -1553,9 +1554,10 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
|
||||||
|
|
||||||
mysqlpp::Query hasNonIndexFilesQuery = connectionPtr->query();
|
mysqlpp::Query hasNonIndexFilesQuery = connectionPtr->query();
|
||||||
// since table_id is a unique column we just need to check whether it exists or not
|
// since table_id is a unique column we just need to check whether it exists or not
|
||||||
hasNonIndexFilesQuery << "SELECT file_id, file_type"
|
hasNonIndexFilesQuery
|
||||||
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
|
<< "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
|
||||||
<< " AND file_type in (" << types << ");";
|
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
|
||||||
|
<< " AND file_type in (" << types << ");";
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str();
|
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str();
|
||||||
|
|
||||||
|
@ -1566,9 +1568,18 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
|
||||||
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
|
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
|
||||||
int to_index_count = 0, index_count = 0, backup_count = 0;
|
int to_index_count = 0, index_count = 0, backup_count = 0;
|
||||||
for (auto& resRow : res) {
|
for (auto& resRow : res) {
|
||||||
std::string file_id;
|
TableFileSchema file_schema;
|
||||||
resRow["file_id"].to_string(file_id);
|
file_schema.id_ = resRow["id"];
|
||||||
file_ids.push_back(file_id);
|
file_schema.table_id_ = table_id;
|
||||||
|
file_schema.engine_type_ = resRow["engine_type"];
|
||||||
|
resRow["file_id"].to_string(file_schema.file_id_);
|
||||||
|
file_schema.file_type_ = resRow["file_type"];
|
||||||
|
file_schema.file_size_ = resRow["file_size"];
|
||||||
|
file_schema.row_count_ = resRow["row_count"];
|
||||||
|
file_schema.date_ = resRow["date"];
|
||||||
|
file_schema.created_on_ = resRow["created_on"];
|
||||||
|
|
||||||
|
table_files.emplace_back(file_schema);
|
||||||
|
|
||||||
int32_t file_type = resRow["file_type"];
|
int32_t file_type = resRow["file_type"];
|
||||||
switch (file_type) {
|
switch (file_type) {
|
||||||
|
|
|
@ -108,7 +108,7 @@ class MySQLMetaImpl : public Meta {
|
||||||
|
|
||||||
Status
|
Status
|
||||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||||
std::vector<std::string>& file_ids) override;
|
TableFilesSchema& table_files) override;
|
||||||
|
|
||||||
Status
|
Status
|
||||||
Archive() override;
|
Archive() override;
|
||||||
|
|
|
@ -58,7 +58,7 @@ HandleException(const std::string& desc, const char* what = nullptr) {
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
inline auto
|
inline auto
|
||||||
StoragePrototype(const std::string &path) {
|
StoragePrototype(const std::string& path) {
|
||||||
return make_storage(path,
|
return make_storage(path,
|
||||||
make_table(META_TABLES,
|
make_table(META_TABLES,
|
||||||
make_column("id", &TableSchema::id_, primary_key()),
|
make_column("id", &TableSchema::id_, primary_key()),
|
||||||
|
@ -160,7 +160,7 @@ SqliteMetaImpl::Initialize() {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
|
SqliteMetaImpl::CreateTable(TableSchema& table_schema) {
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
|
@ -188,20 +188,20 @@ SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||||
try {
|
try {
|
||||||
auto id = ConnectorPtr->insert(table_schema);
|
auto id = ConnectorPtr->insert(table_schema);
|
||||||
table_schema.id_ = id;
|
table_schema.id_ = id;
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when create table", e.what());
|
return HandleException("Encounter exception when create table", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
|
ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
|
||||||
|
|
||||||
return utils::CreateTablePath(options_, table_schema.table_id_);
|
return utils::CreateTablePath(options_, table_schema.table_id_);
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when create table", e.what());
|
return HandleException("Encounter exception when create table", e.what());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
|
SqliteMetaImpl::DescribeTable(TableSchema& table_schema) {
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
|
@ -218,7 +218,7 @@ SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||||
&TableSchema::partition_tag_,
|
&TableSchema::partition_tag_,
|
||||||
&TableSchema::version_),
|
&TableSchema::version_),
|
||||||
where(c(&TableSchema::table_id_) == table_schema.table_id_
|
where(c(&TableSchema::table_id_) == table_schema.table_id_
|
||||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||||
|
|
||||||
if (groups.size() == 1) {
|
if (groups.size() == 1) {
|
||||||
table_schema.id_ = std::get<0>(groups[0]);
|
table_schema.id_ = std::get<0>(groups[0]);
|
||||||
|
@ -236,7 +236,7 @@ SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||||
} else {
|
} else {
|
||||||
return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
|
return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
|
||||||
}
|
}
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when describe table", e.what());
|
return HandleException("Encounter exception when describe table", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -244,20 +244,20 @@ SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
|
SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
|
||||||
has_or_not = false;
|
has_or_not = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
|
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
|
||||||
where(c(&TableSchema::table_id_) == table_id
|
where(c(&TableSchema::table_id_) == table_id
|
||||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||||
if (tables.size() == 1) {
|
if (tables.size() == 1) {
|
||||||
has_or_not = true;
|
has_or_not = true;
|
||||||
} else {
|
} else {
|
||||||
has_or_not = false;
|
has_or_not = false;
|
||||||
}
|
}
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when lookup table", e.what());
|
return HandleException("Encounter exception when lookup table", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,7 +265,7 @@ SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
|
@ -281,8 +281,8 @@ SqliteMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
||||||
&TableSchema::owner_table_,
|
&TableSchema::owner_table_,
|
||||||
&TableSchema::partition_tag_,
|
&TableSchema::partition_tag_,
|
||||||
&TableSchema::version_),
|
&TableSchema::version_),
|
||||||
where(c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||||
for (auto &table : selected) {
|
for (auto& table : selected) {
|
||||||
TableSchema schema;
|
TableSchema schema;
|
||||||
schema.id_ = std::get<0>(table);
|
schema.id_ = std::get<0>(table);
|
||||||
schema.table_id_ = std::get<1>(table);
|
schema.table_id_ = std::get<1>(table);
|
||||||
|
@ -299,7 +299,7 @@ SqliteMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
||||||
|
|
||||||
table_schema_array.emplace_back(schema);
|
table_schema_array.emplace_back(schema);
|
||||||
}
|
}
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when lookup all tables", e.what());
|
return HandleException("Encounter exception when lookup all tables", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -307,7 +307,7 @@ SqliteMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::DropTable(const std::string &table_id) {
|
SqliteMetaImpl::DropTable(const std::string& table_id) {
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
|
@ -317,13 +317,13 @@ SqliteMetaImpl::DropTable(const std::string &table_id) {
|
||||||
//soft delete table
|
//soft delete table
|
||||||
ConnectorPtr->update_all(
|
ConnectorPtr->update_all(
|
||||||
set(
|
set(
|
||||||
c(&TableSchema::state_) = (int) TableSchema::TO_DELETE),
|
c(&TableSchema::state_) = (int)TableSchema::TO_DELETE),
|
||||||
where(
|
where(
|
||||||
c(&TableSchema::table_id_) == table_id and
|
c(&TableSchema::table_id_) == table_id and
|
||||||
c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
|
ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when delete table", e.what());
|
return HandleException("Encounter exception when delete table", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -331,7 +331,7 @@ SqliteMetaImpl::DropTable(const std::string &table_id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) {
|
SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
|
@ -341,14 +341,14 @@ SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) {
|
||||||
//soft delete table files
|
//soft delete table files
|
||||||
ConnectorPtr->update_all(
|
ConnectorPtr->update_all(
|
||||||
set(
|
set(
|
||||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
|
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
|
||||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||||
where(
|
where(
|
||||||
c(&TableFileSchema::table_id_) == table_id and
|
c(&TableFileSchema::table_id_) == table_id and
|
||||||
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
|
c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
|
ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when delete table files", e.what());
|
return HandleException("Encounter exception when delete table files", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -356,7 +356,7 @@ SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
|
||||||
if (file_schema.date_ == EmptyDate) {
|
if (file_schema.date_ == EmptyDate) {
|
||||||
file_schema.date_ = utils::GetDate();
|
file_schema.date_ = utils::GetDate();
|
||||||
}
|
}
|
||||||
|
@ -389,7 +389,7 @@ SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
|
ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
|
||||||
return utils::CreateTableFilePath(options_, file_schema);
|
return utils::CreateTableFilePath(options_, file_schema);
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when create table file", e.what());
|
return HandleException("Encounter exception when create table file", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -398,8 +398,8 @@ SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||||
|
|
||||||
// TODO(myh): Delete single vecotor by id
|
// TODO(myh): Delete single vecotor by id
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::DropDataByDate(const std::string &table_id,
|
SqliteMetaImpl::DropDataByDate(const std::string& table_id,
|
||||||
const DatesT &dates) {
|
const DatesT& dates) {
|
||||||
if (dates.empty()) {
|
if (dates.empty()) {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
@ -440,7 +440,7 @@ SqliteMetaImpl::DropDataByDate(const std::string &table_id,
|
||||||
}
|
}
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Successfully drop data by date, table id = " << table_schema.table_id_;
|
ENGINE_LOG_DEBUG << "Successfully drop data by date, table id = " << table_schema.table_id_;
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when drop partition", e.what());
|
return HandleException("Encounter exception when drop partition", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -448,9 +448,9 @@ SqliteMetaImpl::DropDataByDate(const std::string &table_id,
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::GetTableFiles(const std::string &table_id,
|
SqliteMetaImpl::GetTableFiles(const std::string& table_id,
|
||||||
const std::vector<size_t> &ids,
|
const std::vector<size_t>& ids,
|
||||||
TableFilesSchema &table_files) {
|
TableFilesSchema& table_files) {
|
||||||
try {
|
try {
|
||||||
table_files.clear();
|
table_files.clear();
|
||||||
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||||
|
@ -463,7 +463,7 @@ SqliteMetaImpl::GetTableFiles(const std::string &table_id,
|
||||||
&TableFileSchema::created_on_),
|
&TableFileSchema::created_on_),
|
||||||
where(c(&TableFileSchema::table_id_) == table_id and
|
where(c(&TableFileSchema::table_id_) == table_id and
|
||||||
in(&TableFileSchema::id_, ids) and
|
in(&TableFileSchema::id_, ids) and
|
||||||
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
|
c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
|
||||||
TableSchema table_schema;
|
TableSchema table_schema;
|
||||||
table_schema.table_id_ = table_id;
|
table_schema.table_id_ = table_id;
|
||||||
auto status = DescribeTable(table_schema);
|
auto status = DescribeTable(table_schema);
|
||||||
|
@ -472,7 +472,7 @@ SqliteMetaImpl::GetTableFiles(const std::string &table_id,
|
||||||
}
|
}
|
||||||
|
|
||||||
Status result;
|
Status result;
|
||||||
for (auto &file : files) {
|
for (auto& file : files) {
|
||||||
TableFileSchema file_schema;
|
TableFileSchema file_schema;
|
||||||
file_schema.table_id_ = table_id;
|
file_schema.table_id_ = table_id;
|
||||||
file_schema.id_ = std::get<0>(file);
|
file_schema.id_ = std::get<0>(file);
|
||||||
|
@ -495,13 +495,13 @@ SqliteMetaImpl::GetTableFiles(const std::string &table_id,
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Get table files by id";
|
ENGINE_LOG_DEBUG << "Get table files by id";
|
||||||
return result;
|
return result;
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when lookup table files", e.what());
|
return HandleException("Encounter exception when lookup table files", e.what());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
|
SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
|
@ -512,7 +512,7 @@ SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
|
||||||
where(
|
where(
|
||||||
c(&TableSchema::table_id_) == table_id));
|
c(&TableSchema::table_id_) == table_id));
|
||||||
ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
|
ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
|
std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
|
||||||
return HandleException(msg, e.what());
|
return HandleException(msg, e.what());
|
||||||
}
|
}
|
||||||
|
@ -521,7 +521,7 @@ SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
|
SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
|
||||||
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
|
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
@ -534,14 +534,14 @@ SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
|
||||||
|
|
||||||
//if the table has been deleted, just mark the table file as TO_DELETE
|
//if the table has been deleted, just mark the table file as TO_DELETE
|
||||||
//clean thread will delete the file later
|
//clean thread will delete the file later
|
||||||
if (tables.size() < 1 || std::get<0>(tables[0]) == (int) TableSchema::TO_DELETE) {
|
if (tables.size() < 1 || std::get<0>(tables[0]) == (int)TableSchema::TO_DELETE) {
|
||||||
file_schema.file_type_ = TableFileSchema::TO_DELETE;
|
file_schema.file_type_ = TableFileSchema::TO_DELETE;
|
||||||
}
|
}
|
||||||
|
|
||||||
ConnectorPtr->update(file_schema);
|
ConnectorPtr->update(file_schema);
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
|
ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
|
std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
|
||||||
+ " file_id = " + file_schema.file_id_;
|
+ " file_id = " + file_schema.file_id_;
|
||||||
return HandleException(msg, e.what());
|
return HandleException(msg, e.what());
|
||||||
|
@ -550,7 +550,7 @@ SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
|
@ -558,13 +558,13 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||||
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
||||||
|
|
||||||
std::map<std::string, bool> has_tables;
|
std::map<std::string, bool> has_tables;
|
||||||
for (auto &file : files) {
|
for (auto& file : files) {
|
||||||
if (has_tables.find(file.table_id_) != has_tables.end()) {
|
if (has_tables.find(file.table_id_) != has_tables.end()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
|
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
|
||||||
where(c(&TableSchema::table_id_) == file.table_id_
|
where(c(&TableSchema::table_id_) == file.table_id_
|
||||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||||
if (tables.size() >= 1) {
|
if (tables.size() >= 1) {
|
||||||
has_tables[file.table_id_] = true;
|
has_tables[file.table_id_] = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -573,7 +573,7 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||||
}
|
}
|
||||||
|
|
||||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||||
for (auto &file : files) {
|
for (auto& file : files) {
|
||||||
if (!has_tables[file.table_id_]) {
|
if (!has_tables[file.table_id_]) {
|
||||||
file.file_type_ = TableFileSchema::TO_DELETE;
|
file.file_type_ = TableFileSchema::TO_DELETE;
|
||||||
}
|
}
|
||||||
|
@ -589,7 +589,7 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
|
ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when update table files", e.what());
|
return HandleException("Encounter exception when update table files", e.what());
|
||||||
}
|
}
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
|
@ -613,7 +613,7 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex&
|
||||||
&TableSchema::partition_tag_,
|
&TableSchema::partition_tag_,
|
||||||
&TableSchema::version_),
|
&TableSchema::version_),
|
||||||
where(c(&TableSchema::table_id_) == table_id
|
where(c(&TableSchema::table_id_) == table_id
|
||||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||||
|
|
||||||
if (tables.size() > 0) {
|
if (tables.size() > 0) {
|
||||||
meta::TableSchema table_schema;
|
meta::TableSchema table_schema;
|
||||||
|
@ -639,11 +639,11 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex&
|
||||||
//set all backup file to raw
|
//set all backup file to raw
|
||||||
ConnectorPtr->update_all(
|
ConnectorPtr->update_all(
|
||||||
set(
|
set(
|
||||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW,
|
c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW,
|
||||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||||
where(
|
where(
|
||||||
c(&TableFileSchema::table_id_) == table_id and
|
c(&TableFileSchema::table_id_) == table_id and
|
||||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP));
|
c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP));
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
|
ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
|
||||||
} catch (std::exception& e) {
|
} catch (std::exception& e) {
|
||||||
|
@ -655,7 +655,7 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex&
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
|
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
|
@ -664,13 +664,14 @@ SqliteMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
|
||||||
|
|
||||||
ConnectorPtr->update_all(
|
ConnectorPtr->update_all(
|
||||||
set(
|
set(
|
||||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX),
|
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_INDEX),
|
||||||
where(
|
where(
|
||||||
c(&TableFileSchema::table_id_) == table_id and
|
c(&TableFileSchema::table_id_) == table_id and
|
||||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW));
|
c(&TableFileSchema::row_count_) >= meta::BUILD_INDEX_THRESHOLD and
|
||||||
|
c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW));
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
|
ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when update table files to to_index", e.what());
|
return HandleException("Encounter exception when update table files to to_index", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -686,7 +687,7 @@ SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& inde
|
||||||
&TableSchema::nlist_,
|
&TableSchema::nlist_,
|
||||||
&TableSchema::metric_type_),
|
&TableSchema::metric_type_),
|
||||||
where(c(&TableSchema::table_id_) == table_id
|
where(c(&TableSchema::table_id_) == table_id
|
||||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||||
|
|
||||||
if (groups.size() == 1) {
|
if (groups.size() == 1) {
|
||||||
index.engine_type_ = std::get<0>(groups[0]);
|
index.engine_type_ = std::get<0>(groups[0]);
|
||||||
|
@ -713,20 +714,20 @@ SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
|
||||||
//soft delete index files
|
//soft delete index files
|
||||||
ConnectorPtr->update_all(
|
ConnectorPtr->update_all(
|
||||||
set(
|
set(
|
||||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
|
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
|
||||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||||
where(
|
where(
|
||||||
c(&TableFileSchema::table_id_) == table_id and
|
c(&TableFileSchema::table_id_) == table_id and
|
||||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX));
|
c(&TableFileSchema::file_type_) == (int)TableFileSchema::INDEX));
|
||||||
|
|
||||||
//set all backup file to raw
|
//set all backup file to raw
|
||||||
ConnectorPtr->update_all(
|
ConnectorPtr->update_all(
|
||||||
set(
|
set(
|
||||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW,
|
c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW,
|
||||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||||
where(
|
where(
|
||||||
c(&TableFileSchema::table_id_) == table_id and
|
c(&TableFileSchema::table_id_) == table_id and
|
||||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP));
|
c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP));
|
||||||
|
|
||||||
//set table index type to raw
|
//set table index type to raw
|
||||||
ConnectorPtr->update_all(
|
ConnectorPtr->update_all(
|
||||||
|
@ -738,7 +739,7 @@ SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
|
||||||
c(&TableSchema::table_id_) == table_id));
|
c(&TableSchema::table_id_) == table_id));
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
|
ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when delete table index files", e.what());
|
return HandleException("Encounter exception when delete table index files", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -746,7 +747,9 @@ SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& tag) {
|
SqliteMetaImpl::CreatePartition(const std::string& table_id,
|
||||||
|
const std::string& partition_name,
|
||||||
|
const std::string& tag) {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
TableSchema table_schema;
|
TableSchema table_schema;
|
||||||
|
@ -757,7 +760,7 @@ SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string&
|
||||||
}
|
}
|
||||||
|
|
||||||
// not allow create partition under partition
|
// not allow create partition under partition
|
||||||
if(!table_schema.owner_table_.empty()) {
|
if (!table_schema.owner_table_.empty()) {
|
||||||
return Status(DB_ERROR, "Nested partition is not allowed");
|
return Status(DB_ERROR, "Nested partition is not allowed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -769,7 +772,7 @@ SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string&
|
||||||
// not allow duplicated partition
|
// not allow duplicated partition
|
||||||
std::string exist_partition;
|
std::string exist_partition;
|
||||||
GetPartitionName(table_id, valid_tag, exist_partition);
|
GetPartitionName(table_id, valid_tag, exist_partition);
|
||||||
if(!exist_partition.empty()) {
|
if (!exist_partition.empty()) {
|
||||||
return Status(DB_ERROR, "Duplicate partition is not allowed");
|
return Status(DB_ERROR, "Duplicate partition is not allowed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -805,16 +808,16 @@ SqliteMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::Ta
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
auto partitions = ConnectorPtr->select(columns(&TableSchema::table_id_),
|
auto partitions = ConnectorPtr->select(columns(&TableSchema::table_id_),
|
||||||
where(c(&TableSchema::owner_table_) == table_id
|
where(c(&TableSchema::owner_table_) == table_id
|
||||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||||
for(size_t i = 0; i < partitions.size(); i++) {
|
for (size_t i = 0; i < partitions.size(); i++) {
|
||||||
std::string partition_name = std::get<0>(partitions[i]);
|
std::string partition_name = std::get<0>(partitions[i]);
|
||||||
meta::TableSchema partition_schema;
|
meta::TableSchema partition_schema;
|
||||||
partition_schema.table_id_ = partition_name;
|
partition_schema.table_id_ = partition_name;
|
||||||
DescribeTable(partition_schema);
|
DescribeTable(partition_schema);
|
||||||
partiton_schema_array.emplace_back(partition_schema);
|
partiton_schema_array.emplace_back(partition_schema);
|
||||||
}
|
}
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when show partitions", e.what());
|
return HandleException("Encounter exception when show partitions", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -832,14 +835,14 @@ SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string&
|
||||||
server::StringHelpFunctions::TrimStringBlank(valid_tag);
|
server::StringHelpFunctions::TrimStringBlank(valid_tag);
|
||||||
|
|
||||||
auto name = ConnectorPtr->select(columns(&TableSchema::table_id_),
|
auto name = ConnectorPtr->select(columns(&TableSchema::table_id_),
|
||||||
where(c(&TableSchema::owner_table_) == table_id
|
where(c(&TableSchema::owner_table_) == table_id
|
||||||
and c(&TableSchema::partition_tag_) == valid_tag));
|
and c(&TableSchema::partition_tag_) == valid_tag));
|
||||||
if (name.size() > 0) {
|
if (name.size() > 0) {
|
||||||
partition_name = std::get<0>(name[0]);
|
partition_name = std::get<0>(name[0]);
|
||||||
} else {
|
} else {
|
||||||
return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found");
|
return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found");
|
||||||
}
|
}
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when get partition name", e.what());
|
return HandleException("Encounter exception when get partition name", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1032,7 +1035,7 @@ SqliteMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFile
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
|
||||||
files.clear();
|
files.clear();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -1048,13 +1051,13 @@ SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||||
&TableFileSchema::engine_type_,
|
&TableFileSchema::engine_type_,
|
||||||
&TableFileSchema::created_on_),
|
&TableFileSchema::created_on_),
|
||||||
where(c(&TableFileSchema::file_type_)
|
where(c(&TableFileSchema::file_type_)
|
||||||
== (int) TableFileSchema::TO_INDEX));
|
== (int)TableFileSchema::TO_INDEX));
|
||||||
|
|
||||||
std::map<std::string, TableSchema> groups;
|
std::map<std::string, TableSchema> groups;
|
||||||
TableFileSchema table_file;
|
TableFileSchema table_file;
|
||||||
|
|
||||||
Status ret;
|
Status ret;
|
||||||
for (auto &file : selected) {
|
for (auto& file : selected) {
|
||||||
table_file.id_ = std::get<0>(file);
|
table_file.id_ = std::get<0>(file);
|
||||||
table_file.table_id_ = std::get<1>(file);
|
table_file.table_id_ = std::get<1>(file);
|
||||||
table_file.file_id_ = std::get<2>(file);
|
table_file.file_id_ = std::get<2>(file);
|
||||||
|
@ -1090,48 +1093,66 @@ SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||||
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
|
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when iterate raw files", e.what());
|
return HandleException("Encounter exception when iterate raw files", e.what());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::FilesByType(const std::string &table_id,
|
SqliteMetaImpl::FilesByType(const std::string& table_id,
|
||||||
const std::vector<int> &file_types,
|
const std::vector<int>& file_types,
|
||||||
std::vector<std::string> &file_ids) {
|
TableFilesSchema& table_files) {
|
||||||
if (file_types.empty()) {
|
if (file_types.empty()) {
|
||||||
return Status(DB_ERROR, "file types array is empty");
|
return Status(DB_ERROR, "file types array is empty");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
file_ids.clear();
|
table_files.clear();
|
||||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_,
|
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||||
&TableFileSchema::file_type_),
|
&TableFileSchema::file_id_,
|
||||||
|
&TableFileSchema::file_type_,
|
||||||
|
&TableFileSchema::file_size_,
|
||||||
|
&TableFileSchema::row_count_,
|
||||||
|
&TableFileSchema::date_,
|
||||||
|
&TableFileSchema::engine_type_,
|
||||||
|
&TableFileSchema::created_on_),
|
||||||
where(in(&TableFileSchema::file_type_, file_types)
|
where(in(&TableFileSchema::file_type_, file_types)
|
||||||
and c(&TableFileSchema::table_id_) == table_id));
|
and c(&TableFileSchema::table_id_) == table_id));
|
||||||
|
|
||||||
if (selected.size() >= 1) {
|
if (selected.size() >= 1) {
|
||||||
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
|
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
|
||||||
int to_index_count = 0, index_count = 0, backup_count = 0;
|
int to_index_count = 0, index_count = 0, backup_count = 0;
|
||||||
for (auto &file : selected) {
|
for (auto& file : selected) {
|
||||||
file_ids.push_back(std::get<0>(file));
|
TableFileSchema file_schema;
|
||||||
switch (std::get<1>(file)) {
|
file_schema.table_id_ = table_id;
|
||||||
case (int) TableFileSchema::RAW:raw_count++;
|
file_schema.id_ = std::get<0>(file);
|
||||||
|
file_schema.file_id_ = std::get<1>(file);
|
||||||
|
file_schema.file_type_ = std::get<2>(file);
|
||||||
|
file_schema.file_size_ = std::get<3>(file);
|
||||||
|
file_schema.row_count_ = std::get<4>(file);
|
||||||
|
file_schema.date_ = std::get<5>(file);
|
||||||
|
file_schema.engine_type_ = std::get<6>(file);
|
||||||
|
file_schema.created_on_ = std::get<7>(file);
|
||||||
|
|
||||||
|
switch (file_schema.file_type_) {
|
||||||
|
case (int)TableFileSchema::RAW:raw_count++;
|
||||||
break;
|
break;
|
||||||
case (int) TableFileSchema::NEW:new_count++;
|
case (int)TableFileSchema::NEW:new_count++;
|
||||||
break;
|
break;
|
||||||
case (int) TableFileSchema::NEW_MERGE:new_merge_count++;
|
case (int)TableFileSchema::NEW_MERGE:new_merge_count++;
|
||||||
break;
|
break;
|
||||||
case (int) TableFileSchema::NEW_INDEX:new_index_count++;
|
case (int)TableFileSchema::NEW_INDEX:new_index_count++;
|
||||||
break;
|
break;
|
||||||
case (int) TableFileSchema::TO_INDEX:to_index_count++;
|
case (int)TableFileSchema::TO_INDEX:to_index_count++;
|
||||||
break;
|
break;
|
||||||
case (int) TableFileSchema::INDEX:index_count++;
|
case (int)TableFileSchema::INDEX:index_count++;
|
||||||
break;
|
break;
|
||||||
case (int) TableFileSchema::BACKUP:backup_count++;
|
case (int)TableFileSchema::BACKUP:backup_count++;
|
||||||
break;
|
break;
|
||||||
default:break;
|
default:break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
table_files.emplace_back(file_schema);
|
||||||
}
|
}
|
||||||
|
|
||||||
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
|
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
|
||||||
|
@ -1139,13 +1160,12 @@ SqliteMetaImpl::FilesByType(const std::string &table_id,
|
||||||
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count
|
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count
|
||||||
<< " index files:" << index_count << " backup files:" << backup_count;
|
<< " index files:" << index_count << " backup files:" << backup_count;
|
||||||
}
|
}
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when check non index files", e.what());
|
return HandleException("Encounter exception when check non index files", e.what());
|
||||||
}
|
}
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// TODO(myh): Support swap to cloud storage
|
// TODO(myh): Support swap to cloud storage
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::Archive() {
|
SqliteMetaImpl::Archive() {
|
||||||
|
@ -1166,11 +1186,11 @@ SqliteMetaImpl::Archive() {
|
||||||
|
|
||||||
ConnectorPtr->update_all(
|
ConnectorPtr->update_all(
|
||||||
set(
|
set(
|
||||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE),
|
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE),
|
||||||
where(
|
where(
|
||||||
c(&TableFileSchema::created_on_) < (int64_t) (now - usecs) and
|
c(&TableFileSchema::created_on_) < (int64_t)(now - usecs) and
|
||||||
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
|
c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when update table files", e.what());
|
return HandleException("Encounter exception when update table files", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1218,15 +1238,15 @@ SqliteMetaImpl::CleanUp() {
|
||||||
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
||||||
|
|
||||||
std::vector<int> file_types = {
|
std::vector<int> file_types = {
|
||||||
(int) TableFileSchema::NEW,
|
(int)TableFileSchema::NEW,
|
||||||
(int) TableFileSchema::NEW_INDEX,
|
(int)TableFileSchema::NEW_INDEX,
|
||||||
(int) TableFileSchema::NEW_MERGE
|
(int)TableFileSchema::NEW_MERGE
|
||||||
};
|
};
|
||||||
auto files =
|
auto files =
|
||||||
ConnectorPtr->select(columns(&TableFileSchema::id_), where(in(&TableFileSchema::file_type_, file_types)));
|
ConnectorPtr->select(columns(&TableFileSchema::id_), where(in(&TableFileSchema::file_type_, file_types)));
|
||||||
|
|
||||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||||
for (auto &file : files) {
|
for (auto& file : files) {
|
||||||
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
|
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
|
||||||
ConnectorPtr->remove<TableFileSchema>(std::get<0>(file));
|
ConnectorPtr->remove<TableFileSchema>(std::get<0>(file));
|
||||||
}
|
}
|
||||||
|
@ -1240,7 +1260,7 @@ SqliteMetaImpl::CleanUp() {
|
||||||
if (files.size() > 0) {
|
if (files.size() > 0) {
|
||||||
ENGINE_LOG_DEBUG << "Clean " << files.size() << " files";
|
ENGINE_LOG_DEBUG << "Clean " << files.size() << " files";
|
||||||
}
|
}
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when clean table file", e.what());
|
return HandleException("Encounter exception when clean table file", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1265,7 +1285,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||||
&TableFileSchema::date_),
|
&TableFileSchema::date_),
|
||||||
where(
|
where(
|
||||||
c(&TableFileSchema::file_type_) ==
|
c(&TableFileSchema::file_type_) ==
|
||||||
(int) TableFileSchema::TO_DELETE
|
(int)TableFileSchema::TO_DELETE
|
||||||
and
|
and
|
||||||
c(&TableFileSchema::updated_time_)
|
c(&TableFileSchema::updated_time_)
|
||||||
< now - seconds * US_PS));
|
< now - seconds * US_PS));
|
||||||
|
@ -1354,7 +1374,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status
|
Status
|
||||||
SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
|
SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) {
|
||||||
try {
|
try {
|
||||||
server::MetricCollector metric;
|
server::MetricCollector metric;
|
||||||
|
|
||||||
|
@ -1414,14 +1434,14 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
|
||||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||||
&TableFileSchema::file_size_),
|
&TableFileSchema::file_size_),
|
||||||
where(c(&TableFileSchema::file_type_)
|
where(c(&TableFileSchema::file_type_)
|
||||||
!= (int) TableFileSchema::TO_DELETE),
|
!= (int)TableFileSchema::TO_DELETE),
|
||||||
order_by(&TableFileSchema::id_),
|
order_by(&TableFileSchema::id_),
|
||||||
limit(10));
|
limit(10));
|
||||||
|
|
||||||
std::vector<int> ids;
|
std::vector<int> ids;
|
||||||
TableFileSchema table_file;
|
TableFileSchema table_file;
|
||||||
|
|
||||||
for (auto &file : selected) {
|
for (auto& file : selected) {
|
||||||
if (to_discard_size <= 0) break;
|
if (to_discard_size <= 0) break;
|
||||||
table_file.id_ = std::get<0>(file);
|
table_file.id_ = std::get<0>(file);
|
||||||
table_file.file_size_ = std::get<1>(file);
|
table_file.file_size_ = std::get<1>(file);
|
||||||
|
@ -1437,7 +1457,7 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
|
||||||
|
|
||||||
ConnectorPtr->update_all(
|
ConnectorPtr->update_all(
|
||||||
set(
|
set(
|
||||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
|
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
|
||||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||||
where(
|
where(
|
||||||
in(&TableFileSchema::id_, ids)));
|
in(&TableFileSchema::id_, ids)));
|
||||||
|
@ -1448,7 +1468,7 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
|
||||||
if (!commited) {
|
if (!commited) {
|
||||||
return HandleException("DiscardFiles error: sqlite transaction failed");
|
return HandleException("DiscardFiles error: sqlite transaction failed");
|
||||||
}
|
}
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception& e) {
|
||||||
return HandleException("Encounter exception when discard table file", e.what());
|
return HandleException("Encounter exception when discard table file", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -108,7 +108,7 @@ class SqliteMetaImpl : public Meta {
|
||||||
|
|
||||||
Status
|
Status
|
||||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||||
std::vector<std::string>& file_ids) override;
|
TableFilesSchema& table_files) override;
|
||||||
|
|
||||||
Status
|
Status
|
||||||
Size(uint64_t& result) override;
|
Size(uint64_t& result) override;
|
||||||
|
|
|
@ -306,9 +306,9 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
|
||||||
ASSERT_EQ(dated_files[table_file.date_].size(), 0);
|
ASSERT_EQ(dated_files[table_file.date_].size(), 0);
|
||||||
|
|
||||||
std::vector<int> file_types;
|
std::vector<int> file_types;
|
||||||
std::vector<std::string> file_ids;
|
milvus::engine::meta::TableFilesSchema table_files;
|
||||||
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
|
status = impl_->FilesByType(table.table_id_, file_types, table_files);
|
||||||
ASSERT_TRUE(file_ids.empty());
|
ASSERT_TRUE(table_files.empty());
|
||||||
ASSERT_FALSE(status.ok());
|
ASSERT_FALSE(status.ok());
|
||||||
|
|
||||||
file_types = {
|
file_types = {
|
||||||
|
@ -317,11 +317,11 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
|
||||||
milvus::engine::meta::TableFileSchema::INDEX, milvus::engine::meta::TableFileSchema::RAW,
|
milvus::engine::meta::TableFileSchema::INDEX, milvus::engine::meta::TableFileSchema::RAW,
|
||||||
milvus::engine::meta::TableFileSchema::BACKUP,
|
milvus::engine::meta::TableFileSchema::BACKUP,
|
||||||
};
|
};
|
||||||
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
|
status = impl_->FilesByType(table.table_id_, file_types, table_files);
|
||||||
ASSERT_TRUE(status.ok());
|
ASSERT_TRUE(status.ok());
|
||||||
uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt + raw_files_cnt +
|
uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt + raw_files_cnt +
|
||||||
to_index_files_cnt + index_files_cnt;
|
to_index_files_cnt + index_files_cnt;
|
||||||
ASSERT_EQ(file_ids.size(), total_cnt);
|
ASSERT_EQ(table_files.size(), total_cnt);
|
||||||
|
|
||||||
status = impl_->DeleteTableFiles(table_id);
|
status = impl_->DeleteTableFiles(table_id);
|
||||||
ASSERT_TRUE(status.ok());
|
ASSERT_TRUE(status.ok());
|
||||||
|
|
|
@ -169,9 +169,9 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) {
|
||||||
std::vector<int> file_types = {
|
std::vector<int> file_types = {
|
||||||
(int)milvus::engine::meta::TableFileSchema::NEW,
|
(int)milvus::engine::meta::TableFileSchema::NEW,
|
||||||
};
|
};
|
||||||
std::vector<std::string> file_ids;
|
milvus::engine::meta::TableFilesSchema table_files;
|
||||||
status = impl.FilesByType(table_id, file_types, file_ids);
|
status = impl.FilesByType(table_id, file_types, table_files);
|
||||||
ASSERT_FALSE(file_ids.empty());
|
ASSERT_FALSE(table_files.empty());
|
||||||
|
|
||||||
status = impl.UpdateTableFilesToIndex(table_id);
|
status = impl.UpdateTableFilesToIndex(table_id);
|
||||||
ASSERT_TRUE(status.ok());
|
ASSERT_TRUE(status.ok());
|
||||||
|
@ -326,9 +326,9 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
|
||||||
ASSERT_EQ(dated_files[table_file.date_].size(), 0);
|
ASSERT_EQ(dated_files[table_file.date_].size(), 0);
|
||||||
|
|
||||||
std::vector<int> file_types;
|
std::vector<int> file_types;
|
||||||
std::vector<std::string> file_ids;
|
milvus::engine::meta::TableFilesSchema table_files;
|
||||||
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
|
status = impl_->FilesByType(table.table_id_, file_types, table_files);
|
||||||
ASSERT_TRUE(file_ids.empty());
|
ASSERT_TRUE(table_files.empty());
|
||||||
ASSERT_FALSE(status.ok());
|
ASSERT_FALSE(status.ok());
|
||||||
|
|
||||||
file_types = {
|
file_types = {
|
||||||
|
@ -337,11 +337,11 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
|
||||||
milvus::engine::meta::TableFileSchema::INDEX, milvus::engine::meta::TableFileSchema::RAW,
|
milvus::engine::meta::TableFileSchema::INDEX, milvus::engine::meta::TableFileSchema::RAW,
|
||||||
milvus::engine::meta::TableFileSchema::BACKUP,
|
milvus::engine::meta::TableFileSchema::BACKUP,
|
||||||
};
|
};
|
||||||
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
|
status = impl_->FilesByType(table.table_id_, file_types, table_files);
|
||||||
ASSERT_TRUE(status.ok());
|
ASSERT_TRUE(status.ok());
|
||||||
uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt + raw_files_cnt +
|
uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt + raw_files_cnt +
|
||||||
to_index_files_cnt + index_files_cnt;
|
to_index_files_cnt + index_files_cnt;
|
||||||
ASSERT_EQ(file_ids.size(), total_cnt);
|
ASSERT_EQ(table_files.size(), total_cnt);
|
||||||
|
|
||||||
status = impl_->DeleteTableFiles(table_id);
|
status = impl_->DeleteTableFiles(table_id);
|
||||||
ASSERT_TRUE(status.ok());
|
ASSERT_TRUE(status.ok());
|
||||||
|
|
Loading…
Reference in New Issue