Fix apply deletes (#5752)

(1) Entities can still be deleted by ID even if the bloom filter file has been destroyed (a new bloom filter is created)
(2) An entity that has already been deleted is not deleted again (no duplicate offsets in the deleted docs)

Resolves: #5537

Signed-off-by: shengjun.li shengjun.li@zilliz.com
pull/5762/head
shengjun.li 2021-06-11 20:20:12 +08:00 committed by GitHub
parent cbba9c4517
commit 73190282bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 154 additions and 145 deletions

View File

@ -7,6 +7,7 @@ Please mark all change in change log and use the issue from GitHub
- \#4897 Query results contain some deleted ids
- \#5164 Exception should be raised if inserting or deleting an entity on a non-existent partition
- \#5191 Mishards throw "index out of range" error after continually search/insert for a period of time
- \#5537 Failed to load bloom filter after suddenly power off
- \#5574 IVF_SQ8 and IVF_PQ cannot be built on multiple GPUs
## Feature

View File

@ -80,58 +80,31 @@ void
DefaultDeletedDocsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) {
auto& dir_path = fs_ptr->operation_ptr_->GetDirectory();
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
// Create a temporary file from the existing file
const std::string temp_path = dir_path + "/" + "temp_del";
fs_ptr->operation_ptr_->CacheGet(del_file_path);
bool exists = boost::filesystem::exists(del_file_path);
if (exists) {
boost::filesystem::copy_file(del_file_path, temp_path, boost::filesystem::copy_option::fail_if_exists);
}
// Write to the temp file, in order to avoid possible race condition with search (concurrent read and write)
int del_fd = open(temp_path.c_str(), O_RDWR | O_CREAT, 00664);
fs_ptr->operation_ptr_->CacheGet(del_file_path);
// if exist write to the temp file, in order to avoid possible race condition with search
bool exists = boost::filesystem::exists(del_file_path);
const std::string* file_path = exists ? &temp_path : &del_file_path;
int del_fd = open(file_path->c_str(), O_RDWR | O_CREAT, 00664);
if (del_fd == -1) {
std::string err_msg = "Failed to open file: " + temp_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t old_num_bytes;
if (exists) {
if (::read(del_fd, &old_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + temp_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
} else {
old_num_bytes = 0;
}
auto& deleted_docs_list = deleted_docs->GetDeletedDocs();
size_t new_num_bytes = sizeof(segment::offset_t) * deleted_docs->GetSize();
auto deleted_docs_list = deleted_docs->GetDeletedDocs();
size_t new_num_bytes = old_num_bytes + sizeof(segment::offset_t) * deleted_docs->GetSize();
// rewind and overwrite with the new_num_bytes
int off = lseek(del_fd, 0, SEEK_SET);
if (off == -1) {
std::string err_msg = "Failed to seek file: " + temp_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(del_fd, &new_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file" + temp_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
// Move to the end of file and append
off = lseek(del_fd, 0, SEEK_END);
if (off == -1) {
std::string err_msg = "Failed to seek file: " + temp_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(del_fd, deleted_docs_list.data(), sizeof(segment::offset_t) * deleted_docs->GetSize()) == -1) {
if (::write(del_fd, deleted_docs_list.data(), new_num_bytes) == -1) {
std::string err_msg = "Failed to write to file" + temp_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
@ -144,8 +117,10 @@ DefaultDeletedDocsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segme
}
// Move temp file to delete file
const std::lock_guard<std::mutex> lock(mutex_);
boost::filesystem::rename(temp_path, del_file_path);
if (exists) {
const std::lock_guard<std::mutex> lock(mutex_);
boost::filesystem::rename(temp_path, del_file_path);
}
fs_ptr->operation_ptr_->CachePut(del_file_path);
}

View File

@ -1397,7 +1397,7 @@ DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::Vect
segment::SegmentReader segment_reader(segment_dir);
// uids_ptr
std::shared_ptr<std::vector<segment::doc_id_t>> uids_ptr = nullptr;
segment::UidsPtr uids_ptr = nullptr;
auto LoadUid = [&]() {
auto index = cache::CpuCacheMgr::GetInstance()->GetItem(file.location_);
if (index != nullptr) {
@ -1430,6 +1430,7 @@ DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::Vect
codec::DefaultCodec default_codec;
default_codec.GetIdBloomFilterFormat()->create(uids_ptr->size(), id_bloom_filter_ptr);
id_bloom_filter_ptr->Add(*uids_ptr, deleted_docs_ptr->GetMutableDeletedDocs());
LOG_ENGINE_DEBUG_ << "A new bloom filter is created";
segment::SegmentWriter segment_writer(segment_dir);
segment_writer.WriteBloomFilter(id_bloom_filter_ptr);
@ -1454,7 +1455,6 @@ DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::Vect
// Check whether the id has been deleted
if (!deleted_docs_ptr && !(status = LoadDeleteDoc()).ok()) {
LOG_ENGINE_ERROR_ << status.message();
return status;
}
auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();

View File

@ -16,6 +16,7 @@
#include <utility>
#include "cache/CpuCacheMgr.h"
#include "codecs/default/DefaultCodec.h"
#include "db/Utils.h"
#include "db/insert/MemTable.h"
#include "db/meta/FilesHolder.h"
@ -185,19 +186,18 @@ MemTable::GetCurrentMem() {
Status
MemTable::ApplyDeletes() {
// Applying deletes to other segments on disk and their corresponding cache:
// Applying deletes to other segments on disk:
// For each segment in collection:
// Load its bloom filter
// For each id in delete list:
// If present, add the uid to segment's uid list
// For each segment
// Get its cache if exists
// Load its uids file.
// Scan the uids, if any uid in segment's uid list exists:
// If present, add the uid to segment's delete list
// if segment delete list is empty
// continue
// Load its uids and deleted docs file
// Scan the uids, if any un-deleted uid in segment's delete list
// add its offset to deletedDoc
// remove the id from bloom filter
// set black list in cache
// Serialize segment's deletedDoc TODO(zhiru): append directly to previous file for now, may have duplicates
// Serialize segment's deletedDoc
// Serialize bloom filter
LOG_ENGINE_DEBUG_ << "Applying " << doc_ids_to_delete_.size() << " deletes in collection: " << collection_id_;
@ -217,52 +217,24 @@ MemTable::ApplyDeletes() {
// attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
// which file need to be apply delete
std::vector<std::pair<segment::IdBloomFilterPtr, std::vector<segment::doc_id_t>>> ids_check_pair;
ids_check_pair.resize(files.size());
size_t unmark_file_cnt = 0;
for (size_t file_i = 0; file_i < files.size(); file_i++) {
auto& file = files[file_i];
auto& id_bloom_filter_ptr = ids_check_pair[file_i].first;
auto& ids_to_check = ids_check_pair[file_i].second;
ids_to_check.reserve(doc_ids_to_delete_.size());
std::string segment_dir;
utils::GetParentPath(file.location_, segment_dir);
segment::SegmentReader segment_reader(segment_dir);
segment_reader.LoadBloomFilter(id_bloom_filter_ptr, true);
for (auto& id : doc_ids_to_delete_) {
if (id_bloom_filter_ptr->Check(id)) {
ids_to_check.emplace_back(id);
}
}
// release unused files
if (ids_to_check.empty()) {
id_bloom_filter_ptr = nullptr;
files_holder.UnmarkFile(file);
++unmark_file_cnt;
}
}
recorder.RecordSection("Found " + std::to_string(files.size() - unmark_file_cnt) + " segment to apply deletes");
meta::SegmentsSchema files_to_update;
for (size_t file_i = 0; file_i < files.size(); file_i++) {
auto& file = files[file_i];
auto& id_bloom_filter_ptr = ids_check_pair[file_i].first;
auto& ids_to_check = ids_check_pair[file_i].second;
if (id_bloom_filter_ptr == nullptr) {
continue;
}
for (auto& file : files) {
LOG_ENGINE_DEBUG_ << "Applying deletes in segment: " << file.segment_id_;
segment::IdBloomFilterPtr id_bloom_filter_ptr = nullptr;
segment::UidsPtr uids_ptr = nullptr;
segment::DeletedDocsPtr deleted_docs_ptr = nullptr;
std::vector<segment::doc_id_t> ids_to_check;
TimeRecorder rec("handle segment " + file.segment_id_);
// segment reader
std::string segment_dir;
utils::GetParentPath(file.location_, segment_dir);
segment::SegmentReader segment_reader(segment_dir);
// prepare segment_files
meta::FilesHolder segment_holder;
status = meta_->GetCollectionFilesBySegmentId(file.segment_id_, segment_holder);
@ -271,77 +243,117 @@ MemTable::ApplyDeletes() {
}
milvus::engine::meta::SegmentsSchema& segment_files = segment_holder.HoldFiles();
// prepare segment dir
std::string segment_dir;
utils::GetParentPath(file.location_, segment_dir);
// load uids
segment::UidsPtr uids_ptr = nullptr;
for (auto& segment_file : segment_files) {
auto data_obj_ptr = cache::CpuCacheMgr::GetInstance()->GetItem(segment_file.location_);
auto index = std::static_pointer_cast<knowhere::VecIndex>(data_obj_ptr);
if (index != nullptr) {
uids_ptr = index->GetUids();
break;
// Lambda: LoadUid
auto LoadUid = [&]() {
for (auto& segment_file : segment_files) {
auto data_obj_ptr = cache::CpuCacheMgr::GetInstance()->GetItem(segment_file.location_);
auto index = std::static_pointer_cast<knowhere::VecIndex>(data_obj_ptr);
if (index != nullptr) {
uids_ptr = index->GetUids();
return Status::OK();
}
}
}
if (uids_ptr == nullptr) {
// load uids from disk
segment::SegmentReader segment_reader(segment_dir);
status = segment_reader.LoadUids(uids_ptr);
if (!status.ok()) {
return segment_reader.LoadUids(uids_ptr);
};
// Lambda: LoadDeleteDoc
auto LoadDeleteDoc = [&]() { return segment_reader.LoadDeletedDocs(deleted_docs_ptr); };
// load bloom filter
status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr, true);
if (!status.ok()) {
// Some accidents may cause the bloom filter file destroyed.
// If failed to load bloom filter, just to create a new one.
if (!(status = LoadUid()).ok()) {
return status;
}
if (!(status = LoadDeleteDoc()).ok()) {
return status;
}
codec::DefaultCodec default_codec;
default_codec.GetIdBloomFilterFormat()->create(uids_ptr->size(), id_bloom_filter_ptr);
id_bloom_filter_ptr->Add(*uids_ptr, deleted_docs_ptr->GetMutableDeletedDocs());
LOG_ENGINE_DEBUG_ << "A new bloom filter is created";
segment::SegmentWriter segment_writer(segment_dir);
segment_writer.WriteBloomFilter(id_bloom_filter_ptr);
}
rec.RecordSection("Loading uids");
std::sort(ids_to_check.begin(), ids_to_check.end());
rec.RecordSection("Sorting " + std::to_string(ids_to_check.size()) + " ids");
auto find_diff = std::chrono::duration<double>::zero();
auto set_diff = std::chrono::duration<double>::zero();
segment::DeletedDocsPtr deleted_docs = std::make_shared<segment::DeletedDocs>();
for (size_t i = 0; i < uids_ptr->size(); ++i) {
auto find_start = std::chrono::high_resolution_clock::now();
auto found = std::binary_search(ids_to_check.begin(), ids_to_check.end(), (*uids_ptr)[i]);
auto find_end = std::chrono::high_resolution_clock::now();
find_diff += (find_end - find_start);
if (found) {
auto set_start = std::chrono::high_resolution_clock::now();
deleted_docs->AddDeletedDoc(i);
id_bloom_filter_ptr->Remove((*uids_ptr)[i]);
auto set_end = std::chrono::high_resolution_clock::now();
set_diff += (set_end - set_start);
// check ids by bloom filter
for (auto& id : doc_ids_to_delete_) {
if (id_bloom_filter_ptr->Check(id)) {
ids_to_check.emplace_back(id);
}
}
LOG_ENGINE_DEBUG_ << "Finding " << ids_to_check.size() << " uids in " << uids_ptr->size() << " uids took "
<< find_diff.count() << " s in total";
LOG_ENGINE_DEBUG_ << "Setting deleted docs and bloom filter took " << set_diff.count() << " s in total";
rec.RecordSection("bloom filter check end, segment delete list cnt " + std::to_string(ids_to_check.size()));
rec.RecordSection("Find uids and set deleted docs and bloom filter");
// release unused files
if (ids_to_check.empty()) {
files_holder.UnmarkFile(file);
continue;
}
if (deleted_docs->GetSize() == 0) {
// Load its uids and deleted docs file
if (uids_ptr == nullptr && !(status = LoadUid()).ok()) {
return status;
}
if (deleted_docs_ptr == nullptr && !(status = LoadDeleteDoc()).ok()) {
return status;
}
auto& deleted_docs = deleted_docs_ptr->GetMutableDeletedDocs();
rec.RecordSection("load uids and deleted docs");
// sort ids_to_check
bool ids_sorted = false;
if (ids_to_check.size() >= 64) {
std::sort(ids_to_check.begin(), ids_to_check.end());
ids_sorted = true;
rec.RecordSection("Sorting " + std::to_string(ids_to_check.size()) + " ids");
}
// for each id
int64_t segment_deleted_count = 0;
for (size_t i = 0; i < uids_ptr->size(); ++i) {
if (std::find(deleted_docs.begin(), deleted_docs.end(), i) != deleted_docs.end()) {
continue;
}
if (ids_sorted) {
if (!std::binary_search(ids_to_check.begin(), ids_to_check.end(), (*uids_ptr)[i])) {
continue;
}
} else {
if (std::find(ids_to_check.begin(), ids_to_check.end(), (*uids_ptr)[i]) == ids_to_check.end()) {
continue;
}
}
// delete
id_bloom_filter_ptr->Remove((*uids_ptr)[i]);
deleted_docs.push_back(i);
segment_deleted_count++;
}
rec.RecordSection("Find uids and set deleted docs and bloom filter, append " +
std::to_string(segment_deleted_count) + " offsets");
if (segment_deleted_count == 0) {
LOG_ENGINE_DEBUG_ << "deleted_docs does not need to be updated";
files_holder.UnmarkFile(file);
continue;
}
segment::Segment tmp_segment;
segment::SegmentWriter segment_writer(segment_dir);
status = segment_writer.WriteDeletedDocs(deleted_docs);
status = segment_writer.WriteDeletedDocs(deleted_docs_ptr);
if (!status.ok()) {
break;
}
rec.RecordSection("Appended " + std::to_string(deleted_docs->GetSize()) + " offsets to deleted docs");
rec.RecordSection("Updated deleted docs");
status = segment_writer.WriteBloomFilter(id_bloom_filter_ptr);
if (!status.ok()) {
@ -356,14 +368,14 @@ MemTable::ApplyDeletes() {
segment_file.file_type_ == meta::SegmentSchema::TO_INDEX ||
segment_file.file_type_ == meta::SegmentSchema::INDEX ||
segment_file.file_type_ == meta::SegmentSchema::BACKUP) {
segment_file.row_count_ -= deleted_docs->GetSize();
segment_file.row_count_ -= segment_deleted_count;
files_to_update.emplace_back(segment_file);
}
}
rec.RecordSection("Update collection file row count in vector");
}
recorder.RecordSection("Finished " + std::to_string(files.size() - unmark_file_cnt) + " segment to apply deletes");
recorder.RecordSection("Finished " + std::to_string(files.size()) + " segment to apply deletes");
status = meta_->UpdateCollectionFilesRowCount(files_to_update);

View File

@ -157,7 +157,7 @@ TEST_F(DeleteTest, DELETE_ON_DISK) {
std::mt19937 gen(rd());
std::uniform_int_distribution<int64_t> dis(0, nb - 1);
int64_t num_query = 10;
int64_t num_query = 80;
std::map<int64_t, milvus::engine::VectorsData> search_vectors;
for (int64_t i = 0; i < num_query; ++i) {
int64_t index = dis(gen);
@ -168,6 +168,7 @@ TEST_F(DeleteTest, DELETE_ON_DISK) {
}
search_vectors.insert(std::make_pair(xb.id_array_[index], search));
}
auto it = search_vectors.begin();
// std::this_thread::sleep_for(std::chrono::seconds(3)); // ensure raw data write to disk
stat = db_->Flush();
@ -175,8 +176,12 @@ TEST_F(DeleteTest, DELETE_ON_DISK) {
milvus::engine::IDNumbers delete_ids;
delete_ids.reserve(search_vectors.size());
for (auto& kv : search_vectors) {
delete_ids.push_back(kv.first);
// delete 10 items
int64_t delete_batch_1 = 10;
for (int64_t i = 0; i < delete_batch_1; i++) {
delete_ids.push_back(it->first);
it++;
}
stat = db_->DeleteVectors(collection_info.collection_id_, "", delete_ids);
ASSERT_TRUE(stat.ok());
@ -185,6 +190,22 @@ TEST_F(DeleteTest, DELETE_ON_DISK) {
ASSERT_TRUE(stat.ok());
uint64_t row_count;
stat = db_->GetCollectionRowCount(collection_info.collection_id_, row_count);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(row_count, nb - delete_ids.size());
// delete others
delete_ids.clear();
for (int64_t i = delete_batch_1; i < num_query; i++) {
delete_ids.push_back(it->first);
it++;
}
stat = db_->DeleteVectors(collection_info.collection_id_, "", delete_ids);
ASSERT_TRUE(stat.ok());
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
stat = db_->GetCollectionRowCount(collection_info.collection_id_, row_count);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(row_count, nb - search_vectors.size());