From aa31b0c2517dd418b06267a287524f6785ae6ea8 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Tue, 16 Jun 2020 15:05:55 +0800 Subject: [PATCH] (db/snapshot): fix bugs and enhance unit test (#2562) * (db/snapshot): add ToString for Snapshot Signed-off-by: peng.xu * (db/snapshot): fix bugs and enhance CompoundTest1 Signed-off-by: peng.xu * (db/snapshot): fix bug in operations Signed-off-by: peng.xu * (db/snapshot): some code refactor Signed-off-by: peng.xu * (db/snapshot): small change Signed-off-by: peng.xu --- core/src/db/snapshot/CompoundOperations.cpp | 68 +++++--- core/src/db/snapshot/CompoundOperations.h | 9 +- core/src/db/snapshot/Operations.cpp | 4 +- core/src/db/snapshot/Operations.h | 11 +- core/src/db/snapshot/ResourceOperations.cpp | 25 +-- core/src/db/snapshot/Snapshot.cpp | 77 ++++++++- core/src/db/snapshot/Snapshot.h | 15 +- core/unittest/db/test_snapshot.cpp | 178 +++++++++++++++++--- 8 files changed, 310 insertions(+), 77 deletions(-) diff --git a/core/src/db/snapshot/CompoundOperations.cpp b/core/src/db/snapshot/CompoundOperations.cpp index b4f7b3ab74..ad0be26bbf 100644 --- a/core/src/db/snapshot/CompoundOperations.cpp +++ b/core/src/db/snapshot/CompoundOperations.cpp @@ -29,14 +29,14 @@ BuildOperation::DoExecute(Store& store) { if (!status.ok()) return status; - SegmentCommitOperation op(context_, context_.prev_ss); + SegmentCommitOperation op(context_, GetAdjustedSS()); op(store); status = op.GetResource(context_.new_segment_commit); if (!status.ok()) return status; AddStepWithLsn(*context_.new_segment_commit, context_.lsn); - PartitionCommitOperation pc_op(context_, context_.prev_ss); + PartitionCommitOperation pc_op(context_, GetAdjustedSS()); pc_op(store); OperationContext cc_context; @@ -52,7 +52,7 @@ BuildOperation::DoExecute(Store& store) { return status; AddStepWithLsn(*context_.new_partition_commit, context_.lsn); - CollectionCommitOperation cc_op(cc_context, context_.prev_ss); + CollectionCommitOperation cc_op(cc_context, GetAdjustedSS()); cc_op(store); status = cc_op.GetResource(context_.new_collection_commit); if (!status.ok()) @@ -77,7 +77,13 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id)); if (!status.ok()) return status; - auto new_sf_op = std::make_shared(context, prev_ss_); + auto segment = GetStartedSS()->GetResource(context.segment_id); + if (!segment) { + return Status(SS_INVALID_CONTEX_ERROR, "Invalid segment_id in context"); + } + auto ctx = context; + ctx.partition_id = segment->GetPartitionId(); + auto new_sf_op = std::make_shared(ctx, GetStartedSS()); status = new_sf_op->Push(); if (!status.ok()) return status; @@ -101,7 +107,7 @@ NewSegmentOperation::DoExecute(Store& store) { /* auto status = PrevSnapshotRequried(); */ /* if (!status.ok()) return status; */ // TODO: Check Context - SegmentCommitOperation op(context_, context_.prev_ss); + SegmentCommitOperation op(context_, GetAdjustedSS()); auto status = op(store); if (!status.ok()) return status; @@ -117,7 +123,7 @@ NewSegmentOperation::DoExecute(Store& store) { OperationContext cc_context; - PartitionCommitOperation pc_op(context_, context_.prev_ss); + PartitionCommitOperation pc_op(context_, GetAdjustedSS()); status = pc_op(store); if (!status.ok()) return status; @@ -132,7 +138,7 @@ NewSegmentOperation::DoExecute(Store& store) { /* } */ /* std::cout << ")" << std::endl; */ - CollectionCommitOperation cc_op(cc_context, context_.prev_ss); + CollectionCommitOperation cc_op(cc_context, GetAdjustedSS()); status = cc_op(store); if (!status.ok()) return status; @@ -146,7 +152,7 @@ NewSegmentOperation::DoExecute(Store& store) { Status NewSegmentOperation::CommitNewSegment(SegmentPtr& created) { - auto op = std::make_shared(context_, prev_ss_); + auto op = std::make_shared(context_, GetStartedSS()); auto status = op->Push(); if (!status.ok()) return status; @@ -163,7 +169,7 @@ NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, Seg auto c = context; c.segment_id = context_.new_segment->GetID(); c.partition_id = context_.new_segment->GetPartitionId(); - auto new_sf_op = std::make_shared(c, prev_ss_); + auto new_sf_op = std::make_shared(c, GetStartedSS()); auto status = new_sf_op->Push(); if (!status.ok()) return status; @@ -178,6 +184,18 @@ NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, Seg MergeOperation::MergeOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) { } +Status +MergeOperation::OnSnapshotStale() { + for (auto& stale_seg : context_.stale_segments) { + auto expect_sc = GetStartedSS()->GetSegmentCommitBySegmentId(stale_seg->GetID()); + auto latest_sc = GetAdjustedSS()->GetSegmentCommitBySegmentId(stale_seg->GetID()); + if (!latest_sc || (latest_sc->GetID() != expect_sc->GetID())) { + return Status(SS_STALE_ERROR, "MergeOperation on stale segments"); + } + } + return Status::OK(); +} + Status MergeOperation::CommitNewSegment(SegmentPtr& created) { Status status; @@ -185,7 +203,7 @@ MergeOperation::CommitNewSegment(SegmentPtr& created) { created = context_.new_segment; return status; } - auto op = std::make_shared(context_, prev_ss_); + auto op = std::make_shared(context_, GetStartedSS()); status = op->Push(); if (!status.ok()) return status; @@ -207,7 +225,7 @@ MergeOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF auto c = context; c.segment_id = new_segment->GetID(); c.partition_id = new_segment->GetPartitionId(); - auto new_sf_op = std::make_shared(c, prev_ss_); + auto new_sf_op = std::make_shared(c, GetStartedSS()); status = new_sf_op->Push(); if (!status.ok()) return status; @@ -224,7 +242,7 @@ MergeOperation::DoExecute(Store& store) { // PXU TODO: // 1. Check all requried field elements have related segment files // 2. Check Stale and others - SegmentCommitOperation op(context_, context_.prev_ss); + SegmentCommitOperation op(context_, GetAdjustedSS()); auto status = op(store); if (!status.ok()) return status; @@ -239,7 +257,7 @@ MergeOperation::DoExecute(Store& store) { /* } */ /* std::cout << ")" << std::endl; */ - PartitionCommitOperation pc_op(context_, context_.prev_ss); + PartitionCommitOperation pc_op(context_, GetAdjustedSS()); status = pc_op(store); if (!status.ok()) return status; @@ -257,7 +275,7 @@ MergeOperation::DoExecute(Store& store) { /* } */ /* std::cout << ")" << std::endl; */ - CollectionCommitOperation cc_op(cc_context, context_.prev_ss); + CollectionCommitOperation cc_op(cc_context, GetAdjustedSS()); status = cc_op(store); if (!status.ok()) return status; @@ -309,8 +327,8 @@ std::string DropPartitionOperation::GetRepr() const { std::stringstream ss; ss << "<" << GetName() << "("; - if (prev_ss_) { - ss << "SS=" << prev_ss_->GetID(); + if (GetAdjustedSS()) { + ss << "SS=" << GetAdjustedSS()->GetID(); } ss << "," << c_context_.ToString(); ss << "," << context_.ToString(); @@ -325,19 +343,19 @@ DropPartitionOperation::DoExecute(Store& store) { PartitionPtr p; auto id = c_context_.id; if (id == 0) { - status = prev_ss_->GetPartitionId(c_context_.name, id); + status = GetAdjustedSS()->GetPartitionId(c_context_.name, id); c_context_.id = id; } if (!status.ok()) return status; - auto p_c = prev_ss_->GetPartitionCommitByPartitionId(id); + auto p_c = GetAdjustedSS()->GetPartitionCommitByPartitionId(id); if (!p_c) return Status(SS_NOT_FOUND_ERROR, "No partition commit found"); context_.stale_partition_commit = p_c; OperationContext op_ctx; op_ctx.stale_partition_commit = p_c; - auto op = CollectionCommitOperation(op_ctx, prev_ss_); + auto op = CollectionCommitOperation(op_ctx, GetAdjustedSS()); status = op(store); if (!status.ok()) return status; @@ -368,7 +386,7 @@ CreatePartitionOperation::PreCheck() { Status CreatePartitionOperation::CommitNewPartition(const PartitionContext& context, PartitionPtr& partition) { Status status; - auto op = std::make_shared(context, prev_ss_); + auto op = std::make_shared(context, GetStartedSS()); status = op->Push(); if (!status.ok()) return status; @@ -387,13 +405,13 @@ CreatePartitionOperation::DoExecute(Store& store) { if (!status.ok()) return status; - auto collection = prev_ss_->GetCollection(); + auto collection = GetAdjustedSS()->GetCollection(); auto partition = context_.new_partition; PartitionCommitPtr pc; OperationContext pc_context; pc_context.new_partition = partition; - auto pc_op = PartitionCommitOperation(pc_context, prev_ss_); + auto pc_op = PartitionCommitOperation(pc_context, GetAdjustedSS()); status = pc_op(store); if (!status.ok()) return status; @@ -404,7 +422,7 @@ CreatePartitionOperation::DoExecute(Store& store) { OperationContext cc_context; cc_context.new_partition_commit = pc; context_.new_partition_commit = pc; - auto cc_op = CollectionCommitOperation(cc_context, prev_ss_); + auto cc_op = CollectionCommitOperation(cc_context, GetAdjustedSS()); status = cc_op(store); if (!status.ok()) return status; @@ -432,8 +450,8 @@ std::string CreateCollectionOperation::GetRepr() const { std::stringstream ss; ss << "<" << GetName() << "("; - if (prev_ss_) { - ss << "SS=" << prev_ss_->GetID(); + if (GetAdjustedSS()) { + ss << "SS=" << GetAdjustedSS()->GetID(); } ss << c_context_.ToString(); ss << "," << context_.ToString(); diff --git a/core/src/db/snapshot/CompoundOperations.h b/core/src/db/snapshot/CompoundOperations.h index ed99618fe3..0aa8b5bea6 100644 --- a/core/src/db/snapshot/CompoundOperations.h +++ b/core/src/db/snapshot/CompoundOperations.h @@ -32,8 +32,8 @@ class CompoundBaseOperation : public Operations { GetRepr() const override { std::stringstream ss; ss << "<" << GetName() << "("; - if (context_.prev_ss) { - ss << "SS=" << context_.prev_ss->GetID(); + if (GetAdjustedSS()) { + ss << "SS=" << GetAdjustedSS()->GetID(); } ss << "," << context_.ToString(); ss << ",LSN=" << GetContextLsn(); @@ -43,7 +43,7 @@ class CompoundBaseOperation : public Operations { Status PreCheck() override { - if (GetContextLsn() <= prev_ss_->GetMaxLsn()) { + if (GetContextLsn() <= GetStartedSS()->GetMaxLsn()) { return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation"); } return Status::OK(); @@ -104,6 +104,9 @@ class MergeOperation : public CompoundBaseOperation { CommitNewSegment(SegmentPtr&); Status CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr&); + + Status + OnSnapshotStale() override; }; class CreateCollectionOperation : public CompoundBaseOperation { diff --git a/core/src/db/snapshot/Operations.cpp b/core/src/db/snapshot/Operations.cpp index 85fab0d4f4..b9589377a3 100644 --- a/core/src/db/snapshot/Operations.cpp +++ b/core/src/db/snapshot/Operations.cpp @@ -225,8 +225,8 @@ Operations::OnExecute(Store& store) { Status Operations::PreExecute(Store& store) { Status status; - if (prev_ss_ && type_ == OperationsType::W_Compound) { - Snapshots::GetInstance().GetSnapshot(context_.prev_ss, prev_ss_->GetCollectionId()); + if (GetStartedSS() && type_ == OperationsType::W_Compound) { + Snapshots::GetInstance().GetSnapshot(context_.prev_ss, GetStartedSS()->GetCollectionId()); if (!context_.prev_ss) { status = OnSnapshotDropped(); } else if (prev_ss_->GetID() != context_.prev_ss->GetID()) { diff --git a/core/src/db/snapshot/Operations.h b/core/src/db/snapshot/Operations.h index 9781906c3c..7a6868b6c9 100644 --- a/core/src/db/snapshot/Operations.h +++ b/core/src/db/snapshot/Operations.h @@ -41,10 +41,15 @@ class Operations : public std::enable_shared_from_this { const OperationsType& type = OperationsType::Invalid); const ScopedSnapshotT& - GetPrevSnapshot() const { + GetStartedSS() const { return prev_ss_; } + const ScopedSnapshotT& + GetAdjustedSS() const { + return context_.prev_ss; + } + virtual const LSN_TYPE& GetContextLsn() const { return context_.lsn; @@ -201,6 +206,8 @@ class CommitOperation : public Operations { Status GetResource(typename ResourceT::Ptr& res, bool wait = false) { + if (!status_.ok()) + return status_; if (wait) { WaitToFinish(); } @@ -248,6 +255,8 @@ class LoadOperation : public Operations { Status GetResource(typename ResourceT::Ptr& res, bool wait = false) { + if (!status_.ok()) + return status_; if (wait) { WaitToFinish(); } diff --git a/core/src/db/snapshot/ResourceOperations.cpp b/core/src/db/snapshot/ResourceOperations.cpp index df7fca3961..d967a4a2c1 100644 --- a/core/src/db/snapshot/ResourceOperations.cpp +++ b/core/src/db/snapshot/ResourceOperations.cpp @@ -27,7 +27,7 @@ CollectionCommitOperation::DoExecute(Store& store) { resource_->GetMappings().erase(context_.stale_partition_commit->GetID()); } else if (context_.new_partition_commit) { auto prev_partition_commit = - prev_ss_->GetPartitionCommitByPartitionId(context_.new_partition_commit->GetPartitionId()); + GetStartedSS()->GetPartitionCommitByPartitionId(context_.new_partition_commit->GetPartitionId()); if (prev_partition_commit) resource_->GetMappings().erase(prev_partition_commit->GetID()); resource_->GetMappings().insert(context_.new_partition_commit->GetID()); @@ -53,7 +53,7 @@ PartitionOperation::DoExecute(Store& store) { auto status = CheckStale(); if (!status.ok()) return status; - resource_ = std::make_shared(context_.name, prev_ss_->GetCollection()->GetID()); + resource_ = std::make_shared(context_.name, GetStartedSS()->GetCollection()->GetID()); AddStep(*resource_, false); return status; } @@ -72,7 +72,7 @@ PartitionCommitOperation::GetPrevResource() const { auto& segment_commit = context_.new_segment_commit; if (!segment_commit) return nullptr; - return prev_ss_->GetPartitionCommitByPartitionId(segment_commit->GetPartitionId()); + return GetStartedSS()->GetPartitionCommitByPartitionId(segment_commit->GetPartitionId()); } Status @@ -82,12 +82,16 @@ PartitionCommitOperation::DoExecute(Store& store) { resource_ = std::make_shared(*prev_resource); resource_->SetID(0); resource_->ResetStatus(); - auto prev_segment_commit = prev_ss_->GetSegmentCommit(context_.new_segment_commit->GetSegmentId()); + auto prev_segment_commit = + GetStartedSS()->GetSegmentCommitBySegmentId(context_.new_segment_commit->GetSegmentId()); if (prev_segment_commit) resource_->GetMappings().erase(prev_segment_commit->GetID()); if (context_.stale_segments.size() > 0) { for (auto& stale_segment : context_.stale_segments) { - auto stale_segment_commit = prev_ss_->GetSegmentCommit(stale_segment->GetID()); + if (stale_segment->GetPartitionId() != prev_resource->GetPartitionId()) { + return Status(SS_INVALID_CONTEX_ERROR, "All stale segments should from specified partition"); + } + auto stale_segment_commit = GetStartedSS()->GetSegmentCommitBySegmentId(stale_segment->GetID()); resource_->GetMappings().erase(stale_segment_commit->GetID()); } } @@ -95,7 +99,8 @@ PartitionCommitOperation::DoExecute(Store& store) { if (!context_.new_partition) { return Status(SS_INVALID_CONTEX_ERROR, "Partition is required"); } - resource_ = std::make_shared(prev_ss_->GetCollectionId(), context_.new_partition->GetID()); + resource_ = + std::make_shared(GetStartedSS()->GetCollectionId(), context_.new_partition->GetID()); } if (context_.new_segment_commit) { @@ -112,7 +117,7 @@ SegmentCommitOperation::SegmentCommitOperation(const OperationContext& context, SegmentCommit::Ptr SegmentCommitOperation::GetPrevResource() const { if (context_.new_segment_files.size() > 0) { - return prev_ss_->GetSegmentCommit(context_.new_segment_files[0]->GetSegmentId()); + return GetStartedSS()->GetSegmentCommitBySegmentId(context_.new_segment_files[0]->GetSegmentId()); } return nullptr; } @@ -132,7 +137,7 @@ SegmentOperation::DoExecute(Store& store) { if (!context_.prev_partition) { return Status(SS_INVALID_CONTEX_ERROR, "Invalid SegmentOperation Context"); } - auto prev_num = prev_ss_->GetMaxSegmentNumByPartition(context_.prev_partition->GetID()); + auto prev_num = GetStartedSS()->GetMaxSegmentNumByPartition(context_.prev_partition->GetID()); resource_ = std::make_shared(context_.prev_partition->GetID(), prev_num + 1); AddStep(*resource_, false); return Status::OK(); @@ -150,7 +155,7 @@ SegmentCommitOperation::DoExecute(Store& store) { resource_->GetMappings().erase(context_.stale_segment_file->GetID()); } } else { - resource_ = std::make_shared(prev_ss_->GetLatestSchemaCommitId(), + resource_ = std::make_shared(GetStartedSS()->GetLatestSchemaCommitId(), context_.new_segment_files[0]->GetPartitionId(), context_.new_segment_files[0]->GetSegmentId()); } @@ -175,7 +180,7 @@ SegmentFileOperation::SegmentFileOperation(const SegmentFileContext& sc, ScopedS Status SegmentFileOperation::DoExecute(Store& store) { - auto field_element_id = prev_ss_->GetFieldElementId(context_.field_name, context_.field_element_name); + auto field_element_id = GetStartedSS()->GetFieldElementId(context_.field_name, context_.field_element_name); resource_ = std::make_shared(context_.partition_id, context_.segment_id, field_element_id); AddStep(*resource_, false); return Status::OK(); diff --git a/core/src/db/snapshot/Snapshot.cpp b/core/src/db/snapshot/Snapshot.cpp index 942bbaf43b..dbc83952ee 100644 --- a/core/src/db/snapshot/Snapshot.cpp +++ b/core/src/db/snapshot/Snapshot.cpp @@ -118,18 +118,77 @@ Snapshot::Snapshot(ID_TYPE id) { } } - /* for(auto kv : partition_commits_) { */ - /* std::cout << this << " Snapshot " << collection_commit_->GetID() << " PartitionCommit " << */ - /* kv.first << " Partition " << kv.second->GetPartitionId() << std::endl; */ - /* } */ - /* for(auto kv : p_pc_map_) { */ - /* std::cout << this << " Snapshot " << collection_commit_->GetID() << " P " << */ - /* kv.first << " PC " << kv.second << std::endl; */ - /* } */ - RefAll(); } +const std::string +Snapshot::ToString() { + auto to_matrix_string = [](const MappingT& mappings, int line_length, size_t ident = 0) -> std::string { + std::stringstream ss; + std::string l1_spaces; + for (auto i = 0; i < ident; ++i) { + l1_spaces += " "; + } + auto l2_spaces = l1_spaces + l1_spaces; + std::string prefix = ""; + if (mappings.size() > line_length) { + prefix = "\n" + l1_spaces; + } + ss << prefix << "["; + auto pos = 0; + for (auto id : mappings) { + if (pos > line_length) { + pos = 0; + ss << "\n" << l2_spaces; + } else if (pos == 0) { + if (prefix != "") { + ss << "\n" << l2_spaces; + } + } else { + ss << ", "; + } + ss << id; + pos++; + } + ss << prefix << "]"; + return ss.str(); + }; + + int row_element_size = 8; + std::stringstream ss; + ss << "****************************** Snapshot " << GetID() << " ******************************"; + ss << "\nCollection: id=" << GetCollectionId() << ",name=\"" << GetName() << "\""; + ss << ", CollectionCommit: id=" << GetCollectionCommit()->GetID(); + ss << ",mappings="; + auto& cc_m = GetCollectionCommit()->GetMappings(); + ss << to_matrix_string(cc_m, row_element_size, 2); + for (auto& p_c_id : cc_m) { + auto p_c = GetResource(p_c_id); + auto p = GetResource(p_c->GetPartitionId()); + ss << "\nPartition: id=" << p->GetID() << ",name=\"" << p->GetName() << "\""; + ss << ", PartitionCommit: id=" << p_c->GetID(); + ss << ",mappings="; + auto& pc_m = p_c->GetMappings(); + ss << to_matrix_string(pc_m, row_element_size, 2); + for (auto& sc_id : pc_m) { + auto sc = GetResource(sc_id); + auto se = GetResource(sc->GetSegmentId()); + ss << "\n Segment: id=" << se->GetID(); + ss << ", SegmentCommit: id=" << sc->GetID(); + ss << ",mappings="; + auto& sc_m = sc->GetMappings(); + ss << to_matrix_string(sc_m, row_element_size, 2); + for (auto& sf_id : sc_m) { + auto sf = GetResource(sf_id); + ss << "\n\tSegmentFile: id=" << sf_id << ",field_element_id=" << sf->GetFieldElementId(); + } + } + } + ss << "\n----------------------------------------------------------------------------------------"; + + return ss.str(); +} + } // namespace snapshot } // namespace engine } // namespace milvus diff --git a/core/src/db/snapshot/Snapshot.h b/core/src/db/snapshot/Snapshot.h index a20b704762..fd6ea7e46a 100644 --- a/core/src/db/snapshot/Snapshot.h +++ b/core/src/db/snapshot/Snapshot.h @@ -82,6 +82,16 @@ class Snapshot : public ReferenceProxy { return max_lsn_; } + PartitionPtr + GetPartition(const std::string& name) { + ID_TYPE id; + auto status = GetPartitionId(name, id); + if (!status.ok()) { + return nullptr; + } + return GetResource(id); + } + Status GetPartitionId(const std::string& name, ID_TYPE& id) const { auto it = partition_names_map_.find(name); @@ -104,7 +114,7 @@ class Snapshot : public ReferenceProxy { // PXU TODO: add const. Need to change Scopedxxxx::Get SegmentCommitPtr - GetSegmentCommit(ID_TYPE segment_id) { + GetSegmentCommitBySegmentId(ID_TYPE segment_id) { auto it = seg_segc_map_.find(segment_id); if (it == seg_segc_map_.end()) return nullptr; @@ -246,6 +256,9 @@ class Snapshot : public ReferenceProxy { resources[resource->GetID()] = resource; } + const std::string + ToString(); + private: Snapshot(const Snapshot&) = delete; Snapshot& diff --git a/core/unittest/db/test_snapshot.cpp b/core/unittest/db/test_snapshot.cpp index cfd8e11527..17fbce3cfa 100644 --- a/core/unittest/db/test_snapshot.cpp +++ b/core/unittest/db/test_snapshot.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include "db/utils.h" #include "db/snapshot/ReferenceProxy.h" @@ -186,7 +187,6 @@ CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) { op->Push(); ScopedSnapshotT ss; auto status = op->GetSnapshot(ss); - std::cout << status.ToString() << std::endl; return ss; } @@ -234,7 +234,6 @@ TEST_F(SnapshotTest, DropCollectionTest) { ASSERT_TRUE(ss); ScopedSnapshotT lss; auto status = Snapshots::GetInstance().GetSnapshot(lss, collection_name); - std::cout << status.ToString() << std::endl; ASSERT_TRUE(status.ok()); ASSERT_TRUE(lss); ASSERT_EQ(ss->GetID(), lss->GetID()); @@ -316,6 +315,7 @@ CreatePartition(const std::string& collection_name, const PartitionContext& p_co ScopedSnapshotT ss; auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name); if (!status.ok()) { + std::cout << status.ToString() << std::endl; return curr_ss; } @@ -326,15 +326,21 @@ CreatePartition(const std::string& collection_name, const PartitionContext& p_co PartitionPtr partition; status = op->CommitNewPartition(p_context, partition); if (!status.ok()) { + std::cout << status.ToString() << std::endl; return curr_ss; } status = op->Push(); if (!status.ok()) { + std::cout << status.ToString() << std::endl; return curr_ss; } status = op->GetSnapshot(curr_ss); + if (!status.ok()) { + std::cout << status.ToString() << std::endl; + return curr_ss; + } return curr_ss; } @@ -387,8 +393,8 @@ TEST_F(SnapshotTest, PartitionTest) { p_ctx.lsn = ++lsn; drop_op = std::make_shared(p_ctx, latest_ss); status = drop_op->Push(); - ASSERT_TRUE(!status.ok()); std::cout << status.ToString() << std::endl; + ASSERT_TRUE(!status.ok()); PartitionContext pp_ctx; pp_ctx.name = "p2"; @@ -495,7 +501,7 @@ TEST_F(SnapshotTest, OperationTest) { status = build_op->CommitNewSegmentFile(sf_context, seg_file); ASSERT_TRUE(status.ok()); ASSERT_TRUE(seg_file); - auto prev_segment_commit = ss->GetSegmentCommit(seg_file->GetSegmentId()); + auto prev_segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId()); auto prev_segment_commit_mappings = prev_segment_commit->GetMappings(); ASSERT_NE(prev_segment_commit->ToString(), ""); @@ -503,7 +509,7 @@ TEST_F(SnapshotTest, OperationTest) { status = build_op->GetSnapshot(ss); ASSERT_TRUE(ss->GetID() > ss_id); - auto segment_commit = ss->GetSegmentCommit(seg_file->GetSegmentId()); + auto segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId()); auto segment_commit_mappings = segment_commit->GetMappings(); MappingT expected_mappings = prev_segment_commit_mappings; expected_mappings.insert(seg_file->GetID()); @@ -515,6 +521,7 @@ TEST_F(SnapshotTest, OperationTest) { stale_segment_commit_ids.insert(segment_commit->GetID()); } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); // Check stale snapshot has been deleted from store { auto collection_commit = CollectionCommitsHolder::GetInstance() @@ -543,7 +550,7 @@ TEST_F(SnapshotTest, OperationTest) { ASSERT_TRUE(ss->GetID() > ss_id); ASSERT_TRUE(status.ok()); - auto segment_commit = ss->GetSegmentCommit(seg_file->GetSegmentId()); + auto segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId()); auto segment_commit_mappings = segment_commit->GetMappings(); MappingT expected_segment_mappings; expected_segment_mappings.insert(seg_file->GetID()); @@ -579,7 +586,7 @@ TEST_F(SnapshotTest, OperationTest) { ASSERT_TRUE(ss->GetID() > ss_id); ASSERT_TRUE(status.ok()); - auto segment_commit = ss->GetSegmentCommit(new_seg->GetID()); + auto segment_commit = ss->GetSegmentCommitBySegmentId(new_seg->GetID()); auto new_partition_commit = ss->GetPartitionCommitByPartitionId(partition_id); auto new_mappings = new_partition_commit->GetMappings(); auto prev_mappings = prev_partition_commit->GetMappings(); @@ -628,6 +635,7 @@ TEST_F(SnapshotTest, OperationTest) { ++lsn); ASSERT_TRUE(status.ok()); status = build_op->Push(); + std::cout << status.ToString() << std::endl; ASSERT_TRUE(!status.ok()); ASSERT_TRUE(!(build_op->GetStatus()).ok()); std::cout << build_op->ToString() << std::endl; @@ -665,6 +673,10 @@ TEST_F(SnapshotTest, CompoundTest1) { auto next_lsn = [&]() -> decltype(lsn) { return ++lsn; }; + LSN_TYPE pid = 0; + auto next_pid = [&]() -> decltype(pid) { + return ++pid; + }; std::string collection_name("c1"); auto ss = CreateCollection(collection_name, next_lsn()); ASSERT_TRUE(ss); @@ -675,12 +687,15 @@ TEST_F(SnapshotTest, CompoundTest1) { std::set all_segments; std::set segment_in_building; - std::set merge_segs; - std::map> merged_segs; + std::map> merged_segs_history; + std::set merged_segs; std::set built_segs; + std::set build_stale_segs; std::mutex all_mtx; - std::mutex building_mtx; + std::mutex merge_mtx; + std::mutex built_mtx; + std::mutex partition_mtx; WaitableObj merge_waiter; WaitableObj build_waiter; @@ -691,6 +706,8 @@ TEST_F(SnapshotTest, CompoundTest1) { sf_context.segment_id = 1; sf_context.partition_id = 1; + IDS_TYPE partitions = {ss->GetResources().begin()->second->GetID()}; + auto do_build = [&] (const ID_TYPE& seg_id) { decltype(ss) latest_ss; auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name); @@ -704,11 +721,26 @@ TEST_F(SnapshotTest, CompoundTest1) { SegmentFilePtr seg_file; build_sf_context.segment_id = seg_id; status = build_op->CommitNewSegmentFile(build_sf_context, seg_file); - ASSERT_TRUE(status.ok()); + if (!status.ok()) { + std::cout << status.ToString() << std::endl; + std::unique_lock lock(merge_mtx); + auto it = merged_segs.find(seg_id); + ASSERT_NE(it, merged_segs.end()); + return; + } + std::unique_lock lock(built_mtx); status = build_op->Push(); + if (!status.ok()) { + std::cout << status.ToString() << std::endl; + std::unique_lock lock(merge_mtx); + auto it = merged_segs.find(seg_id); + ASSERT_NE(it, merged_segs.end()); + return; + } ASSERT_TRUE(status.ok()); status = build_op->GetSnapshot(latest_ss); ASSERT_TRUE(status.ok()); + built_segs.insert(seg_id); }; @@ -720,6 +752,7 @@ TEST_F(SnapshotTest, CompoundTest1) { auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name); ASSERT_TRUE(status.ok()); + PartitionPtr partition; OperationContext context; for (auto& id : seg_ids) { auto seg = latest_ss->GetResource(id); @@ -727,6 +760,12 @@ TEST_F(SnapshotTest, CompoundTest1) { std::cout << "Error seg=" << id << std::endl; ASSERT_TRUE(seg); } + if (!partition) { + partition = latest_ss->GetResource(seg->GetPartitionId()); + ASSERT_TRUE(partition); + } else { + ASSERT_EQ(seg->GetPartitionId(), partition->GetID()); + } context.stale_segments.push_back(seg); if (!context.prev_partition) { context.prev_partition = latest_ss->GetResource( @@ -743,29 +782,64 @@ TEST_F(SnapshotTest, CompoundTest1) { SegmentFilePtr seg_file; status = op->CommitNewSegmentFile(sf_context, seg_file); ASSERT_TRUE(status.ok()); + std::unique_lock lock(merge_mtx); status = op->Push(); - ASSERT_TRUE(status.ok()); + if (!status.ok()) { + lock.unlock(); + std::unique_lock blk(built_mtx); + std::cout << status.ToString() << std::endl; + /* for (auto id : built_segs) { */ + /* std::cout << "builted " << id << std::endl; */ + /* } */ + /* for (auto id : seg_ids) { */ + /* std::cout << "to_merge " << id << std::endl; */ + /* } */ + bool stale_found = false; + for (auto& seg_id : seg_ids) { + auto it = built_segs.find(seg_id); + if (it != built_segs.end()) { + stale_found = true; + break; + } + } + ASSERT_TRUE(stale_found); + return; + } ID_TYPE ss_id = latest_ss->GetID(); status = op->GetSnapshot(latest_ss); ASSERT_TRUE(status.ok()); ASSERT_TRUE(latest_ss->GetID() > ss_id); - merged_segs[new_seg->GetID()] = seg_ids; + + merged_segs_history[new_seg->GetID()] = seg_ids; + for (auto& seg_id : seg_ids) { + merged_segs.insert(seg_id); + } new_seg_id = new_seg->GetID(); + ASSERT_EQ(new_seg->GetPartitionId(), partition->GetID()); }; // TODO: If any Compound Operation find larger Snapshot. This Operation should be rollback to latest auto handler_worker = [&] { - auto to_build_segments = RandomInt(50, 60); + auto loop_cnt = RandomInt(10, 20); decltype(ss) latest_ss; - for (auto i = 0; i < to_build_segments; ++i) { + auto create_new_segment = [&]() { + ID_TYPE partition_id; + { + std::unique_lock lock(partition_mtx); + auto idx = RandomInt(0, partitions.size() - 1); + partition_id = partitions[idx]; + } Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name); OperationContext context; + context.prev_partition = latest_ss->GetResource(partition_id); context.lsn = next_lsn(); - context.prev_partition = latest_ss->GetResource(8); auto op = std::make_shared(context, latest_ss); SegmentPtr new_seg; status = op->CommitNewSegment(new_seg); + if (!status.ok()) { + std::cout << status.ToString() << std::endl; + } ASSERT_TRUE(status.ok()); SegmentFilePtr seg_file; sf_context.segment_id = new_seg->GetID(); @@ -778,10 +852,39 @@ TEST_F(SnapshotTest, CompoundTest1) { std::unique_lock lock(all_mtx); all_segments.insert(new_seg->GetID()); } + if (RandomInt(0, 10) >= 7) { + build_queue.Put(new_seg->GetID()); + } merge_queue.Put(new_seg->GetID()); + }; + + auto create_partition = [&]() { + std::stringstream ss; + ss << "fake_partition_" << next_pid(); + PartitionContext context; + context.name = ss.str(); + std::unique_lock lock(partition_mtx); + auto latest_ss = CreatePartition(collection_name, context, next_lsn()); + ASSERT_TRUE(latest_ss); + auto partition = latest_ss->GetPartition(ss.str()); + partitions.push_back(partition->GetID()); + if (latest_ss->NumberOfPartitions() != partitions.size()) { + for (auto& pid : partitions) { + std::cout << "PartitionId=" << pid << std::endl; + } + } + ASSERT_EQ(latest_ss->NumberOfPartitions(), partitions.size()); + }; + + for (auto i = 0; i < loop_cnt; ++i) { + if (RandomInt(0, 10) > 7) { + create_partition(); + } + create_new_segment(); } }; + std::map> merge_segs; auto merge_worker = [&] { while (true) { auto seg_id = merge_queue.Take(); @@ -789,18 +892,36 @@ TEST_F(SnapshotTest, CompoundTest1) { std::cout << "Exiting Merge Worker" << std::endl; break; } - merge_segs.insert(seg_id); - if ((merge_segs.size() >= 2) && (RandomInt(0, 10) >= 5)) { - std::cout << "Merging ("; - for (auto seg : merge_segs) { + decltype(ss) latest_ss; + auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name); + ASSERT_TRUE(status.ok()); + auto seg = latest_ss->GetResource(seg_id); + if (!seg) { + std::cout << "SegID=" << seg_id << std::endl; + std::cout << latest_ss->ToString() << std::endl; + ASSERT_TRUE(seg); + } + + auto it_segs = merge_segs.find(seg->GetPartitionId()); + if (it_segs == merge_segs.end()) { + merge_segs[seg->GetPartitionId()] = {seg->GetID()}; + } else { + merge_segs[seg->GetPartitionId()].insert(seg->GetID()); + } + auto& segs = merge_segs[seg->GetPartitionId()]; + if ((segs.size() >= 2) && (RandomInt(0, 10) >= 2)) { + std::cout << "Merging partition " << seg->GetPartitionId() << " segs ("; + for (auto seg : segs) { std::cout << seg << ","; } std::cout << ")" << std::endl; ID_TYPE new_seg_id = 0; - do_merge(merge_segs, new_seg_id); - merge_segs.clear(); - ASSERT_NE(new_seg_id, 0); - if (RandomInt(0, 10) >= 5) { + do_merge(segs, new_seg_id); + segs.clear(); + if (new_seg_id == 0) { + continue; + } + if (RandomInt(0, 10) >= 6) { build_queue.Put(new_seg_id); } @@ -842,7 +963,7 @@ TEST_F(SnapshotTest, CompoundTest1) { t3.join(); t4.join(); - /* for (auto& kv : merged_segs) { */ + /* for (auto& kv : merged_segs_history) { */ /* std::cout << "merged: ("; */ /* for (auto i : kv.second) { */ /* std::cout << i << ","; */ @@ -860,7 +981,7 @@ TEST_F(SnapshotTest, CompoundTest1) { status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name); ASSERT_TRUE(status.ok()); auto expect_segments = all_segments; - for (auto& kv : merged_segs) { + for (auto& kv : merged_segs_history) { expect_segments.insert(kv.first); for (auto& id : kv.second) { expect_segments.erase(id); @@ -878,5 +999,10 @@ TEST_F(SnapshotTest, CompoundTest1) { decltype(final_segment_file_cnt) expect_segment_file_cnt; expect_segment_file_cnt = expect_segments.size(); expect_segment_file_cnt += built_segs.size(); + std::cout << latest_ss->ToString() << std::endl; + std::vector common_ids; + std::set_intersection(merged_segs.begin(), merged_segs.end(), built_segs.begin(), built_segs.end(), + std::back_inserter(common_ids)); + expect_segment_file_cnt -= common_ids.size(); ASSERT_EQ(expect_segment_file_cnt, final_segment_file_cnt); }