From 3b898420d23fdd953b11d3d1e53621a377708a2d Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Sat, 4 Jul 2020 12:09:21 +0800 Subject: [PATCH] (db/snapshot): Add DropAllIndexOperation and related UT (#2737) * (db/snapshot): add drop all index operation Signed-off-by: peng.xu * (db/snapshot): Add ut for drop all index operation Signed-off-by: peng.xu --- core/src/db/snapshot/CompoundOperations.cpp | 64 ++++++++++++++++ core/src/db/snapshot/CompoundOperations.h | 20 ++++- core/src/db/snapshot/Context.h | 3 + core/src/db/snapshot/ResourceOperations.cpp | 50 +++++++++---- core/src/db/snapshot/Snapshot.h | 2 +- core/unittest/ssdb/test_snapshot.cpp | 81 ++++++++++++++++++++- 6 files changed, 202 insertions(+), 18 deletions(-) diff --git a/core/src/db/snapshot/CompoundOperations.cpp b/core/src/db/snapshot/CompoundOperations.cpp index 4aa7e96603..0740c75129 100644 --- a/core/src/db/snapshot/CompoundOperations.cpp +++ b/core/src/db/snapshot/CompoundOperations.cpp @@ -10,8 +10,10 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/snapshot/CompoundOperations.h" +#include #include #include +#include #include "db/snapshot/OperationExecutor.h" #include "db/snapshot/Snapshots.h" #include "utils/Status.h" @@ -84,6 +86,68 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF return Status::OK(); } +DropAllIndexOperation::DropAllIndexOperation(const OperationContext& context, ScopedSnapshotT prev_ss) + : BaseT(context, prev_ss) { +} + +Status +DropAllIndexOperation::PreCheck() { + if (context_.stale_field_element == nullptr) { + std::stringstream emsg; + emsg << GetRepr() << ". Stale field element is requried"; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); + } + + if (!GetStartedSS()->GetResource(context_.stale_field_element->GetID())) { + std::stringstream emsg; + emsg << GetRepr() << ". Specified field element " << context_.stale_field_element->GetName(); + emsg << " is stale"; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); + } + // TODO: Check type + return Status::OK(); +} + +Status +DropAllIndexOperation::DoExecute(Store& store) { + auto& segment_files = GetAdjustedSS()->GetResources(); + + std::map> p_sc_map; + + for (auto& kv : segment_files) { + if (kv.second->GetFieldElementId() != context_.stale_field_element->GetID()) { + continue; + } + + auto context = context_; + context.stale_segment_file = kv.second.Get(); + SegmentCommitOperation sc_op(context, GetAdjustedSS()); + STATUS_CHECK(sc_op(store)); + STATUS_CHECK(sc_op.GetResource(context.new_segment_commit)); + AddStepWithLsn(*context.new_segment_commit, context.lsn); + p_sc_map[context.new_segment_commit->GetPartitionId()].push_back(context.new_segment_commit); + } + + OperationContext cc_context; + for (auto& kv : p_sc_map) { + auto& partition_id = kv.first; + auto context = context_; + context.new_segment_commits = kv.second; + PartitionCommitOperation pc_op(context, GetAdjustedSS()); + STATUS_CHECK(pc_op(store)); + STATUS_CHECK(pc_op.GetResource(context.new_partition_commit)); + AddStepWithLsn(*context.new_partition_commit, context.lsn); + cc_context.new_partition_commits.push_back(context.new_partition_commit); + } + + CollectionCommitOperation cc_op(cc_context, GetAdjustedSS()); + STATUS_CHECK(cc_op(store)); + STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit)); + AddStepWithLsn(*context_.new_collection_commit, context_.lsn); + + return Status::OK(); +} + DropIndexOperation::DropIndexOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) { } diff --git a/core/src/db/snapshot/CompoundOperations.h b/core/src/db/snapshot/CompoundOperations.h index 7072e4695c..7c1c7a3a6b 100644 --- a/core/src/db/snapshot/CompoundOperations.h +++ b/core/src/db/snapshot/CompoundOperations.h @@ -74,10 +74,10 @@ class BuildOperation : public CompoundBaseOperation { CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const; }; -class DropIndexOperation : public CompoundBaseOperation { +class DropIndexOperation : public CompoundBaseOperation { public: - using BaseT = CompoundBaseOperation; - static constexpr const char* Name = "B"; + using BaseT = CompoundBaseOperation; + static constexpr const char* Name = "DI"; DropIndexOperation(const OperationContext& context, ScopedSnapshotT prev_ss); @@ -88,6 +88,20 @@ class DropIndexOperation : public CompoundBaseOperation { DoExecute(Store&) override; }; +class DropAllIndexOperation : public CompoundBaseOperation { + public: + using BaseT = CompoundBaseOperation; + static constexpr const char* Name = "DAI"; + + DropAllIndexOperation(const OperationContext& context, ScopedSnapshotT prev_ss); + + Status + PreCheck() override; + + Status + DoExecute(Store&) override; +}; + class NewSegmentOperation : public CompoundBaseOperation { public: using BaseT = CompoundBaseOperation; diff --git a/core/src/db/snapshot/Context.h b/core/src/db/snapshot/Context.h index e044e3a90f..58c94f7a35 100644 --- a/core/src/db/snapshot/Context.h +++ b/core/src/db/snapshot/Context.h @@ -54,8 +54,10 @@ struct OperationContext { ScopedSnapshotT prev_ss; SegmentPtr new_segment = nullptr; SegmentCommitPtr new_segment_commit = nullptr; + std::vector new_segment_commits; PartitionPtr new_partition = nullptr; PartitionCommitPtr new_partition_commit = nullptr; + std::vector new_partition_commits; SchemaCommitPtr new_schema_commit = nullptr; CollectionCommitPtr new_collection_commit = nullptr; CollectionPtr new_collection = nullptr; @@ -65,6 +67,7 @@ struct OperationContext { FieldPtr prev_field = nullptr; FieldElementPtr prev_field_element = nullptr; + FieldElementPtr stale_field_element = nullptr; SegmentPtr prev_segment = nullptr; SegmentCommitPtr prev_segment_commit = nullptr; diff --git a/core/src/db/snapshot/ResourceOperations.cpp b/core/src/db/snapshot/ResourceOperations.cpp index 18d683f4ff..bdc3d90b6a 100644 --- a/core/src/db/snapshot/ResourceOperations.cpp +++ b/core/src/db/snapshot/ResourceOperations.cpp @@ -26,14 +26,22 @@ CollectionCommitOperation::DoExecute(Store& store) { } resource_ = std::make_shared(*prev_resource); resource_->ResetStatus(); + + auto handle_new_pc = [&](PartitionCommitPtr& pc) { + auto prev_partition_commit = GetStartedSS()->GetPartitionCommitByPartitionId(pc->GetPartitionId()); + if (prev_partition_commit) + resource_->GetMappings().erase(prev_partition_commit->GetID()); + resource_->GetMappings().insert(pc->GetID()); + }; + if (context_.stale_partition_commit) { resource_->GetMappings().erase(context_.stale_partition_commit->GetID()); } else if (context_.new_partition_commit) { - auto prev_partition_commit = - GetStartedSS()->GetPartitionCommitByPartitionId(context_.new_partition_commit->GetPartitionId()); - if (prev_partition_commit) - resource_->GetMappings().erase(prev_partition_commit->GetID()); - resource_->GetMappings().insert(context_.new_partition_commit->GetID()); + handle_new_pc(context_.new_partition_commit); + } else if (context_.new_partition_commits.size() > 0) { + for (auto& pc : context_.new_partition_commits) { + handle_new_pc(pc); + } } else if (context_.new_schema_commit) { resource_->SetSchemaId(context_.new_schema_commit->GetID()); } @@ -72,10 +80,12 @@ PartitionCommitOperation::PreCheck() { PartitionCommitPtr PartitionCommitOperation::GetPrevResource() const { - auto& segment_commit = context_.new_segment_commit; - if (!segment_commit) - return nullptr; - return GetStartedSS()->GetPartitionCommitByPartitionId(segment_commit->GetPartitionId()); + if (context_.new_segment_commit) { + return GetStartedSS()->GetPartitionCommitByPartitionId(context_.new_segment_commit->GetPartitionId()); + } else if (context_.new_segment_commits.size() > 0) { + return GetStartedSS()->GetPartitionCommitByPartitionId(context_.new_segment_commits[0]->GetPartitionId()); + } + return nullptr; } Status @@ -85,10 +95,20 @@ PartitionCommitOperation::DoExecute(Store& store) { resource_ = std::make_shared(*prev_resource); resource_->SetID(0); resource_->ResetStatus(); - auto prev_segment_commit = - GetStartedSS()->GetSegmentCommitBySegmentId(context_.new_segment_commit->GetSegmentId()); - if (prev_segment_commit) - resource_->GetMappings().erase(prev_segment_commit->GetID()); + auto erase_sc = [&](SegmentCommitPtr& sc) { + if (!sc) + return; + auto prev_sc = GetStartedSS()->GetSegmentCommitBySegmentId(sc->GetSegmentId()); + if (prev_sc) { + resource_->GetMappings().erase(prev_sc->GetID()); + } + }; + + erase_sc(context_.new_segment_commit); + for (auto& sc : context_.new_segment_commits) { + erase_sc(sc); + } + if (context_.stale_segments.size() > 0) { for (auto& stale_segment : context_.stale_segments) { if (stale_segment->GetPartitionId() != prev_resource->GetPartitionId()) { @@ -113,6 +133,10 @@ PartitionCommitOperation::DoExecute(Store& store) { if (context_.new_segment_commit) { resource_->GetMappings().insert(context_.new_segment_commit->GetID()); + } else if (context_.new_segment_commits.size() > 0) { + for (auto& sc : context_.new_segment_commits) { + resource_->GetMappings().insert(sc->GetID()); + } } AddStep(*resource_, false); return Status::OK(); diff --git a/core/src/db/snapshot/Snapshot.h b/core/src/db/snapshot/Snapshot.h index 534547ab91..4f39c9302c 100644 --- a/core/src/db/snapshot/Snapshot.h +++ b/core/src/db/snapshot/Snapshot.h @@ -208,7 +208,7 @@ class Snapshot : public ReferenceProxy { GetFieldElementId(const std::string& field_name, const std::string& field_element_name) const { auto itf = field_element_names_map_.find(field_name); if (itf == field_element_names_map_.end()) - return false; + return 0; auto itfe = itf->second.find(field_element_name); if (itfe == itf->second.end()) { return 0; diff --git a/core/unittest/ssdb/test_snapshot.cpp b/core/unittest/ssdb/test_snapshot.cpp index e6612d6ec4..1231005fc8 100644 --- a/core/unittest/ssdb/test_snapshot.cpp +++ b/core/unittest/ssdb/test_snapshot.cpp @@ -39,6 +39,7 @@ using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext; using OperationContext = milvus::engine::snapshot::OperationContext; using PartitionContext = milvus::engine::snapshot::PartitionContext; using DropIndexOperation = milvus::engine::snapshot::DropIndexOperation; +using DropAllIndexOperation = milvus::engine::snapshot::DropAllIndexOperation; using BuildOperation = milvus::engine::snapshot::BuildOperation; using MergeOperation = milvus::engine::snapshot::MergeOperation; using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation; @@ -622,7 +623,7 @@ TEST_F(SnapshotTest, PartitionTest) { /* } */ TEST_F(SnapshotTest, IndexTest) { - LSN_TYPE lsn; + LSN_TYPE lsn = 0; auto next_lsn = [&]() -> decltype(lsn) { return ++lsn; }; @@ -659,6 +660,11 @@ TEST_F(SnapshotTest, IndexTest) { return false; }; + + auto filter2 = [&](SegmentFile::Ptr segment_file) -> bool { + return true; + }; + auto sf_collector = std::make_shared(ss, filter); sf_collector->Iterate(); @@ -688,6 +694,79 @@ TEST_F(SnapshotTest, IndexTest) { it_found = sf_collector->segment_files_.find(seg_file->GetID()); ASSERT_EQ(it_found, sf_collector->segment_files_.end()); + + PartitionContext pp_ctx; + std::stringstream p_name_stream; + + auto num = RandomInt(3, 5); + for (auto i = 0; i < num; ++i) { + p_name_stream.str(""); + p_name_stream << "partition_" << i; + pp_ctx.name = p_name_stream.str(); + ss = CreatePartition(ss->GetName(), pp_ctx, next_lsn()); + ASSERT_TRUE(ss); + } + ASSERT_EQ(ss->NumberOfPartitions(), num + 1); + + sf_collector = std::make_shared(ss, filter2); + sf_collector->Iterate(); + auto prev_total = sf_collector->segment_files_.size(); + + auto create_segment = [&](ID_TYPE partition_id) { + OperationContext context; + context.lsn = next_lsn(); + context.prev_partition = ss->GetResource(partition_id); + auto op = std::make_shared(context, ss); + SegmentPtr new_seg; + status = op->CommitNewSegment(new_seg); + ASSERT_TRUE(status.ok()); + ASSERT_FALSE(new_seg->ToString().empty()); + SegmentFilePtr seg_file; + auto nsf_context = sf_context; + nsf_context.segment_id = new_seg->GetID(); + nsf_context.partition_id = new_seg->GetPartitionId(); + status = op->CommitNewSegmentFile(nsf_context, seg_file); + ASSERT_TRUE(status.ok()); + status = op->Push(); + ASSERT_TRUE(status.ok()); + + status = op->GetSnapshot(ss); + ASSERT_TRUE(status.ok()); + }; + + auto new_total = 0; + auto partitions = ss->GetResources(); + for (auto& kv : partitions) { + num = RandomInt(2, 5); + for (auto i = 0; i < num; ++i) { + create_segment(kv.first); + } + new_total += num; + } + + sf_collector = std::make_shared(ss, filter2); + sf_collector->Iterate(); + auto total = sf_collector->segment_files_.size(); + ASSERT_EQ(total, prev_total + new_total); + + auto field_element_id = ss->GetFieldElementId(sf_context.field_name, + sf_context.field_element_name); + ASSERT_NE(field_element_id, 0); + + OperationContext d_a_i_ctx; + d_a_i_ctx.lsn = next_lsn(); + d_a_i_ctx.stale_field_element = ss->GetResource(field_element_id); + auto drop_all_index_op = std::make_shared(d_a_i_ctx, ss); + status = drop_all_index_op->Push(); + std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + status = drop_all_index_op->GetSnapshot(ss); + ASSERT_TRUE(status.ok()); + + sf_collector = std::make_shared(ss, filter2); + sf_collector->Iterate(); + ASSERT_EQ(sf_collector->segment_files_.size(), total - new_total); } TEST_F(SnapshotTest, OperationTest) {