use multi-segments operation to flush (#3804)

* use multi-segments operation to flush

Signed-off-by: groot <yihua.mo@zilliz.com>

* typo

Signed-off-by: groot <yihua.mo@zilliz.com>
Signed-off-by: shengjun.li <shengjun.li@zilliz.com>
pull/3855/head
groot 2020-09-19 17:12:32 +08:00 committed by shengjun.li
parent 0571fe18b9
commit 962d0828b7
7 changed files with 136 additions and 73 deletions

View File

@ -732,7 +732,7 @@ DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id,
// remove delete id from the id list
segment::DeletedDocsPtr deleted_docs_ptr;
STATUS_CHECK(segment_reader->LoadDeletedDocs(deleted_docs_ptr));
segment_reader->LoadDeletedDocs(deleted_docs_ptr);
if (deleted_docs_ptr) {
const std::vector<offset_t>& delete_ids = deleted_docs_ptr->GetDeletedDocs();
std::vector<offset_t> temp_ids;
@ -840,8 +840,8 @@ DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::stri
std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);
segment::DeletedDocsPtr deleted_docs;
status = segment_reader->LoadDeletedDocs(deleted_docs);
if (!status.ok() || deleted_docs == nullptr) {
segment_reader->LoadDeletedDocs(deleted_docs);
if (deleted_docs == nullptr) {
continue; // no deleted docs, no need to compact
}

View File

@ -110,6 +110,7 @@ GetEntityByIdSegmentHandler::GetEntityByIdSegmentHandler(const std::shared_ptr<m
std::vector<bool>& valid_row)
: BaseT(ss), context_(context), dir_root_(dir_root), ids_(ids), field_names_(field_names), valid_row_(valid_row) {
ids_left_ = ids_;
data_chunk_ = std::make_shared<engine::DataChunk>();
}
Status
@ -129,7 +130,7 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
STATUS_CHECK(segment_reader.LoadBloomFilter(id_bloom_filter_ptr));
segment::DeletedDocsPtr deleted_docs_ptr;
STATUS_CHECK(segment_reader.LoadDeletedDocs(deleted_docs_ptr));
segment_reader.LoadDeletedDocs(deleted_docs_ptr);
std::vector<idx_t> ids_in_this_segment;
std::vector<int64_t> offsets;
@ -213,7 +214,6 @@ GetEntityByIdSegmentHandler::PostIterate() {
}
}
data_chunk_ = std::make_shared<engine::DataChunk>();
data_chunk_->count_ = temp_segment.GetRowCount();
data_chunk_->fixed_fields_.swap(temp_segment.GetFixedFields());
data_chunk_->variable_fields_.swap(temp_segment.GetVariableFields());

View File

@ -141,21 +141,16 @@ MemCollection::Serialize() {
recorder.RecordSection("ApplyDeleteToFile");
// serialize mem to new segment files
// delete ids will be applied in MemSegment::Serialize() method
std::lock_guard<std::mutex> lock(mem_mutex_);
for (auto& partition_segments : mem_segments_) {
MemSegmentList& segments = partition_segments.second;
for (auto& segment : segments) {
auto status = segment->Serialize();
if (!status.ok()) {
return status;
}
while (true) {
auto status = SerializeSegments();
if (status.ok()) {
break;
} else if (status.code() == SS_STALE_ERROR) {
LOG_ENGINE_ERROR_ << "Failed to serialize segments: something stale. Try again";
continue;
}
}
mem_segments_.clear();
recorder.RecordSection("Finished flushing");
recorder.RecordSection("SerializeSegments");
return Status::OK();
}
@ -202,7 +197,7 @@ MemCollection::ApplyDeleteToFile() {
// Load previous deleted offsets
segment::DeletedDocsPtr prev_del_docs;
STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs));
segment_reader->LoadDeletedDocs(prev_del_docs);
std::unordered_set<engine::offset_t> del_offsets;
if (prev_del_docs) {
auto prev_del_offsets = prev_del_docs->GetDeletedDocs();
@ -264,7 +259,7 @@ MemCollection::ApplyDeleteToFile() {
}
Status
MemCollection::CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::CompoundSegmentsOperation>& segments_op,
MemCollection::CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::CompoundSegmentsOperation>& operation,
const snapshot::ScopedSnapshotT& ss, engine::SegmentVisitorPtr& seg_visitor,
const std::unordered_set<engine::offset_t>& del_offsets,
uint64_t new_deleted, segment::IdBloomFilterPtr& bloom_filter) {
@ -282,7 +277,7 @@ MemCollection::CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::Comp
if (segment_file->GetSegmentId() == segment->GetID() &&
(segment_file->GetFieldElementId() == del_docs_element->GetID() ||
segment_file->GetFieldElementId() == blm_filter_element->GetID())) {
segments_op->AddStaleSegmentFile(segment_file);
operation->AddStaleSegmentFile(segment_file);
}
return Status::OK();
@ -300,7 +295,7 @@ MemCollection::CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::Comp
del_file_context.partition_id = segment->GetPartitionId();
del_file_context.segment_id = segment->GetID();
snapshot::SegmentFilePtr delete_file;
STATUS_CHECK(segments_op->CommitNewSegmentFile(del_file_context, delete_file));
STATUS_CHECK(operation->CommitNewSegmentFile(del_file_context, delete_file));
std::string collection_root_path = options_.meta_.path_ + COLLECTIONS_FOLDER;
auto segment_writer = std::make_shared<segment::SegmentWriter>(options_.meta_.path_, seg_visitor);
@ -315,13 +310,13 @@ MemCollection::CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::Comp
bloom_file_context.segment_id = segment->GetID();
engine::snapshot::SegmentFile::Ptr bloom_filter_file;
STATUS_CHECK(segments_op->CommitNewSegmentFile(bloom_file_context, bloom_filter_file));
STATUS_CHECK(operation->CommitNewSegmentFile(bloom_file_context, bloom_filter_file));
std::string bloom_filter_file_path =
snapshot::GetResPath<snapshot::SegmentFile>(collection_root_path, bloom_filter_file);
// Step 3: update delete docs and bloom filter
STATUS_CHECK(segments_op->CommitRowCountDelta(segment->GetID(), new_deleted, true));
STATUS_CHECK(operation->CommitRowCountDelta(segment->GetID(), new_deleted, true));
std::vector<int32_t> vec_del_offsets;
vec_del_offsets.reserve(del_offsets.size());
@ -339,6 +334,46 @@ MemCollection::CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::Comp
return Status::OK();
}
// Serializes every buffered MemSegment through one shared MultiSegmentsOperation.
//
// mem_mutex_ is held for the whole call. When mem_segments_ is empty the
// function returns OK without touching the snapshot or the WAL. Otherwise each
// segment is serialized against the same snapshot/operation pair, the operation
// is pushed only if it recorded at least one new segment, the buffer is cleared,
// and the WAL is told the largest operation id seen across the segments.
//
// NOTE(review): STATUS_CHECK is presumably an early-return-on-failure macro; if
// so, a failure leaves mem_segments_ uncleared and the WAL un-notified — confirm.
Status
MemCollection::SerializeSegments() {
    std::lock_guard<std::mutex> guard(mem_mutex_);

    // nothing buffered, nothing to flush
    if (mem_segments_.empty()) {
        return Status::OK();
    }

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_));

    snapshot::OperationContext op_context;
    auto multi_op = std::make_shared<snapshot::MultiSegmentsOperation>(op_context, ss);

    // serialize each buffered segment into the shared operation
    // (delete ids are applied inside MemSegment::Serialize())
    idx_t latest_op_id = 0;
    for (auto& entry : mem_segments_) {
        for (auto& mem_segment : entry.second) {
            STATUS_CHECK(mem_segment->Serialize(ss, multi_op));

            // track the largest op id reported by any segment
            const idx_t candidate = mem_segment->GetMaxOpID();
            latest_op_id = (latest_op_id < candidate) ? candidate : latest_op_id;
        }
    }

    // a segment may be skipped when its row count is 0 (all deleted),
    // so only push when the operation actually holds new segments
    const bool has_new_segments = !multi_op->GetContext().new_segments.empty();
    if (has_new_segments) {
        STATUS_CHECK(multi_op->Push());
    }

    mem_segments_.clear();

    // notify wal the max operation id is done
    WalManager::GetInstance().OperationDone(ss->GetName(), latest_op_id);

    return Status::OK();
}
int64_t
MemCollection::GetCollectionId() const {
return collection_id_;

View File

@ -60,12 +60,15 @@ class MemCollection {
size_t
GetCurrentMem();
Status
SerializeSegments();
private:
Status
ApplyDeleteToFile();
Status
CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::CompoundSegmentsOperation>& segments_op,
CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::CompoundSegmentsOperation>& operation,
const snapshot::ScopedSnapshotT& ss, engine::SegmentVisitorPtr& seg_visitor,
const std::unordered_set<engine::offset_t>& del_offsets, uint64_t new_deleted,
segment::IdBloomFilterPtr& bloom_filter);

View File

@ -82,33 +82,34 @@ MemSegment::Delete(const std::vector<idx_t>& ids, idx_t op_id) {
}
Status
MemSegment::Serialize() {
int64_t size = GetCurrentMem();
server::CollectSerializeMetrics metrics(size);
MemSegment::Serialize(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snapshot::MultiSegmentsOperation>& operation) {
int64_t mem_size = GetCurrentMem();
server::CollectSerializeMetrics metrics(mem_size);
// delete in mem
STATUS_CHECK(ApplyDeleteToMem());
// create new segment and serialize
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_);
if (!status.ok()) {
std::string err_msg = "Failed to get latest snapshot: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
// empty segment, nothing to do
if (actions_.empty()) {
return Status::OK();
}
// get max op id
idx_t max_op_id = 0;
for (auto& action : actions_) {
if (action.op_id_ > max_op_id) {
max_op_id = action.op_id_;
if (action.op_id_ > max_op_id_) {
max_op_id_ = action.op_id_;
}
}
std::shared_ptr<snapshot::NewSegmentOperation> new_seg_operation;
// delete in mem
auto status = ApplyDeleteToMem(ss);
if (!status.ok()) {
if (status.code() == DB_EMPTY_COLLECTION) {
// segment is empty, do nothing
return Status::OK();
}
}
// create new segment and serialize
segment::SegmentWriterPtr segment_writer;
status = CreateNewSegment(ss, new_seg_operation, segment_writer, max_op_id);
status = CreateNewSegment(ss, operation, segment_writer);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to create new segment";
return status;
@ -134,39 +135,36 @@ MemSegment::Serialize() {
return status;
}
STATUS_CHECK(new_seg_operation->CommitRowCount(segment_writer->RowCount()));
STATUS_CHECK(new_seg_operation->Push());
LOG_ENGINE_DEBUG_ << "New segment " << seg_id << " of collection " << collection_id_ << " serialized";
operation->CommitRowCount(seg_id, segment_writer->RowCount());
// notify wal the max operation id is done
WalManager::GetInstance().OperationDone(ss->GetName(), max_op_id);
LOG_ENGINE_DEBUG_ << "New segment " << seg_id << " of collection " << collection_id_ << " committed";
return Status::OK();
}
Status
MemSegment::CreateNewSegment(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snapshot::NewSegmentOperation>& operation,
segment::SegmentWriterPtr& writer, idx_t max_op_id) {
// create segment
snapshot::SegmentPtr segment;
snapshot::OperationContext context;
// context.lsn = max_op_id;
context.prev_partition = ss->GetResource<snapshot::Partition>(partition_id_);
operation = std::make_shared<snapshot::NewSegmentOperation>(context, ss);
auto status = operation->CommitNewSegment(segment);
MemSegment::CreateNewSegment(snapshot::ScopedSnapshotT& ss,
std::shared_ptr<snapshot::MultiSegmentsOperation>& operation,
segment::SegmentWriterPtr& writer) {
// create new segment
snapshot::SegmentPtr new_segment;
snapshot::OperationContext new_seg_ctx;
new_seg_ctx.prev_partition = ss->GetResource<snapshot::Partition>(partition_id_);
auto status = operation->CommitNewSegment(new_seg_ctx, new_segment);
if (!status.ok()) {
std::string err_msg = "MemSegment::CreateSegment failed: " + status.ToString();
std::string err_msg = "MemSegment::CreateNewSegment failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
snapshot::SegmentFile::VecT new_segment_files;
// create segment raw files (placeholder)
auto names = ss->GetFieldNames();
for (auto& name : names) {
snapshot::SegmentFileContext sf_context;
sf_context.collection_id = collection_id_;
sf_context.partition_id = partition_id_;
sf_context.segment_id = segment->GetID();
sf_context.segment_id = new_segment->GetID();
sf_context.field_name = name;
sf_context.field_element_name = engine::ELEMENT_RAW_DATA;
@ -177,6 +175,7 @@ MemSegment::CreateNewSegment(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snap
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
new_segment_files.emplace_back(seg_file);
}
// create deleted_doc and bloom_filter files (placeholder)
@ -184,7 +183,7 @@ MemSegment::CreateNewSegment(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snap
snapshot::SegmentFileContext sf_context;
sf_context.collection_id = collection_id_;
sf_context.partition_id = partition_id_;
sf_context.segment_id = segment->GetID();
sf_context.segment_id = new_segment->GetID();
sf_context.field_name = engine::FIELD_UID;
sf_context.field_element_name = engine::ELEMENT_DELETED_DOCS;
@ -203,10 +202,12 @@ MemSegment::CreateNewSegment(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snap
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
new_segment_files.emplace_back(delete_doc_file);
new_segment_files.emplace_back(bloom_filter_file);
}
auto ctx = operation->GetContext();
auto visitor = SegmentVisitor::Build(ss, ctx.new_segment, ctx.new_segment_files);
auto visitor = SegmentVisitor::Build(ss, new_segment, new_segment_files);
// create segment writer
writer = std::make_shared<segment::SegmentWriter>(options_.meta_.path_, visitor);
@ -215,7 +216,7 @@ MemSegment::CreateNewSegment(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snap
}
Status
MemSegment::ApplyDeleteToMem() {
MemSegment::ApplyDeleteToMem(snapshot::ScopedSnapshotT& ss) {
auto outer_iter = actions_.begin();
for (; outer_iter != actions_.end(); ++outer_iter) {
MemAction& action = (*outer_iter);
@ -254,15 +255,32 @@ MemSegment::ApplyDeleteToMem() {
}
}
// delete entities from chunks
// construct a new engine::Segment and delete entities from the chunks
// since temp_set is empty, it shares BinaryData with the chunk,
// so DeleteEntity() deletes the elements from the chunk itself
Segment temp_set;
STATUS_CHECK(temp_set.SetFields(collection_id_));
auto& fields = ss->GetResources<snapshot::Field>();
for (auto& kv : fields) {
const snapshot::FieldPtr& field = kv.second.Get();
STATUS_CHECK(temp_set.AddField(field));
}
STATUS_CHECK(temp_set.AddChunk(chunk));
temp_set.DeleteEntity(offsets);
chunk->count_ = temp_set.GetRowCount();
}
}
int64_t row_count = 0;
for (auto& action : actions_) {
if (action.insert_data_ != nullptr) {
row_count += action.insert_data_->count_;
}
}
if (row_count == 0) {
return Status(DB_EMPTY_COLLECTION, "All entities deleted");
}
return Status::OK();
}

View File

@ -58,15 +58,20 @@ class MemSegment {
}
Status
Serialize();
Serialize(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snapshot::MultiSegmentsOperation>& operation);
// Returns the stored max_op_id_ value for this segment.
idx_t
GetMaxOpID() const {
return max_op_id_;
}
private:
Status
CreateNewSegment(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snapshot::NewSegmentOperation>& operation,
segment::SegmentWriterPtr& writer, idx_t max_op_id);
CreateNewSegment(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snapshot::MultiSegmentsOperation>& operation,
segment::SegmentWriterPtr& writer);
Status
ApplyDeleteToMem();
ApplyDeleteToMem(snapshot::ScopedSnapshotT& ss);
Status
PutChunksToWriter(const segment::SegmentWriterPtr& writer);
@ -82,6 +87,8 @@ class MemSegment {
ActionArray actions_; // the actions array makes sure insert/delete actions are executed one by one
int64_t total_row_count_ = 0;
idx_t max_op_id_ = 0;
};
using MemSegmentPtr = std::shared_ptr<MemSegment>;

View File

@ -96,7 +96,7 @@ SegmentReader::Load() {
STATUS_CHECK(LoadBloomFilter(id_bloom_filter_ptr));
segment::DeletedDocsPtr deleted_docs_ptr;
STATUS_CHECK(LoadDeletedDocs(deleted_docs_ptr));
LoadDeletedDocs(deleted_docs_ptr);
STATUS_CHECK(LoadVectorIndice());
@ -276,7 +276,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
// load deleted doc
faiss::ConcurrentBitsetPtr concurrent_bitset_ptr = std::make_shared<faiss::ConcurrentBitset>(uids.size());
segment::DeletedDocsPtr deleted_docs_ptr;
STATUS_CHECK(LoadDeletedDocs(deleted_docs_ptr));
LoadDeletedDocs(deleted_docs_ptr);
if (deleted_docs_ptr != nullptr) {
auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();
for (auto& offset : deleted_docs) {
@ -479,7 +479,7 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
if (!std::experimental::filesystem::exists(file_path + codec::IdBloomFilterFormat::FilePostfix())) {
return Status::OK(); // file doesn't exist
return Status(DB_ERROR, "File doesn't exist"); // file doesn't exist
}
// if the data is in cache, no need to read file
@ -518,7 +518,7 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
if (!std::experimental::filesystem::exists(file_path + codec::DeletedDocsFormat::FilePostfix())) {
return Status::OK(); // file doesn't exist
return Status(DB_ERROR, "File doesn't exist"); // file doesn't exist
}
// if the data is in cache, no need to read file