(db/snapshot): Add DropAllIndexOperation and related UT (#2737)

* (db/snapshot): add drop all index operation

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add ut for drop all index operation

Signed-off-by: peng.xu <peng.xu@zilliz.com>
pull/2742/head
XuPeng-SH 2020-07-04 12:09:21 +08:00 committed by GitHub
parent 0672347775
commit 3b898420d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 202 additions and 18 deletions

View File

@ -10,8 +10,10 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/snapshot/CompoundOperations.h"
#include <map>
#include <memory>
#include <sstream>
#include <vector>
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
#include "utils/Status.h"
@ -84,6 +86,68 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF
return Status::OK();
}
// Constructs the operation by forwarding the operation context and the previous
// snapshot to the CompoundBaseOperation base; no additional state is set up here.
DropAllIndexOperation::DropAllIndexOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss) {
}
// Validates the operation context before execution.
//
// Returns an SS_INVALID_CONTEX_ERROR status when `context_.stale_field_element` is unset,
// or when no FieldElement with its ID can be resolved from the started snapshot.
// Returns Status::OK() otherwise.
Status
DropAllIndexOperation::PreCheck() {
    // The operation cannot select any segment files without a target field element.
    if (context_.stale_field_element == nullptr) {
        std::stringstream emsg;
        emsg << GetRepr() << ". Stale field element is required";
        return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
    }

    // The target must still be resolvable by ID in the snapshot this operation started from.
    if (!GetStartedSS()->GetResource<FieldElement>(context_.stale_field_element->GetID())) {
        std::stringstream emsg;
        emsg << GetRepr() << ". Specified field element " << context_.stale_field_element->GetName();
        emsg << " is stale";
        return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
    }

    // TODO: Check type
    return Status::OK();
}
// Executes the drop in three stages, recording every newly created commit as a step:
//   1. For each SegmentFile in the adjusted snapshot whose field element ID equals that of
//      `context_.stale_field_element`, run a SegmentCommitOperation with the file marked as
//      stale, and group the resulting segment commits by partition ID.
//   2. For each partition group, run a PartitionCommitOperation over its new segment commits.
//   3. Run a single CollectionCommitOperation over all new partition commits and store the
//      result in `context_.new_collection_commit`.
//
// Returns the first non-OK status produced by a sub-operation (via STATUS_CHECK), or
// Status::OK() when every stage succeeds.
Status
DropAllIndexOperation::DoExecute(Store& store) {
    auto& segment_files = GetAdjustedSS()->GetResources<SegmentFile>();

    // Stage 1: partition ID -> segment commits created for that partition.
    std::map<ID_TYPE, std::vector<SegmentCommitPtr>> p_sc_map;
    for (auto& kv : segment_files) {
        if (kv.second->GetFieldElementId() != context_.stale_field_element->GetID()) {
            continue;
        }

        // Work on a copy so the per-file fields do not leak into context_.
        auto context = context_;
        context.stale_segment_file = kv.second.Get();
        SegmentCommitOperation sc_op(context, GetAdjustedSS());
        STATUS_CHECK(sc_op(store));
        STATUS_CHECK(sc_op.GetResource(context.new_segment_commit));
        AddStepWithLsn(*context.new_segment_commit, context.lsn);
        p_sc_map[context.new_segment_commit->GetPartitionId()].push_back(context.new_segment_commit);
    }

    // Stage 2: one partition commit per affected partition.
    // NOTE(review): cc_context is default-constructed, so it carries none of context_'s
    // fields (e.g. lsn) into the collection commit operation -- confirm this is intended.
    OperationContext cc_context;
    for (auto& kv : p_sc_map) {
        auto context = context_;
        context.new_segment_commits = kv.second;
        PartitionCommitOperation pc_op(context, GetAdjustedSS());
        STATUS_CHECK(pc_op(store));
        STATUS_CHECK(pc_op.GetResource(context.new_partition_commit));
        AddStepWithLsn(*context.new_partition_commit, context.lsn);
        cc_context.new_partition_commits.push_back(context.new_partition_commit);
    }

    // Stage 3: a single collection commit referencing all new partition commits.
    CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
    STATUS_CHECK(cc_op(store));
    STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
    AddStepWithLsn(*context_.new_collection_commit, context_.lsn);

    return Status::OK();
}
// Constructs the operation by forwarding the operation context and the previous
// snapshot to the CompoundBaseOperation base; no additional state is set up here.
DropIndexOperation::DropIndexOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss) {
}

View File

@ -74,10 +74,10 @@ class BuildOperation : public CompoundBaseOperation<BuildOperation> {
CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const;
};
class DropIndexOperation : public CompoundBaseOperation<BuildOperation> {
class DropIndexOperation : public CompoundBaseOperation<DropIndexOperation> {
public:
using BaseT = CompoundBaseOperation<BuildOperation>;
static constexpr const char* Name = "B";
using BaseT = CompoundBaseOperation<DropIndexOperation>;
static constexpr const char* Name = "DI";
DropIndexOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
@ -88,6 +88,20 @@ class DropIndexOperation : public CompoundBaseOperation<BuildOperation> {
DoExecute(Store&) override;
};
// Compound snapshot operation keyed on OperationContext::stale_field_element.
// Derives from CompoundBaseOperation parameterised on itself and overrides the
// PreCheck and DoExecute hooks.
class DropAllIndexOperation : public CompoundBaseOperation<DropAllIndexOperation> {
public:
using BaseT = CompoundBaseOperation<DropAllIndexOperation>;
// Short tag for this operation type; presumably consumed by the base class when
// building the operation's name/representation -- verify against CompoundBaseOperation.
static constexpr const char* Name = "DAI";
DropAllIndexOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
// Validates the operation context; returns a non-OK Status on failure.
Status
PreCheck() override;
// Performs the operation against the given store.
Status
DoExecute(Store&) override;
};
class NewSegmentOperation : public CompoundBaseOperation<NewSegmentOperation> {
public:
using BaseT = CompoundBaseOperation<NewSegmentOperation>;

View File

@ -54,8 +54,10 @@ struct OperationContext {
ScopedSnapshotT prev_ss;
SegmentPtr new_segment = nullptr;
SegmentCommitPtr new_segment_commit = nullptr;
std::vector<SegmentCommitPtr> new_segment_commits;
PartitionPtr new_partition = nullptr;
PartitionCommitPtr new_partition_commit = nullptr;
std::vector<PartitionCommitPtr> new_partition_commits;
SchemaCommitPtr new_schema_commit = nullptr;
CollectionCommitPtr new_collection_commit = nullptr;
CollectionPtr new_collection = nullptr;
@ -65,6 +67,7 @@ struct OperationContext {
FieldPtr prev_field = nullptr;
FieldElementPtr prev_field_element = nullptr;
FieldElementPtr stale_field_element = nullptr;
SegmentPtr prev_segment = nullptr;
SegmentCommitPtr prev_segment_commit = nullptr;

View File

@ -26,14 +26,22 @@ CollectionCommitOperation::DoExecute(Store& store) {
}
resource_ = std::make_shared<CollectionCommit>(*prev_resource);
resource_->ResetStatus();
auto handle_new_pc = [&](PartitionCommitPtr& pc) {
auto prev_partition_commit = GetStartedSS()->GetPartitionCommitByPartitionId(pc->GetPartitionId());
if (prev_partition_commit)
resource_->GetMappings().erase(prev_partition_commit->GetID());
resource_->GetMappings().insert(pc->GetID());
};
if (context_.stale_partition_commit) {
resource_->GetMappings().erase(context_.stale_partition_commit->GetID());
} else if (context_.new_partition_commit) {
auto prev_partition_commit =
GetStartedSS()->GetPartitionCommitByPartitionId(context_.new_partition_commit->GetPartitionId());
if (prev_partition_commit)
resource_->GetMappings().erase(prev_partition_commit->GetID());
resource_->GetMappings().insert(context_.new_partition_commit->GetID());
handle_new_pc(context_.new_partition_commit);
} else if (context_.new_partition_commits.size() > 0) {
for (auto& pc : context_.new_partition_commits) {
handle_new_pc(pc);
}
} else if (context_.new_schema_commit) {
resource_->SetSchemaId(context_.new_schema_commit->GetID());
}
@ -72,10 +80,12 @@ PartitionCommitOperation::PreCheck() {
PartitionCommitPtr
PartitionCommitOperation::GetPrevResource() const {
auto& segment_commit = context_.new_segment_commit;
if (!segment_commit)
return nullptr;
return GetStartedSS()->GetPartitionCommitByPartitionId(segment_commit->GetPartitionId());
if (context_.new_segment_commit) {
return GetStartedSS()->GetPartitionCommitByPartitionId(context_.new_segment_commit->GetPartitionId());
} else if (context_.new_segment_commits.size() > 0) {
return GetStartedSS()->GetPartitionCommitByPartitionId(context_.new_segment_commits[0]->GetPartitionId());
}
return nullptr;
}
Status
@ -85,10 +95,20 @@ PartitionCommitOperation::DoExecute(Store& store) {
resource_ = std::make_shared<PartitionCommit>(*prev_resource);
resource_->SetID(0);
resource_->ResetStatus();
auto prev_segment_commit =
GetStartedSS()->GetSegmentCommitBySegmentId(context_.new_segment_commit->GetSegmentId());
if (prev_segment_commit)
resource_->GetMappings().erase(prev_segment_commit->GetID());
auto erase_sc = [&](SegmentCommitPtr& sc) {
if (!sc)
return;
auto prev_sc = GetStartedSS()->GetSegmentCommitBySegmentId(sc->GetSegmentId());
if (prev_sc) {
resource_->GetMappings().erase(prev_sc->GetID());
}
};
erase_sc(context_.new_segment_commit);
for (auto& sc : context_.new_segment_commits) {
erase_sc(sc);
}
if (context_.stale_segments.size() > 0) {
for (auto& stale_segment : context_.stale_segments) {
if (stale_segment->GetPartitionId() != prev_resource->GetPartitionId()) {
@ -113,6 +133,10 @@ PartitionCommitOperation::DoExecute(Store& store) {
if (context_.new_segment_commit) {
resource_->GetMappings().insert(context_.new_segment_commit->GetID());
} else if (context_.new_segment_commits.size() > 0) {
for (auto& sc : context_.new_segment_commits) {
resource_->GetMappings().insert(sc->GetID());
}
}
AddStep(*resource_, false);
return Status::OK();

View File

@ -208,7 +208,7 @@ class Snapshot : public ReferenceProxy {
GetFieldElementId(const std::string& field_name, const std::string& field_element_name) const {
auto itf = field_element_names_map_.find(field_name);
if (itf == field_element_names_map_.end())
return false;
return 0;
auto itfe = itf->second.find(field_element_name);
if (itfe == itf->second.end()) {
return 0;

View File

@ -39,6 +39,7 @@ using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext;
using OperationContext = milvus::engine::snapshot::OperationContext;
using PartitionContext = milvus::engine::snapshot::PartitionContext;
using DropIndexOperation = milvus::engine::snapshot::DropIndexOperation;
using DropAllIndexOperation = milvus::engine::snapshot::DropAllIndexOperation;
using BuildOperation = milvus::engine::snapshot::BuildOperation;
using MergeOperation = milvus::engine::snapshot::MergeOperation;
using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation;
@ -622,7 +623,7 @@ TEST_F(SnapshotTest, PartitionTest) {
/* } */
TEST_F(SnapshotTest, IndexTest) {
LSN_TYPE lsn;
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
};
@ -659,6 +660,11 @@ TEST_F(SnapshotTest, IndexTest) {
return false;
};
auto filter2 = [&](SegmentFile::Ptr segment_file) -> bool {
return true;
};
auto sf_collector = std::make_shared<SegmentFileCollector>(ss, filter);
sf_collector->Iterate();
@ -688,6 +694,79 @@ TEST_F(SnapshotTest, IndexTest) {
it_found = sf_collector->segment_files_.find(seg_file->GetID());
ASSERT_EQ(it_found, sf_collector->segment_files_.end());
PartitionContext pp_ctx;
std::stringstream p_name_stream;
auto num = RandomInt(3, 5);
for (auto i = 0; i < num; ++i) {
p_name_stream.str("");
p_name_stream << "partition_" << i;
pp_ctx.name = p_name_stream.str();
ss = CreatePartition(ss->GetName(), pp_ctx, next_lsn());
ASSERT_TRUE(ss);
}
ASSERT_EQ(ss->NumberOfPartitions(), num + 1);
sf_collector = std::make_shared<SegmentFileCollector>(ss, filter2);
sf_collector->Iterate();
auto prev_total = sf_collector->segment_files_.size();
auto create_segment = [&](ID_TYPE partition_id) {
OperationContext context;
context.lsn = next_lsn();
context.prev_partition = ss->GetResource<Partition>(partition_id);
auto op = std::make_shared<NewSegmentOperation>(context, ss);
SegmentPtr new_seg;
status = op->CommitNewSegment(new_seg);
ASSERT_TRUE(status.ok());
ASSERT_FALSE(new_seg->ToString().empty());
SegmentFilePtr seg_file;
auto nsf_context = sf_context;
nsf_context.segment_id = new_seg->GetID();
nsf_context.partition_id = new_seg->GetPartitionId();
status = op->CommitNewSegmentFile(nsf_context, seg_file);
ASSERT_TRUE(status.ok());
status = op->Push();
ASSERT_TRUE(status.ok());
status = op->GetSnapshot(ss);
ASSERT_TRUE(status.ok());
};
auto new_total = 0;
auto partitions = ss->GetResources<Partition>();
for (auto& kv : partitions) {
num = RandomInt(2, 5);
for (auto i = 0; i < num; ++i) {
create_segment(kv.first);
}
new_total += num;
}
sf_collector = std::make_shared<SegmentFileCollector>(ss, filter2);
sf_collector->Iterate();
auto total = sf_collector->segment_files_.size();
ASSERT_EQ(total, prev_total + new_total);
auto field_element_id = ss->GetFieldElementId(sf_context.field_name,
sf_context.field_element_name);
ASSERT_NE(field_element_id, 0);
OperationContext d_a_i_ctx;
d_a_i_ctx.lsn = next_lsn();
d_a_i_ctx.stale_field_element = ss->GetResource<FieldElement>(field_element_id);
auto drop_all_index_op = std::make_shared<DropAllIndexOperation>(d_a_i_ctx, ss);
status = drop_all_index_op->Push();
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
status = drop_all_index_op->GetSnapshot(ss);
ASSERT_TRUE(status.ok());
sf_collector = std::make_shared<SegmentFileCollector>(ss, filter2);
sf_collector->Iterate();
ASSERT_EQ(sf_collector->segment_files_.size(), total - new_total);
}
TEST_F(SnapshotTest, OperationTest) {