diff --git a/core/src/db/snapshot/Operations.cpp b/core/src/db/snapshot/Operations.cpp index a9b9466833..e64b01cadd 100644 --- a/core/src/db/snapshot/Operations.cpp +++ b/core/src/db/snapshot/Operations.cpp @@ -210,7 +210,10 @@ Operations::ApplyToStore(StorePtr store) { Status Operations::OnSnapshotDropped() { - return Status::OK(); + std::stringstream msg; + msg << "Collection " << GetStartedSS()->GetCollection()->GetID() << " was dropped before" << std::endl; + LOG_ENGINE_WARNING_ << msg.str(); + return Status(SS_COLLECTION_DROPPED, msg.str()); } Status @@ -234,6 +237,8 @@ Operations::PreExecute(StorePtr store) { STATUS_CHECK(Snapshots::GetInstance().GetSnapshot(context_.prev_ss, GetStartedSS()->GetCollectionId())); if (!context_.prev_ss) { STATUS_CHECK(OnSnapshotDropped()); + } else if (!prev_ss_->GetCollection()->IsActive()) { + STATUS_CHECK(OnSnapshotDropped()); } else if (prev_ss_->GetID() != context_.prev_ss->GetID()) { STATUS_CHECK(OnSnapshotStale()); } diff --git a/core/src/db/snapshot/ResourceOperations.cpp b/core/src/db/snapshot/ResourceOperations.cpp index af32fd78fa..9d6ee79ebe 100644 --- a/core/src/db/snapshot/ResourceOperations.cpp +++ b/core/src/db/snapshot/ResourceOperations.cpp @@ -208,6 +208,11 @@ SegmentOperation::PreCheck() { emsg << GetRepr() << ". prev_partition should be specified in context"; return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } + if (!GetStartedSS()->GetResource(context_.prev_partition->GetID())) { + std::stringstream emsg; + emsg << GetRepr() << ". Partition is stale"; + return Status(SS_STALE_ERROR, emsg.str()); + } return Status::OK(); } diff --git a/core/src/utils/Error.h b/core/src/utils/Error.h index a82b4570db..dc503acc37 100644 --- a/core/src/utils/Error.h +++ b/core/src/utils/Error.h @@ -137,5 +137,6 @@ constexpr ErrorCode SS_INVALID_ARGUMENT_ERROR = ToSSErrorCode(8); constexpr ErrorCode SS_OPERATION_PENDING = ToSSErrorCode(9); constexpr ErrorCode SS_TIMEOUT = ToSSErrorCode(10); constexpr ErrorCode SS_NOT_COMMITED = ToSSErrorCode(11); +constexpr ErrorCode SS_COLLECTION_DROPPED = ToSSErrorCode(12); } // namespace milvus diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index 135a8b8614..877a76d3fb 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -795,13 +795,15 @@ TEST_F(DBTest, GetEntityTest) { milvus::engine::IDNumbers entity_ids; milvus::engine::DataChunkPtr dataChunkPtr; - insert_entities(collection_name, "", 10000, 0, entity_ids, dataChunkPtr); + insert_entities(collection_name, "", 2000, 0, entity_ids, dataChunkPtr); + std::cout << "Post InsertEntities" << std::endl; ASSERT_TRUE(status.ok()) << status.ToString(); milvus::engine::snapshot::CollectionPtr collection; milvus::engine::snapshot::FieldElementMappings field_mappings; status = db_->GetCollectionInfo(collection_name, collection, field_mappings); ASSERT_TRUE(status.ok()) << status.ToString(); + std::cout << "Post GetCollectionInfo" << std::endl; { std::vector field_names; @@ -818,6 +820,7 @@ TEST_F(DBTest, GetEntityTest) { ASSERT_TRUE(get_data_chunk->fixed_fields_[name]->data_ == dataChunkPtr->fixed_fields_[name]->data_); } } + std::cout << "Post GetEntityByID1" << std::endl; { std::vector field_names; @@ -830,6 +833,7 @@ TEST_F(DBTest, GetEntityTest) { status = db_->GetEntityByID(collection_name, entity_ids, field_names, valid_row, get_data_chunk); ASSERT_TRUE(!status.ok()); } + std::cout << "Post GetEntityByID2" << std::endl; { std::vector field_names; @@ -850,6 +854,7 @@ TEST_F(DBTest, GetEntityTest) { ASSERT_TRUE(get_data_chunk->fixed_fields_[name]->data_ == dataChunkPtr->fixed_fields_[name]->data_); } } + std::cout << "Post GetEntityByID3" << std::endl; } TEST_F(DBTest, CompactTest) { diff --git a/core/unittest/db/test_snapshot.cpp b/core/unittest/db/test_snapshot.cpp index 912a1be596..3b9a1c0b30 100644 --- a/core/unittest/db/test_snapshot.cpp +++ b/core/unittest/db/test_snapshot.cpp @@ -1226,6 +1226,96 @@ TEST_F(SnapshotTest, OperationTest2) { } } +TEST_F(SnapshotTest, DropTest) { + Status status; + std::atomic lsn = 0; + auto next_lsn = [&]() -> decltype(lsn) { + return ++lsn; + }; + + std::string collection_name("c1"); + auto ss = CreateCollection(collection_name, next_lsn()); + ASSERT_TRUE(ss); + + SegmentFileContext sf_context; + SFContextBuilder(sf_context, ss); + + { + OperationContext context; + context.lsn = next_lsn(); + context.prev_partition = ss->GetResources().begin()->second.Get(); + auto op = std::make_shared(context, ss); + SegmentPtr new_seg; + status = op->CommitNewSegment(new_seg); + ASSERT_TRUE(status.ok()); + ASSERT_FALSE(new_seg->ToString().empty()); + SegmentFilePtr seg_file; + status = op->CommitNewSegmentFile(sf_context, seg_file); + ASSERT_TRUE(status.ok()); + + status = Snapshots::GetInstance().DropCollection(ss->GetName(), next_lsn()); + ASSERT_TRUE(status.ok()); + status = op->Push(); + ASSERT_FALSE(status.ok()) << status.message(); + } + + ss = CreateCollection(collection_name, next_lsn()); + ASSERT_TRUE(ss); + + { + PartitionContext pp_ctx; + pp_ctx.name = "p1"; + ss = CreatePartition(ss->GetName(), pp_ctx, next_lsn()); + ASSERT_TRUE(ss); + + OperationContext context; + context.lsn = next_lsn(); + context.prev_partition = ss->GetPartition(pp_ctx.name); + ASSERT_TRUE(context.prev_partition); + auto op = std::make_shared(context, ss); + SegmentPtr new_seg; + status = op->CommitNewSegment(new_seg); + ASSERT_TRUE(status.ok()); + ASSERT_FALSE(new_seg->ToString().empty()); + SegmentFilePtr seg_file; + sf_context.segment_id = new_seg->GetID(); + sf_context.partition_id = new_seg->GetPartitionId(); + sf_context.collection_id = new_seg->GetCollectionId(); + status = op->CommitNewSegmentFile(sf_context, seg_file); + ASSERT_TRUE(status.ok()); + + status = Snapshots::GetInstance().DropPartition( + context.prev_partition->GetCollectionId(), context.prev_partition->GetID(), next_lsn()); + ASSERT_TRUE(status.ok()); + + status = op->Push(); + ASSERT_FALSE(status.ok()); + + { + context.lsn = next_lsn(); + auto c_op = std::make_shared(context, ss); + OperationContext new_seg_ctx; + new_seg_ctx.prev_partition = context.prev_partition; + status = c_op->CommitNewSegment(new_seg_ctx, new_seg); + ASSERT_TRUE(status.ok()); + status = c_op->Push(); + ASSERT_FALSE(status.ok()) << status.ToString(); + } + + { + status = Snapshots::GetInstance().GetSnapshot(ss, collection_name); + ASSERT_TRUE(status.ok()); + context.lsn = next_lsn(); + auto c_op = std::make_shared(context, ss); + OperationContext new_seg_ctx; + new_seg_ctx.prev_partition = context.prev_partition; + status = c_op->CommitNewSegment(new_seg_ctx, new_seg); + ASSERT_FALSE(status.ok()) << status.ToString(); + } + + } +} + TEST_F(SnapshotTest, CompoundTest1) { Status status; std::atomic lsn = 0;