mirror of https://github.com/milvus-io/milvus.git
Check stale segment file when flush delete ids (#3283)
* Check stale file modified Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com> * Modify test Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com> * Add more detail for debugging Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com> * Change del ids type Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com> * Remove surplus code Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com> * Allow to delete id-duplicated entities Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com> pull/3285/head^2
parent
b8e96a7fb9
commit
788b015667
|
@ -117,12 +117,10 @@ MemCollection::Serialize(uint64_t wal_lsn) {
|
|||
} else if (status.code() == SS_STALE_ERROR) {
|
||||
std::string err = "ApplyDeletes is stale, try again";
|
||||
LOG_ENGINE_WARNING_ << err;
|
||||
std::cout << err << std::endl;
|
||||
continue;
|
||||
} else {
|
||||
std::string err = "ApplyDeletes failed: " + status.ToString();
|
||||
LOG_ENGINE_ERROR_ << err;
|
||||
std::cout << err << std::endl;
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
@ -172,13 +170,11 @@ MemCollection::ApplyDeletes() {
|
|||
snapshot::ScopedSnapshotT ss;
|
||||
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_));
|
||||
|
||||
// TODO: check stale segment files here
|
||||
snapshot::OperationContext context;
|
||||
context.lsn = lsn_;
|
||||
auto segments_op = std::make_shared<snapshot::CompoundSegmentsOperation>(context, ss);
|
||||
|
||||
int64_t segment_iterated = 0;
|
||||
// std::vector<snapshot::SegmentPtr> modified_segments;
|
||||
auto segment_executor = [&](const snapshot::SegmentPtr& segment, snapshot::SegmentIterator* iterator) -> Status {
|
||||
segment_iterated++;
|
||||
auto seg_visitor = engine::SegmentVisitor::Build(ss, segment->GetID());
|
||||
|
@ -204,29 +200,10 @@ MemCollection::ApplyDeletes() {
|
|||
std::vector<engine::id_t> uids;
|
||||
STATUS_CHECK(segment_reader->LoadUids(uids));
|
||||
|
||||
// Step 2: Load previous delete_id and merge into 'delete_ids'
|
||||
segment::DeletedDocsPtr prev_del_docs;
|
||||
STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs));
|
||||
std::vector<engine::offset_t> pre_del_ids;
|
||||
if (prev_del_docs) {
|
||||
auto pre_doc_ids = prev_del_docs->GetDeletedDocs();
|
||||
if (!pre_doc_ids.empty()) {
|
||||
for (auto& id : pre_doc_ids) {
|
||||
pre_del_ids.push_back(uids[id]);
|
||||
}
|
||||
delete_ids.insert(delete_ids.end(), pre_del_ids.begin(), pre_del_ids.end());
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yhz): Update blacklist in cache
|
||||
// std::vector<knowhere::VecIndexPtr> indexes;
|
||||
// std::vector<faiss::ConcurrentBitsetPtr> blacklists;
|
||||
std::string collection_root_path = options_.meta_.path_ + COLLECTIONS_FOLDER;
|
||||
|
||||
std::sort(delete_ids.begin(), delete_ids.end());
|
||||
std::set<id_t> ids_to_check(delete_ids.begin(), delete_ids.end());
|
||||
|
||||
// Step 3: Mark previous deleted docs file and bloom filter file stale
|
||||
// Step 2: Mark previous deleted docs file and bloom filter file stale
|
||||
auto& field_visitors_map = seg_visitor->GetFieldVisitors();
|
||||
auto uid_field_visitor = seg_visitor->GetFieldVisitor(engine::FIELD_UID);
|
||||
auto del_doc_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS);
|
||||
|
@ -236,14 +213,12 @@ MemCollection::ApplyDeletes() {
|
|||
|
||||
auto segment_file_executor = [&](const snapshot::SegmentFilePtr& segment_file,
|
||||
snapshot::SegmentFileIterator* iterator) -> Status {
|
||||
if (segment_file->GetSegmentId() != segment->GetID()) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
if (segment_file->GetFieldElementId() == del_docs_element->GetID() ||
|
||||
segment_file->GetFieldElementId() == blm_filter_element->GetID()) {
|
||||
if (segment_file->GetSegmentId() == segment->GetID() &&
|
||||
(segment_file->GetFieldElementId() == del_docs_element->GetID() ||
|
||||
segment_file->GetFieldElementId() == blm_filter_element->GetID())) {
|
||||
segments_op->AddStaleSegmentFile(segment_file);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
|
@ -251,7 +226,7 @@ MemCollection::ApplyDeletes() {
|
|||
segment_file_iterator->Iterate();
|
||||
STATUS_CHECK(segment_file_iterator->GetStatus());
|
||||
|
||||
// Step 4: Create new deleted docs file and bloom filter file
|
||||
// Step 3: Create new deleted docs file and bloom filter file
|
||||
snapshot::SegmentFileContext del_file_context;
|
||||
del_file_context.field_name = uid_field_visitor->GetField()->GetName();
|
||||
del_file_context.field_element_name = del_docs_element->GetName();
|
||||
|
@ -260,6 +235,8 @@ MemCollection::ApplyDeletes() {
|
|||
del_file_context.segment_id = segment->GetID();
|
||||
snapshot::SegmentFilePtr delete_file;
|
||||
STATUS_CHECK(segments_op->CommitNewSegmentFile(del_file_context, delete_file));
|
||||
|
||||
std::string collection_root_path = options_.meta_.path_ + COLLECTIONS_FOLDER;
|
||||
auto segment_writer = std::make_shared<segment::SegmentWriter>(options_.meta_.path_, seg_visitor);
|
||||
|
||||
std::string del_docs_path = snapshot::GetResPath<snapshot::SegmentFile>(collection_root_path, delete_file);
|
||||
|
@ -277,24 +254,37 @@ MemCollection::ApplyDeletes() {
|
|||
std::string bloom_filter_file_path =
|
||||
snapshot::GetResPath<snapshot::SegmentFile>(collection_root_path, bloom_filter_file);
|
||||
|
||||
// Step 5: Write to file
|
||||
// Step 4: update delete docs and bloom filter
|
||||
{
|
||||
segment::IdBloomFilterPtr bloom_filter;
|
||||
STATUS_CHECK(segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter));
|
||||
auto delete_docs = std::make_shared<segment::DeletedDocs>();
|
||||
std::vector<id_t> uids;
|
||||
STATUS_CHECK(segment_reader->LoadUids(uids));
|
||||
std::vector<engine::offset_t> delete_docs_offset;
|
||||
for (size_t i = 0; i < uids.size(); i++) {
|
||||
if (std::binary_search(ids_to_check.begin(), ids_to_check.end(), uids[i])) {
|
||||
delete_docs->AddDeletedDoc(i);
|
||||
delete_docs_offset.emplace_back(i);
|
||||
} else {
|
||||
bloom_filter->Add(uids[i]);
|
||||
}
|
||||
}
|
||||
|
||||
STATUS_CHECK(
|
||||
segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetCount() - pre_del_ids.size(), true));
|
||||
STATUS_CHECK(segments_op->CommitRowCountDelta(segment->GetID(), delete_docs_offset.size(), true));
|
||||
|
||||
// Load previous delete_id and merge into 'delete_ids'
|
||||
segment::DeletedDocsPtr prev_del_docs;
|
||||
STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs));
|
||||
if (prev_del_docs) {
|
||||
auto& pre_del_offsets = prev_del_docs->GetDeletedDocs();
|
||||
size_t delete_docs_size = delete_docs_offset.size();
|
||||
for (auto& offset : pre_del_offsets) {
|
||||
if (!std::binary_search(delete_docs_offset.begin(), delete_docs_offset.begin() + delete_docs_size,
|
||||
offset)) {
|
||||
delete_docs_offset.emplace_back(offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
std::sort(delete_docs_offset.begin(), delete_docs_offset.end());
|
||||
|
||||
auto delete_docs = std::make_shared<segment::DeletedDocs>(delete_docs_offset);
|
||||
STATUS_CHECK(segment_writer->WriteDeletedDocs(del_docs_path, delete_docs));
|
||||
STATUS_CHECK(segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter));
|
||||
}
|
||||
|
@ -302,7 +292,6 @@ MemCollection::ApplyDeletes() {
|
|||
delete_file->SetSize(CommonUtil::GetFileSize(del_docs_path + codec::DeletedDocsFormat::FilePostfix()));
|
||||
bloom_filter_file->SetSize(
|
||||
CommonUtil::GetFileSize(bloom_filter_file_path + codec::IdBloomFilterFormat::FilePostfix()));
|
||||
// modified_segments.push_back(segment);
|
||||
|
||||
return Status::OK();
|
||||
};
|
||||
|
@ -316,22 +305,6 @@ MemCollection::ApplyDeletes() {
|
|||
}
|
||||
|
||||
fiu_do_on("MemCollection.ApplyDeletes.RandomSleep", sleep(1));
|
||||
|
||||
// snapshot::ScopedSnapshotT new_ss;
|
||||
// STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(new_ss, collection_id_));
|
||||
// if (new_ss->GetID() != ss->GetID()) {
|
||||
// for (auto& seg : modified_segments) {
|
||||
// auto pre_seg_commit = ss->GetSegmentCommitBySegmentId(seg->GetID());
|
||||
// auto new_seg_commit = new_ss->GetSegmentCommitBySegmentId(seg->GetID());
|
||||
// if (new_seg_commit->GetID() != pre_seg_commit->GetID()) {
|
||||
// // TODO: Rollback CompoundSegmentsOp
|
||||
// std::string err = "[CSOE] Segment " + std::to_string(seg->GetID()) + " is stale.";
|
||||
// LOG_ENGINE_ERROR_ << err;
|
||||
// return Status(SS_STALE_ERROR, err);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
return segments_op->Push();
|
||||
}
|
||||
|
||||
|
|
|
@ -95,15 +95,33 @@ CompoundSegmentsOperation::AddStaleSegmentFile(const SegmentFilePtr& stale_segme
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
// Reports whether any segment file queued in stale_segment_files_ can no
// longer be found, or is already deactivated, in the snapshot returned by
// GetAdjustedSS().
//
// Returns true as soon as one such file is encountered; returns false when
// every queued file is still present and not deactivated (including the case
// where nothing is queued).
// NOTE(review): GetAdjustedSS() is presumably the latest snapshot this
// operation is rebased onto -- confirm against CompoundBaseOperation.
bool
|
||||
CompoundSegmentsOperation::StaleSegmentFilesModified() {
|
||||
// stale_segment_files_ maps an ID to the list of segment files marked stale under it.
for (auto& kv : stale_segment_files_) {
|
||||
for (auto& file : kv.second) {
|
||||
// Look the file up again by ID in the adjusted snapshot rather than trusting the cached pointer.
auto segment_file = GetAdjustedSS()->GetResource<SegmentFile>(file->GetID());
|
||||
// A missing or deactivated resource means the file was changed since it was queued.
if (segment_file == nullptr || segment_file->IsDeactive()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
Status
|
||||
CompoundSegmentsOperation::DoExecute(StorePtr store) {
|
||||
if (!context_.new_segment && stale_segment_files_.size() == 0 && new_segment_files_.size() == 0) {
|
||||
if (!context_.new_segment && stale_segment_files_.empty() && new_segment_files_.empty()) {
|
||||
return Status(SS_INVALID_CONTEX_ERROR, "Nothing to do");
|
||||
}
|
||||
if (context_.new_segment && context_.new_segment->IsActive()) {
|
||||
return Status(SS_INVALID_CONTEX_ERROR, "New segment should not be active");
|
||||
}
|
||||
|
||||
if (StaleSegmentFilesModified()) {
|
||||
return Status(SS_STALE_ERROR, "Segment file has been stale");
|
||||
}
|
||||
|
||||
auto update_size = [&](SegmentFilePtr& file) {
|
||||
auto update_ctx = ResourceContextBuilder<SegmentFile>().SetOp(meta::oUpdate).CreatePtr();
|
||||
update_ctx->AddAttr(SizeField::Name);
|
||||
|
|
|
@ -105,6 +105,10 @@ class CompoundSegmentsOperation : public CompoundBaseOperation<CompoundSegmentsO
|
|||
Status
|
||||
CommitRowCountDelta(ID_TYPE segment_id, SIZE_TYPE delta, bool sub = true);
|
||||
|
||||
protected:
|
||||
bool
|
||||
StaleSegmentFilesModified();
|
||||
|
||||
protected:
|
||||
std::map<ID_TYPE, std::pair<SIZE_TYPE, bool>> delta_;
|
||||
std::map<ID_TYPE, SegmentFile::VecT> stale_segment_files_;
|
||||
|
|
|
@ -1264,18 +1264,18 @@ TEST_F(DBTest, DeleteStaleTest) {
|
|||
build_thread.join();
|
||||
fiu_disable("MemCollection.ApplyDeletes.RandomSleep");
|
||||
db_->Flush();
|
||||
// int64_t row_count;
|
||||
// status = db_->CountEntities(collection_name, row_count);
|
||||
// ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
// ASSERT_EQ(row_count, 10000 * 2 - 2 * del_id_pair);
|
||||
//
|
||||
// std::vector<bool> valid_row;
|
||||
// milvus::engine::DataChunkPtr entity_data_chunk;
|
||||
// for (size_t j = 0; j < del_ids.size(); j++) {
|
||||
// status = db_->GetEntityByID(collection_name, {del_ids[j]}, {}, valid_row, entity_data_chunk);
|
||||
// ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
// ASSERT_EQ(entity_data_chunk->count_, 0) << "[" << j << "] Delete id " << del_ids[j] << " failed.";
|
||||
// }
|
||||
int64_t row_count;
|
||||
status = db_->CountEntities(collection_name, row_count);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
ASSERT_EQ(row_count, 10000 * 2 - 2 * del_id_pair);
|
||||
|
||||
std::vector<bool> valid_row;
|
||||
milvus::engine::DataChunkPtr entity_data_chunk;
|
||||
for (size_t j = 0; j < del_ids.size(); j++) {
|
||||
status = db_->GetEntityByID(collection_name, {del_ids[j]}, {}, valid_row, entity_data_chunk);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
ASSERT_EQ(entity_data_chunk->count_, 0) << "[" << j << "] Delete id " << del_ids[j] << " failed.";
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBTest, LoadTest) {
|
||||
|
|
Loading…
Reference in New Issue