#2394 Drop collection timeout if too many partitions created on collection (#2477)

* #2394 Drop collection timeout if too many partitions created on collection

Signed-off-by: groot <yihua.mo@zilliz.com>

* changelog

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 2020-05-31 22:19:50 -05:00 committed by GitHub
parent fdfb6526e0
commit 1daf00dcf7
12 changed files with 159 additions and 113 deletions
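
The change in one sentence: DBImpl::DropCollection previously delegated to DropCollectionRecursively, which soft-deleted each partition with its own SQL statement and its own blocking scheduler DeleteJob, so dropping a collection with many partitions exceeded the client timeout; the new code collects every partition ID up front and soft-deletes them in batches of 50. A minimal standalone sketch of that batching idea, mirroring the DistributeBatch helper added below (the name SplitIntoBatches and the kBatchSize constant are illustrative, not Milvus identifiers):

#include <cstdint>
#include <string>
#include <vector>

// Split a flat ID list into groups of at most kBatchSize so each group can
// be soft-deleted with one "UPDATE ... WHERE table_id IN (...)" statement
// instead of one statement per ID. kBatchSize matches SQL_BATCH_SIZE = 50
// in the diff.
constexpr std::uint64_t kBatchSize = 50;

std::vector<std::vector<std::string>>
SplitIntoBatches(const std::vector<std::string>& ids) {
    std::vector<std::vector<std::string>> groups;
    std::vector<std::string> current;
    for (const auto& id : ids) {
        current.push_back(id);
        if (current.size() >= kBatchSize) {
            groups.emplace_back(std::move(current));
            current.clear();  // moved-from; clear() restores a known state
        }
    }
    if (!current.empty()) {
        groups.emplace_back(std::move(current));  // trailing partial batch
    }
    return groups;
}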


@@ -5,6 +5,7 @@ Please mark all change in change log and use the issue from GitHub
## Bug
- \#2367 Fix inconsistent reading and writing when using mishards
- \#2394 Drop collection timeout if too many partitions created on collection
## Feature
- \#2363 Update branch version


@@ -269,11 +269,37 @@ DBImpl::DropCollection(const std::string& collection_id) {
return SHUTDOWN_ERROR;
}
// partial deletion of collection files by date is not currently supported
LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << collection_id;
Status status;
if (options_.wal_enable_) {
wal_mgr_->DropCollection(collection_id);
}
return DropCollectionRecursively(collection_id);
status = mem_mgr_->EraseMemVector(collection_id); // not allow insert
status = meta_ptr_->DropCollections({collection_id}); // soft delete collection
index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
std::vector<std::string> partition_id_array;
for (auto& schema : partition_array) {
if (options_.wal_enable_) {
wal_mgr_->DropCollection(schema.collection_id_);
}
status = mem_mgr_->EraseMemVector(schema.collection_id_);
index_failed_checker_.CleanFailedIndexFileOfCollection(schema.collection_id_);
partition_id_array.push_back(schema.collection_id_);
}
status = meta_ptr_->DropCollections(partition_id_array);
fiu_do_on("DBImpl.DropCollection.failed", status = Status(DB_ERROR, ""));
if (!status.ok()) {
return status;
}
return Status::OK();
}
Status
@@ -2290,39 +2316,6 @@ DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector<
return Status::OK();
}
Status
DBImpl::DropCollectionRecursively(const std::string& collection_id) {
// partial deletion of collection files by date is not currently supported
LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << collection_id;
Status status;
if (options_.wal_enable_) {
wal_mgr_->DropCollection(collection_id);
}
status = mem_mgr_->EraseMemVector(collection_id); // not allow insert
status = meta_ptr_->DropCollection(collection_id); // soft delete collection
index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
// scheduler will determine when to delete collection files
auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(collection_id, meta_ptr_, nres);
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitAndDelete();
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
for (auto& schema : partition_array) {
status = DropCollectionRecursively(schema.collection_id_);
fiu_do_on("DBImpl.DropCollectionRecursively.failed", status = Status(DB_ERROR, ""));
if (!status.ok()) {
return status;
}
}
return Status::OK();
}
Status
DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
DropIndex(collection_id);
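
Why the old path timed out: the removed DropCollectionRecursively above issued one soft-delete and one synchronous DeleteJob::WaitAndDelete per partition, recursively, while the new DropCollection hands all partition IDs to meta in bulk, and meta soft-deletes the files itself (DeleteCollectionFiles(collection_id_array)) with no per-partition scheduler round-trip. A back-of-the-envelope comparison of statement counts, with purely illustrative numbers:

#include <cstdint>
#include <cstdio>

int main() {
    const std::uint64_t partitions = 4096;  // hypothetical partition count
    const std::uint64_t batch = 50;         // SQL_BATCH_SIZE from the diff

    // Old path: one UPDATE (plus one blocking scheduler job) per partition.
    const std::uint64_t old_statements = partitions;

    // New path: ceil(partitions / batch) batched IN (...) UPDATEs.
    const std::uint64_t new_statements = (partitions + batch - 1) / batch;

    std::printf("old: %llu statements, new: %llu\n",
                static_cast<unsigned long long>(old_statements),
                static_cast<unsigned long long>(new_statements));  // 4096 vs 82
    return 0;
}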


@@ -258,9 +258,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
GetPartitionsByTags(const std::string& collection_id, const std::vector<std::string>& partition_tags,
std::set<std::string>& partition_name_array);
Status
DropCollectionRecursively(const std::string& collection_id);
Status
UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index);


@@ -71,10 +71,10 @@ class Meta {
GetCollectionFlushLSN(const std::string& collection_id, uint64_t& flush_lsn) = 0;
virtual Status
DropCollection(const std::string& collection_id) = 0;
DropCollections(const std::vector<std::string>& collection_id_array) = 0;
virtual Status
DeleteCollectionFiles(const std::string& collection_id) = 0;
DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) = 0;
virtual Status
CreateCollectionFile(SegmentSchema& file_schema) = 0;


@@ -46,6 +46,26 @@ namespace meta {
namespace {
constexpr uint64_t SQL_BATCH_SIZE = 50;
template <typename T>
void
DistributeBatch(const T& id_array, std::vector<std::vector<std::string>>& id_groups) {
std::vector<std::string> temp_group;
constexpr uint64_t SQL_BATCH_SIZE = 50;
for (auto& id : id_array) {
temp_group.push_back(id);
if (temp_group.size() >= SQL_BATCH_SIZE) {
id_groups.emplace_back(temp_group);
temp_group.clear();
}
}
if (!temp_group.empty()) {
id_groups.emplace_back(temp_group);
}
}
Status
HandleException(const std::string& desc, const char* what = nullptr) {
if (what == nullptr) {
@@ -636,10 +656,15 @@ MySQLMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_a
}
Status
MySQLMetaImpl::DropCollection(const std::string& collection_id) {
MySQLMetaImpl::DropCollections(const std::vector<std::string>& collection_id_array) {
try {
// distribute id array to batches
std::vector<std::vector<std::string>> id_groups;
DistributeBatch(collection_id_array, id_groups);
server::MetricCollector metric;
{
for (auto group : id_groups) {
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
bool is_null_connection = (connectionPtr == nullptr);
@@ -650,26 +675,32 @@ MySQLMetaImpl::DropCollection(const std::string& collection_id) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles is an atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// soft delete collection
mysqlpp::Query statement = connectionPtr->query();
//
statement << "UPDATE " << META_TABLES << " SET state = " << std::to_string(CollectionSchema::TO_DELETE)
<< " WHERE table_id = " << mysqlpp::quote << collection_id << ";";
<< " WHERE table_id in(";
for (size_t i = 0; i < group.size(); i++) {
statement << mysqlpp::quote << group[i];
if (i != group.size() - 1) {
statement << ",";
}
}
statement << ")";
LOG_ENGINE_DEBUG_ << "DeleteCollection: " << statement.str();
LOG_ENGINE_DEBUG_ << "DropCollections: " << statement.str();
if (!statement.exec()) {
return HandleException("Failed to drop collection", statement.error());
return HandleException("Failed to drop collections", statement.error());
}
} // Scoped Connection
bool is_writable_mode{mode_ == DBOptions::MODE::CLUSTER_WRITABLE};
fiu_do_on("MySQLMetaImpl.DropCollection.CLUSTER_WRITABLE_MODE", is_writable_mode = true);
if (is_writable_mode) {
DeleteCollectionFiles(collection_id);
}
LOG_ENGINE_DEBUG_ << "Successfully delete collection: " << collection_id;
auto status = DeleteCollectionFiles(collection_id_array);
LOG_ENGINE_DEBUG_ << "Successfully delete collections";
return status;
} catch (std::exception& e) {
return HandleException("Failed to drop collection", e.what());
}
@@ -678,10 +709,14 @@ MySQLMetaImpl::DropCollection(const std::string& collection_id) {
}
Status
MySQLMetaImpl::DeleteCollectionFiles(const std::string& collection_id) {
MySQLMetaImpl::DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) {
try {
// distribute id array to batches
std::vector<std::vector<std::string>> id_groups;
DistributeBatch(collection_id_array, id_groups);
server::MetricCollector metric;
{
for (auto group : id_groups) {
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
bool is_null_connection = (connectionPtr == nullptr);
@@ -699,9 +734,14 @@ MySQLMetaImpl::DeleteCollectionFiles(const std::string& collection_id) {
mysqlpp::Query statement = connectionPtr->query();
//
statement << "UPDATE " << META_TABLEFILES << " SET file_type = " << std::to_string(SegmentSchema::TO_DELETE)
<< " ,updated_time = " << std::to_string(utils::GetMicroSecTimeStamp())
<< " WHERE table_id = " << mysqlpp::quote << collection_id << " AND file_type <> "
<< std::to_string(SegmentSchema::TO_DELETE) << ";";
<< " ,updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " WHERE table_id in (";
for (size_t i = 0; i < group.size(); i++) {
statement << mysqlpp::quote << group[i];
if (i != group.size() - 1) {
statement << ",";
}
}
statement << ") AND file_type <> " << std::to_string(SegmentSchema::TO_DELETE) << ";";
LOG_ENGINE_DEBUG_ << "DeleteCollectionFiles: " << statement.str();
@@ -710,7 +750,7 @@ MySQLMetaImpl::DeleteCollectionFiles(const std::string& collection_id) {
}
} // Scoped Connection
LOG_ENGINE_DEBUG_ << "Successfully delete collection files from " << collection_id;
LOG_ENGINE_DEBUG_ << "Successfully delete collection files";
} catch (std::exception& e) {
return HandleException("Failed to delete colletion files", e.what());
}
@@ -1519,7 +1559,7 @@ MySQLMetaImpl::HasPartition(const std::string& collection_id, const std::string&
Status
MySQLMetaImpl::DropPartition(const std::string& partition_name) {
return DropCollection(partition_name);
return DropCollections({partition_name});
}
Status
@@ -1717,20 +1757,8 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se
}
// distribute id array to batches
const uint64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group;
for (auto& id : partition_id_array) {
temp_group.push_back(id);
if (temp_group.size() >= batch_size) {
id_groups.emplace_back(temp_group);
temp_group.clear();
}
}
if (!temp_group.empty()) {
id_groups.emplace_back(temp_group);
}
DistributeBatch(partition_id_array, id_groups);
// perform query batch by batch
int64_t files_count = 0;
@@ -2130,14 +2158,13 @@ MySQLMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collecti
server::MetricCollector metric;
// distribute id array to batches
const uint64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group;
std::unordered_map<std::string, meta::CollectionSchema> map_collections;
for (auto& collection : collections) {
map_collections.insert(std::make_pair(collection.collection_id_, collection));
temp_group.push_back(collection.collection_id_);
if (temp_group.size() >= batch_size) {
if (temp_group.size() >= SQL_BATCH_SIZE) {
id_groups.emplace_back(temp_group);
temp_group.clear();
}
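
The IN-clause construction above can also be read in isolation. A hedged standalone sketch, with a plain std::ostringstream in place of mysqlpp::Query and placeholder table name and state value; note that the real code streams each ID through mysqlpp::quote for escaping, which the naive quoting below does not do:

#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Build one batched soft-delete statement for a group of IDs.
// "Tables" and the state value 1 are stand-ins for META_TABLES and
// CollectionSchema::TO_DELETE; never build real SQL without escaping.
std::string
BuildSoftDeleteSql(const std::vector<std::string>& group) {
    std::ostringstream sql;
    sql << "UPDATE Tables SET state = 1 WHERE table_id IN (";
    for (std::size_t i = 0; i < group.size(); ++i) {
        sql << "'" << group[i] << "'";
        if (i + 1 != group.size()) {
            sql << ",";
        }
    }
    sql << ");";
    return sql.str();
}

// BuildSoftDeleteSql({"p1", "p2"}) yields:
//   UPDATE Tables SET state = 1 WHERE table_id IN ('p1','p2');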


@@ -45,10 +45,10 @@ class MySQLMetaImpl : public Meta {
AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root = false) override;
Status
DropCollection(const std::string& collection_id) override;
DropCollections(const std::vector<std::string>& collection_id_array) override;
Status
DeleteCollectionFiles(const std::string& collection_id) override;
DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) override;
Status
CreateCollectionFile(SegmentSchema& file_schema) override;


@@ -47,6 +47,26 @@ using namespace sqlite_orm;
namespace {
constexpr uint64_t SQL_BATCH_SIZE = 50;
template<typename T>
void
DistributeBatch(const T& id_array, std::vector<std::vector<std::string>>& id_groups) {
std::vector<std::string> temp_group;
constexpr uint64_t SQL_BATCH_SIZE = 50;
for (auto& id : id_array) {
temp_group.push_back(id);
if (temp_group.size() >= SQL_BATCH_SIZE) {
id_groups.emplace_back(temp_group);
temp_group.clear();
}
}
if (!temp_group.empty()) {
id_groups.emplace_back(temp_group);
}
}
Status
HandleException(const std::string& desc, const char* what = nullptr) {
if (what == nullptr) {
@@ -367,22 +387,32 @@ SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_
}
Status
SqliteMetaImpl::DropCollection(const std::string& collection_id) {
SqliteMetaImpl::DropCollections(const std::vector<std::string>& collection_id_array) {
try {
fiu_do_on("SqliteMetaImpl.DropCollection.throw_exception", throw std::exception());
// distribute id array to batches
std::vector<std::vector<std::string>> id_groups;
DistributeBatch(collection_id_array, id_groups);
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// soft delete collection
ConnectorPtr->update_all(
set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE),
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
for (auto group : id_groups) {
// soft delete collection
ConnectorPtr->update_all(
set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE),
where(in(&CollectionSchema::collection_id_, group)
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
}
}
LOG_ENGINE_DEBUG_ << "Successfully delete collection, collection id = " << collection_id;
auto status = DeleteCollectionFiles(collection_id_array);
LOG_ENGINE_DEBUG_ << "Successfully delete collections";
return status;
} catch (std::exception& e) {
return HandleException("Encounter exception when delete collection", e.what());
}
@@ -391,22 +421,28 @@ SqliteMetaImpl::DropCollection(const std::string& collection_id) {
}
Status
SqliteMetaImpl::DeleteCollectionFiles(const std::string& collection_id) {
SqliteMetaImpl::DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) {
try {
fiu_do_on("SqliteMetaImpl.DeleteCollectionFiles.throw_exception", throw std::exception());
// distribute id array to batches
std::vector<std::vector<std::string>> id_groups;
DistributeBatch(collection_id_array, id_groups);
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// soft delete collection files
ConnectorPtr->update_all(set(c(&SegmentSchema::file_type_) = (int)SegmentSchema::TO_DELETE,
c(&SegmentSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(c(&SegmentSchema::collection_id_) == collection_id and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
for (auto group : id_groups) {
// soft delete collection files
ConnectorPtr->update_all(set(c(&SegmentSchema::file_type_) = (int)SegmentSchema::TO_DELETE,
c(&SegmentSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(in(&SegmentSchema::collection_id_, group) and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
}
LOG_ENGINE_DEBUG_ << "Successfully delete collection files, collection id = " << collection_id;
LOG_ENGINE_DEBUG_ << "Successfully delete collection files";
} catch (std::exception& e) {
return HandleException("Encounter exception when delete collection files", e.what());
}
@@ -979,7 +1015,7 @@ SqliteMetaImpl::HasPartition(const std::string& collection_id, const std::string
Status
SqliteMetaImpl::DropPartition(const std::string& partition_name) {
return DropCollection(partition_name);
return DropCollections({partition_name});
}
Status
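
On the sqlite_orm side, in(&CollectionSchema::collection_id_, group) generates the same batched IN clause. A minimal self-contained sketch under assumptions: the single-header sqlite_orm library, an in-memory database, and a schema trimmed to the two columns involved (names here are illustrative, not Milvus's). Batching in groups of 50 also keeps each statement comfortably below SQLite's default bound-parameter limit (historically 999):

#include <string>
#include <vector>
#include "sqlite_orm.h"  // single-header sqlite_orm, assumed on include path

struct Row {
    std::string collection_id_;
    int state_ = 0;
};

int main() {
    using namespace sqlite_orm;
    auto storage = make_storage(
        ":memory:",
        make_table("Tables",
                   make_column("table_id", &Row::collection_id_),
                   make_column("state", &Row::state_)));
    storage.sync_schema();

    const int kToDelete = 1;  // stand-in for CollectionSchema::TO_DELETE
    std::vector<std::string> group = {"c1", "c2", "c3"};

    // One UPDATE per batch:
    //   UPDATE Tables SET state = 1
    //   WHERE table_id IN ('c1','c2','c3') AND state != 1
    storage.update_all(set(c(&Row::state_) = kToDelete),
                       where(in(&Row::collection_id_, group) and
                             c(&Row::state_) != kToDelete));
    return 0;
}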


@@ -47,10 +47,10 @@ class SqliteMetaImpl : public Meta {
AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root = false) override;
Status
DropCollection(const std::string& collection_id) override;
DropCollections(const std::vector<std::string>& collection_id_array) override;
Status
DeleteCollectionFiles(const std::string& collection_id) override;
DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) override;
Status
CreateCollectionFile(SegmentSchema& file_schema) override;


@@ -27,7 +27,7 @@ void
DeleteJob::WaitAndDelete() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_resource == num_resource_; });
meta_ptr_->DeleteCollectionFiles(collection_id_);
meta_ptr_->DeleteCollectionFiles({collection_id_});
}
void


@@ -968,10 +968,6 @@ TEST_F(DBTest2, DELETE_TEST) {
// fail drop collection
fiu_init(0);
FIU_ENABLE_FIU("DBImpl.DropCollectionRecursively.failed");
stat = db_->DropCollection(COLLECTION_NAME);
ASSERT_FALSE(stat.ok());
fiu_disable("DBImpl.DropCollectionRecursively.failed");
stat = db_->DropCollection(COLLECTION_NAME);
ASSERT_TRUE(stat.ok());
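
The FIU points exercised in these tests come from libfiu: a fiu_do_on in the production code marks a named injection point that does nothing until a test enables it, and Milvus's FIU_ENABLE_FIU macro is assumed here to wrap fiu_enable(name, 1, nullptr, 0). A minimal sketch of the mechanism with a made-up point name (compile with -DFIU_ENABLE and link -lfiu, otherwise fiu_do_on compiles away):

#include <cstdio>
#include <fiu.h>          // fiu_init
#include <fiu-control.h>  // fiu_enable / fiu_disable
#include <fiu-local.h>    // fiu_do_on

static int
DropSomething() {
    int rc = 0;  // success by default
    // Inert in production; when the point is enabled by name, the side
    // effect runs and the function reports failure.
    fiu_do_on("demo.DropSomething.failed", rc = -1);
    return rc;
}

int main() {
    fiu_init(0);
    std::printf("normal: %d\n", DropSomething());    // prints 0
    fiu_enable("demo.DropSomething.failed", 1, nullptr, 0);
    std::printf("injected: %d\n", DropSomething());  // prints -1
    fiu_disable("demo.DropSomething.failed");
    return 0;
}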


@@ -45,7 +45,7 @@ TEST_F(MetaTest, COLLECTION_TEST) {
status = impl_->CreateCollection(collection);
ASSERT_EQ(status.code(), milvus::DB_ALREADY_EXIST);
status = impl_->DropCollection(collection.collection_id_);
status = impl_->DropCollections({collection.collection_id_});
ASSERT_TRUE(status.ok());
status = impl_->CreateCollection(collection);
@@ -124,7 +124,7 @@ TEST_F(MetaTest, FALID_TEST) {
}
{
FIU_ENABLE_FIU("SqliteMetaImpl.DropCollection.throw_exception");
status = impl_->DropCollection(collection.collection_id_);
status = impl_->DropCollections({collection.collection_id_});
ASSERT_FALSE(status.ok());
fiu_disable("SqliteMetaImpl.DropCollection.throw_exception");
}
@@ -142,7 +142,7 @@ TEST_F(MetaTest, FALID_TEST) {
}
{
FIU_ENABLE_FIU("SqliteMetaImpl.DeleteCollectionFiles.throw_exception");
status = impl_->DeleteCollectionFiles(collection.collection_id_);
status = impl_->DeleteCollectionFiles({collection.collection_id_});
ASSERT_FALSE(status.ok());
fiu_disable("SqliteMetaImpl.DeleteCollectionFiles.throw_exception");
}
@@ -687,7 +687,7 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
to_index_files_cnt + index_files_cnt;
ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);
status = impl_->DeleteCollectionFiles(collection_id);
status = impl_->DeleteCollectionFiles({collection_id});
ASSERT_TRUE(status.ok());
status = impl_->CreateCollectionFile(table_file);
@@ -712,7 +712,7 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
status = impl_->CleanUpFilesWithTTL(1UL);
ASSERT_TRUE(status.ok());
status = impl_->DropCollection(collection_id);
status = impl_->DropCollections({collection_id});
ASSERT_TRUE(status.ok());
}


@@ -103,10 +103,6 @@ TEST_F(MySqlMetaTest, COLLECTION_TEST) {
ASSERT_FALSE(has_collection);
fiu_disable("MySQLMetaImpl.HasCollection.throw_exception");
FIU_ENABLE_FIU("MySQLMetaImpl.DropCollection.CLUSTER_WRITABLE_MODE");
stat = impl_->DropCollection(collection_id);
fiu_disable("MySQLMetaImpl.DropCollection.CLUSTER_WRITABLE_MODE");
FIU_ENABLE_FIU("MySQLMetaImpl.DropAll.null_connection");
status = impl_->DropAll();
ASSERT_FALSE(status.ok());
@@ -298,7 +294,7 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
status = impl_->CleanUpFilesWithTTL(1UL);
ASSERT_TRUE(status.ok());
status = impl_->DropCollection(table_file.collection_id_);
status = impl_->DropCollections({table_file.collection_id_});
ASSERT_TRUE(status.ok());
status = impl_->UpdateCollectionFile(table_file);
ASSERT_TRUE(status.ok());
@@ -714,19 +710,19 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);
FIU_ENABLE_FIU("MySQLMetaImpl.DeleteCollectionFiles.null_connection");
status = impl_->DeleteCollectionFiles(collection_id);
status = impl_->DeleteCollectionFiles({collection_id});
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.DeleteCollectionFiles.null_connection");
FIU_ENABLE_FIU("MySQLMetaImpl.DeleteCollectionFiles.throw_exception");
status = impl_->DeleteCollectionFiles(collection_id);
status = impl_->DeleteCollectionFiles({collection_id});
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.DeleteCollectionFiles.throw_exception");
status = impl_->DeleteCollectionFiles(collection_id);
status = impl_->DeleteCollectionFiles({collection_id});
ASSERT_TRUE(status.ok());
status = impl_->DropCollection(collection_id);
status = impl_->DropCollections({collection_id});
ASSERT_TRUE(status.ok());
status = impl_->CleanUpFilesWithTTL(0UL);