Fix delete crash when delete multi time (#3163)

* Fix Delete Bug when delete multi time

Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com>

* Fix multi delete crash bug

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* Add filter field when get active collection

Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com>

* Fix cmake link error & code format

Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com>
pull/3167/head^2
BossZou 2020-08-07 14:07:45 +08:00 committed by GitHub
parent 186f36c794
commit c05f67baf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 74 additions and 36 deletions

View File

@ -58,20 +58,34 @@ add_subdirectory( db ) # target milvus_engine
add_subdirectory( log )
add_subdirectory( server )
set(link_lib
milvus_engine
config
metrics
tracing
log
oatpp
query
utils
)
if (MILVUS_WITH_PROMETHEUS)
set(link_lib
${link_lib}
# dependency prometheus
prometheus-cpp-push
prometheus-cpp-pull
prometheus-cpp-core
)
endif ()
set(link_lib
${link_lib}
curl
)
target_link_libraries( server
PUBLIC milvus_engine
config
metrics
tracing
log
oatpp
query
utils
# dependency prometheus
prometheus-cpp-push
prometheus-cpp-pull
prometheus-cpp-core
curl
PUBLIC ${link_lib}
)
# **************************** Get&Print Include Directories ****************************

View File

@ -90,7 +90,7 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->read(&old_num_bytes, sizeof(size_t));
delete_ids.resize(old_num_bytes / sizeof(size_t));
delete_ids.resize(old_num_bytes / sizeof(engine::offset_t));
fs_ptr->reader_ptr_->read(delete_ids.data(), old_num_bytes);
fs_ptr->reader_ptr_->close();
} else {

View File

@ -114,9 +114,14 @@ MemCollection::Serialize(uint64_t wal_lsn) {
if (status.ok()) {
break;
} else if (status.code() == SS_STALE_ERROR) {
LOG_ENGINE_WARNING_ << "ApplyDeletes is stale, try again";
std::string err = "ApplyDeletes is stale, try again";
LOG_ENGINE_WARNING_ << err;
std::cout << err << std::endl;
continue;
} else {
std::string err = "ApplyDeletes failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err;
std::cout << err << std::endl;
return status;
}
}
@ -172,6 +177,7 @@ MemCollection::ApplyDeletes() {
auto segments_op = std::make_shared<snapshot::CompoundSegmentsOperation>(context, ss);
int64_t segment_iterated = 0;
// std::vector<snapshot::SegmentPtr> modified_segments;
auto segment_executor = [&](const snapshot::SegmentPtr& segment, snapshot::SegmentIterator* iterator) -> Status {
segment_iterated++;
auto seg_visitor = engine::SegmentVisitor::Build(ss, segment->GetID());
@ -179,6 +185,8 @@ MemCollection::ApplyDeletes() {
std::make_shared<segment::SegmentReader>(options_.meta_.path_, seg_visitor);
segment::IdBloomFilterPtr pre_bloom_filter;
STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter));
std::vector<engine::id_t> uids;
STATUS_CHECK(segment_reader->LoadUids(uids));
// Step 1: Check delete_id in mem
std::vector<id_t> delete_ids;
@ -197,9 +205,13 @@ MemCollection::ApplyDeletes() {
STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs));
std::vector<engine::offset_t> pre_del_ids;
if (prev_del_docs) {
pre_del_ids = prev_del_docs->GetDeletedDocs();
if (!pre_del_ids.empty())
auto pre_doc_ids = prev_del_docs->GetDeletedDocs();
if (!pre_doc_ids.empty()) {
for (auto& id : pre_doc_ids) {
pre_del_ids.push_back(uids[id]);
}
delete_ids.insert(delete_ids.end(), pre_del_ids.begin(), pre_del_ids.end());
}
}
// TODO(yhz): Update blacklist in cache
@ -265,8 +277,7 @@ MemCollection::ApplyDeletes() {
segment::IdBloomFilterPtr bloom_filter;
STATUS_CHECK(segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter));
auto delete_docs = std::make_shared<segment::DeletedDocs>();
std::vector<id_t> uids;
STATUS_CHECK(segment_reader->LoadUids(uids));
for (size_t i = 0; i < uids.size(); i++) {
if (std::binary_search(ids_to_check.begin(), ids_to_check.end(), uids[i])) {
delete_docs->AddDeletedDoc(i);
@ -284,6 +295,7 @@ MemCollection::ApplyDeletes() {
delete_file->SetSize(CommonUtil::GetFileSize(del_docs_path + codec::DeletedDocsFormat::FilePostfix()));
bloom_filter_file->SetSize(
CommonUtil::GetFileSize(bloom_filter_file_path + codec::IdBloomFilterFormat::FilePostfix()));
// modified_segments.push_back(segment);
return Status::OK();
};
@ -296,10 +308,22 @@ MemCollection::ApplyDeletes() {
return Status::OK(); // no segment, nothing to do
}
fiu_do_on("MemCollection.ApplyDeletes.RandomSleep", {
std::srand(std::time(nullptr));
sleep(std::rand() % 3);
});
fiu_do_on("MemCollection.ApplyDeletes.RandomSleep", sleep(1));
// snapshot::ScopedSnapshotT new_ss;
// STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(new_ss, collection_id_));
// if (new_ss->GetID() != ss->GetID()) {
// for (auto& seg : modified_segments) {
// auto pre_seg_commit = ss->GetSegmentCommitBySegmentId(seg->GetID());
// auto new_seg_commit = new_ss->GetSegmentCommitBySegmentId(seg->GetID());
// if (new_seg_commit->GetID() != pre_seg_commit->GetID()) {
// // TODO: Rollback CompoundSegmentsOp
// std::string err = "[CSOE] Segment " + std::to_string(seg->GetID()) + " is stale.";
// LOG_ENGINE_ERROR_ << err;
// return Status(SS_STALE_ERROR, err);
// }
// }
// }
return segments_op->Push();
}

View File

@ -114,10 +114,7 @@ SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) const {
bool
SnapshotHolder::IsActive(Snapshot::Ptr& ss) {
auto collection = ss->GetCollection();
if (collection && collection->IsActive()) {
return true;
}
return false;
return collection && collection->IsActive();
}
Status

View File

@ -195,7 +195,7 @@ class Store : public std::enable_shared_from_this<Store> {
IDS_TYPE ids;
IDS_TYPE selected_ids;
std::vector<State> filter_states = {State::ACTIVE};
adapter_->SelectResourceIDs<Collection>(selected_ids, "", filter_states);
adapter_->SelectResourceIDs<Collection>(selected_ids, StateField::Name, filter_states);
if (!reversed) {
ids = selected_ids;

View File

@ -979,7 +979,7 @@ TEST_F(DBTest, DeleteEntitiesTest) {
milvus::engine::IDNumbers whole_delete_ids;
fiu_init(0);
fiu_enable("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0);
fiu_enable_random("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0, 0.5);
for (size_t i = 0; i < 5; i++) {
std::string partition0 = collection_name + "p_" + std::to_string(i) + "_0";
std::string partition1 = collection_name + "p_" + std::to_string(i) + "_1";
@ -1019,6 +1019,7 @@ TEST_F(DBTest, DeleteEntitiesTest) {
}
TEST_F(DBTest, DeleteStaleTest) {
const int del_id_pair = 3;
auto insert_entities = [&](const std::string& collection, const std::string& partition,
uint64_t count, uint64_t batch_index, milvus::engine::IDNumbers& ids) -> Status {
milvus::engine::DataChunkPtr data_chunk;
@ -1048,7 +1049,7 @@ TEST_F(DBTest, DeleteStaleTest) {
auto delete_task = [&](const std::string& collection, const milvus::engine::IDNumbers& del_ids) {
auto status = Status::OK();
for (size_t i = 0; i < 5; i++) {
for (size_t i = 0; i < del_id_pair; i++) {
milvus::engine::IDNumbers ids = {del_ids[2 * i], del_ids[2 * i + 1]};
status = db_->DeleteEntityByID(collection, ids);
ASSERT_TRUE(status.ok()) << status.ToString();
@ -1072,24 +1073,26 @@ TEST_F(DBTest, DeleteStaleTest) {
status = db_->Flush(collection_name);
ASSERT_TRUE(status.ok()) << status.ToString();
for (size_t i = 0; i < 5; i++) {
for (size_t i = 0; i < del_id_pair; i ++) {
del_ids.push_back(entity_ids[i]);
del_ids.push_back(entity_ids2[i]);
}
fiu_init(0);
fiu_enable("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0);
fiu_enable_random("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0, 0.5);
auto build_thread = std::thread(build_task, collection_name, VECTOR_FIELD_NAME);
auto delete_thread = std::thread(delete_task, collection_name, del_ids);
build_thread.join();
delete_thread.join();
// sleep(15);
build_thread.join();
fiu_disable("MemCollection.ApplyDeletes.RandomSleep");
db_->Flush();
// int64_t row_count;
// status = db_->CountEntities(collection_name, row_count);
// ASSERT_TRUE(status.ok()) << status.ToString();
// ASSERT_EQ(row_count, 10000 * 2 - 2 * del_id_pair);
//
// std::vector<bool> valid_row;
// milvus::engine::DataChunkPtr entity_data_chunk;
// std::cout << "Get Entity" << std::endl;
// for (size_t j = 0; j < del_ids.size(); j++) {
// status = db_->GetEntityByID(collection_name, {del_ids[j]}, {}, valid_row, entity_data_chunk);
// ASSERT_TRUE(status.ok()) << status.ToString();