mirror of https://github.com/milvus-io/milvus.git
commit
e7d83d9007
|
@ -1058,7 +1058,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
|
|||
|
||||
// step 4: wait and build index
|
||||
status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
|
||||
status = BuildTableIndexRecursively(table_id, index);
|
||||
status = WaitTableIndexRecursively(table_id, index);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
@ -1738,7 +1738,7 @@ DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableInde
|
|||
}
|
||||
|
||||
Status
|
||||
DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
|
||||
DBImpl::WaitTableIndexRecursively(const std::string& table_id, const TableIndex& index) {
|
||||
// for IDMAP type, only wait all NEW file converted to RAW file
|
||||
// for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
|
||||
std::vector<int> file_types;
|
||||
|
@ -1779,8 +1779,8 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
|
|||
std::vector<meta::TableSchema> partition_array;
|
||||
status = meta_ptr_->ShowPartitions(table_id, partition_array);
|
||||
for (auto& schema : partition_array) {
|
||||
status = BuildTableIndexRecursively(schema.table_id_, index);
|
||||
fiu_do_on("DBImpl.BuildTableIndexRecursively.fail_build_table_Index_for_partition",
|
||||
status = WaitTableIndexRecursively(schema.table_id_, index);
|
||||
fiu_do_on("DBImpl.WaitTableIndexRecursively.fail_build_table_Index_for_partition",
|
||||
status = Status(DB_ERROR, ""));
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
|
@ -1790,7 +1790,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
|
|||
// failed to build index for some files, return error
|
||||
std::string err_msg;
|
||||
index_failed_checker_.GetErrMsgForTable(table_id, err_msg);
|
||||
fiu_do_on("DBImpl.BuildTableIndexRecursively.not_empty_err_msg", err_msg.append("fiu"));
|
||||
fiu_do_on("DBImpl.WaitTableIndexRecursively.not_empty_err_msg", err_msg.append("fiu"));
|
||||
if (!err_msg.empty()) {
|
||||
return Status(DB_ERROR, err_msg);
|
||||
}
|
||||
|
|
|
@ -216,7 +216,7 @@ class DBImpl : public DB, public server::CacheConfigHandler {
|
|||
UpdateTableIndexRecursively(const std::string& table_id, const TableIndex& index);
|
||||
|
||||
Status
|
||||
BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index);
|
||||
WaitTableIndexRecursively(const std::string& table_id, const TableIndex& index);
|
||||
|
||||
Status
|
||||
DropTableIndexRecursively(const std::string& table_id);
|
||||
|
|
|
@ -364,7 +364,7 @@ MemTable::ApplyDeletes() {
|
|||
}
|
||||
}
|
||||
|
||||
status = meta_->UpdateTableFiles(table_files_to_update);
|
||||
status = meta_->UpdateTableFilesRowCount(table_files_to_update);
|
||||
|
||||
if (!status.ok()) {
|
||||
std::string err_msg = "Failed to apply deletes: " + status.ToString();
|
||||
|
|
|
@ -87,6 +87,9 @@ class Meta {
|
|||
virtual Status
|
||||
UpdateTableFiles(TableFilesSchema& files) = 0;
|
||||
|
||||
virtual Status
|
||||
UpdateTableFilesRowCount(TableFilesSchema& files) = 0;
|
||||
|
||||
virtual Status
|
||||
UpdateTableIndex(const std::string& table_id, const TableIndex& index) = 0;
|
||||
|
||||
|
|
|
@ -1240,6 +1240,46 @@ MySQLMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::UpdateTableFilesRowCount(TableFilesSchema& files) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
{
|
||||
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
|
||||
|
||||
bool is_null_connection = (connectionPtr == nullptr);
|
||||
if (is_null_connection) {
|
||||
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
|
||||
}
|
||||
|
||||
mysqlpp::Query updateTableFilesQuery = connectionPtr->query();
|
||||
|
||||
for (auto& file : files) {
|
||||
std::string row_count = std::to_string(file.row_count_);
|
||||
std::string updated_time = std::to_string(utils::GetMicroSecTimeStamp());
|
||||
|
||||
updateTableFilesQuery << "UPDATE " << META_TABLEFILES << " SET row_count = " << row_count
|
||||
<< " , updated_time = " << updated_time << " WHERE file_id = " << file.file_id_
|
||||
<< ";";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesRowCount: " << updateTableFilesQuery.str();
|
||||
|
||||
if (!updateTableFilesQuery.exec()) {
|
||||
return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error());
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Update file " << file.file_id_ << " row count to " << file.row_count_;
|
||||
}
|
||||
} // Scoped Connection
|
||||
|
||||
ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES ROW COUNT", e.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) {
|
||||
try {
|
||||
|
|
|
@ -82,6 +82,9 @@ class MySQLMetaImpl : public Meta {
|
|||
Status
|
||||
UpdateTableFiles(TableFilesSchema& files) override;
|
||||
|
||||
Status
|
||||
UpdateTableFilesRowCount(TableFilesSchema& files) override;
|
||||
|
||||
Status
|
||||
DescribeTableIndex(const std::string& table_id, TableIndex& index) override;
|
||||
|
||||
|
|
|
@ -685,6 +685,26 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::UpdateTableFilesRowCount(TableFilesSchema& files) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
|
||||
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
||||
|
||||
for (auto& file : files) {
|
||||
ConnectorPtr->update_all(set(c(&TableFileSchema::row_count_) = file.row_count_,
|
||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||
where(c(&TableFileSchema::file_id_) == file.file_id_));
|
||||
ENGINE_LOG_DEBUG << "Update file " << file.file_id_ << " row count to " << file.row_count_;
|
||||
}
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when update table files row count", e.what());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) {
|
||||
try {
|
||||
|
|
|
@ -81,6 +81,9 @@ class SqliteMetaImpl : public Meta {
|
|||
Status
|
||||
UpdateTableFiles(TableFilesSchema& files) override;
|
||||
|
||||
Status
|
||||
UpdateTableFilesRowCount(TableFilesSchema& files) override;
|
||||
|
||||
Status
|
||||
DescribeTableIndex(const std::string& table_id, TableIndex& index) override;
|
||||
|
||||
|
|
|
@ -836,15 +836,15 @@ TEST_F(DBTest, PARTITION_TEST) {
|
|||
ASSERT_TRUE(stat.ok());
|
||||
|
||||
fiu_init(0);
|
||||
FIU_ENABLE_FIU("DBImpl.BuildTableIndexRecursively.fail_build_table_Index_for_partition");
|
||||
FIU_ENABLE_FIU("DBImpl.WaitTableIndexRecursively.fail_build_table_Index_for_partition");
|
||||
stat = db_->CreateIndex(table_info.table_id_, index);
|
||||
ASSERT_FALSE(stat.ok());
|
||||
fiu_disable("DBImpl.BuildTableIndexRecursively.fail_build_table_Index_for_partition");
|
||||
fiu_disable("DBImpl.WaitTableIndexRecursively.fail_build_table_Index_for_partition");
|
||||
|
||||
FIU_ENABLE_FIU("DBImpl.BuildTableIndexRecursively.not_empty_err_msg");
|
||||
FIU_ENABLE_FIU("DBImpl.WaitTableIndexRecursively.not_empty_err_msg");
|
||||
stat = db_->CreateIndex(table_info.table_id_, index);
|
||||
ASSERT_FALSE(stat.ok());
|
||||
fiu_disable("DBImpl.BuildTableIndexRecursively.not_empty_err_msg");
|
||||
fiu_disable("DBImpl.WaitTableIndexRecursively.not_empty_err_msg");
|
||||
|
||||
uint64_t row_count = 0;
|
||||
stat = db_->GetTableRowCount(TABLE_NAME, row_count);
|
||||
|
|
|
@ -383,6 +383,47 @@ TEST_F(MetaTest, TABLE_FILE_TEST) {
|
|||
ASSERT_EQ(table_file.file_type_, new_file_type);
|
||||
}
|
||||
|
||||
TEST_F(MetaTest, TABLE_FILE_ROW_COUNT_TEST) {
|
||||
auto table_id = "row_count_test_table";
|
||||
|
||||
milvus::engine::meta::TableSchema table;
|
||||
table.table_id_ = table_id;
|
||||
table.dimension_ = 256;
|
||||
auto status = impl_->CreateTable(table);
|
||||
|
||||
milvus::engine::meta::TableFileSchema table_file;
|
||||
table_file.row_count_ = 100;
|
||||
table_file.table_id_ = table.table_id_;
|
||||
table_file.file_type_ = 1;
|
||||
status = impl_->CreateTableFile(table_file);
|
||||
|
||||
uint64_t cnt = 0;
|
||||
status = impl_->Count(table_id, cnt);
|
||||
ASSERT_EQ(table_file.row_count_, cnt);
|
||||
|
||||
table_file.row_count_ = 99999;
|
||||
milvus::engine::meta::TableFilesSchema table_files = {table_file};
|
||||
status = impl_->UpdateTableFilesRowCount(table_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
cnt = 0;
|
||||
status = impl_->Count(table_id, cnt);
|
||||
ASSERT_EQ(table_file.row_count_, cnt);
|
||||
|
||||
std::vector<size_t> ids = {table_file.id_};
|
||||
milvus::engine::meta::TableFilesSchema schemas;
|
||||
status = impl_->GetTableFiles(table_id, ids, schemas);
|
||||
ASSERT_EQ(schemas.size(), 1UL);
|
||||
ASSERT_EQ(table_file.row_count_, schemas[0].row_count_);
|
||||
ASSERT_EQ(table_file.file_id_, schemas[0].file_id_);
|
||||
ASSERT_EQ(table_file.file_type_, schemas[0].file_type_);
|
||||
ASSERT_EQ(table_file.segment_id_, schemas[0].segment_id_);
|
||||
ASSERT_EQ(table_file.table_id_, schemas[0].table_id_);
|
||||
ASSERT_EQ(table_file.engine_type_, schemas[0].engine_type_);
|
||||
ASSERT_EQ(table_file.dimension_, schemas[0].dimension_);
|
||||
ASSERT_EQ(table_file.flush_lsn_, schemas[0].flush_lsn_);
|
||||
}
|
||||
|
||||
TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
|
||||
srand(time(0));
|
||||
milvus::engine::DBMetaOptions options;
|
||||
|
|
|
@ -271,6 +271,47 @@ TEST_F(MySqlMetaTest, TABLE_FILE_TEST) {
|
|||
ASSERT_TRUE(status.ok());
|
||||
}
|
||||
|
||||
TEST_F(MySqlMetaTest, TABLE_FILE_ROW_COUNT_TEST) {
|
||||
auto table_id = "row_count_test_table";
|
||||
|
||||
milvus::engine::meta::TableSchema table;
|
||||
table.table_id_ = table_id;
|
||||
table.dimension_ = 256;
|
||||
auto status = impl_->CreateTable(table);
|
||||
|
||||
milvus::engine::meta::TableFileSchema table_file;
|
||||
table_file.row_count_ = 100;
|
||||
table_file.table_id_ = table.table_id_;
|
||||
table_file.file_type_ = 1;
|
||||
status = impl_->CreateTableFile(table_file);
|
||||
|
||||
uint64_t cnt = 0;
|
||||
status = impl_->Count(table_id, cnt);
|
||||
ASSERT_EQ(table_file.row_count_, cnt);
|
||||
|
||||
table_file.row_count_ = 99999;
|
||||
milvus::engine::meta::TableFilesSchema table_files = {table_file};
|
||||
status = impl_->UpdateTableFilesRowCount(table_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
cnt = 0;
|
||||
status = impl_->Count(table_id, cnt);
|
||||
ASSERT_EQ(table_file.row_count_, cnt);
|
||||
|
||||
std::vector<size_t> ids = {table_file.id_};
|
||||
milvus::engine::meta::TableFilesSchema schemas;
|
||||
status = impl_->GetTableFiles(table_id, ids, schemas);
|
||||
ASSERT_EQ(schemas.size(), 1UL);
|
||||
ASSERT_EQ(table_file.row_count_, schemas[0].row_count_);
|
||||
ASSERT_EQ(table_file.file_id_, schemas[0].file_id_);
|
||||
ASSERT_EQ(table_file.file_type_, schemas[0].file_type_);
|
||||
ASSERT_EQ(table_file.segment_id_, schemas[0].segment_id_);
|
||||
ASSERT_EQ(table_file.table_id_, schemas[0].table_id_);
|
||||
ASSERT_EQ(table_file.engine_type_, schemas[0].engine_type_);
|
||||
ASSERT_EQ(table_file.dimension_, schemas[0].dimension_);
|
||||
ASSERT_EQ(table_file.flush_lsn_, schemas[0].flush_lsn_);
|
||||
}
|
||||
|
||||
TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) {
|
||||
fiu_init(0);
|
||||
|
||||
|
|
Loading…
Reference in New Issue