mirror of https://github.com/milvus-io/milvus.git
Apply more stale check rules for Operations (#3721)
* (db/snapshot): Update Operations to handle stale errors Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): add some print in ut Signed-off-by: peng.xu <peng.xu@zilliz.com> Signed-off-by: shengjun.li <shengjun.li@zilliz.com>pull/3805/head
parent
82711d6065
commit
52b0181e89
|
@ -210,7 +210,10 @@ Operations::ApplyToStore(StorePtr store) {
|
|||
|
||||
Status
|
||||
Operations::OnSnapshotDropped() {
|
||||
return Status::OK();
|
||||
std::stringstream msg;
|
||||
msg << "Collection " << GetStartedSS()->GetCollection()->GetID() << " was dropped before" << std::endl;
|
||||
LOG_ENGINE_WARNING_ << msg.str();
|
||||
return Status(SS_COLLECTION_DROPPED, msg.str());
|
||||
}
|
||||
|
||||
Status
|
||||
|
@ -234,6 +237,8 @@ Operations::PreExecute(StorePtr store) {
|
|||
STATUS_CHECK(Snapshots::GetInstance().GetSnapshot(context_.prev_ss, GetStartedSS()->GetCollectionId()));
|
||||
if (!context_.prev_ss) {
|
||||
STATUS_CHECK(OnSnapshotDropped());
|
||||
} else if (!prev_ss_->GetCollection()->IsActive()) {
|
||||
STATUS_CHECK(OnSnapshotDropped());
|
||||
} else if (prev_ss_->GetID() != context_.prev_ss->GetID()) {
|
||||
STATUS_CHECK(OnSnapshotStale());
|
||||
}
|
||||
|
|
|
@ -208,6 +208,11 @@ SegmentOperation::PreCheck() {
|
|||
emsg << GetRepr() << ". prev_partition should be specified in context";
|
||||
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
|
||||
}
|
||||
if (!GetStartedSS()->GetResource<Partition>(context_.prev_partition->GetID())) {
|
||||
std::stringstream emsg;
|
||||
emsg << GetRepr() << ". Partition is stale";
|
||||
return Status(SS_STALE_ERROR, emsg.str());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
|
@ -137,5 +137,6 @@ constexpr ErrorCode SS_INVALID_ARGUMENT_ERROR = ToSSErrorCode(8);
|
|||
constexpr ErrorCode SS_OPERATION_PENDING = ToSSErrorCode(9);
|
||||
constexpr ErrorCode SS_TIMEOUT = ToSSErrorCode(10);
|
||||
constexpr ErrorCode SS_NOT_COMMITED = ToSSErrorCode(11);
|
||||
constexpr ErrorCode SS_COLLECTION_DROPPED = ToSSErrorCode(12);
|
||||
|
||||
} // namespace milvus
|
||||
|
|
|
@ -795,13 +795,15 @@ TEST_F(DBTest, GetEntityTest) {
|
|||
|
||||
milvus::engine::IDNumbers entity_ids;
|
||||
milvus::engine::DataChunkPtr dataChunkPtr;
|
||||
insert_entities(collection_name, "", 10000, 0, entity_ids, dataChunkPtr);
|
||||
insert_entities(collection_name, "", 2000, 0, entity_ids, dataChunkPtr);
|
||||
std::cout << "Post InsertEntities" << std::endl;
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
milvus::engine::snapshot::CollectionPtr collection;
|
||||
milvus::engine::snapshot::FieldElementMappings field_mappings;
|
||||
status = db_->GetCollectionInfo(collection_name, collection, field_mappings);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
std::cout << "Post GetCollectionInfo" << std::endl;
|
||||
|
||||
{
|
||||
std::vector<std::string> field_names;
|
||||
|
@ -818,6 +820,7 @@ TEST_F(DBTest, GetEntityTest) {
|
|||
ASSERT_TRUE(get_data_chunk->fixed_fields_[name]->data_ == dataChunkPtr->fixed_fields_[name]->data_);
|
||||
}
|
||||
}
|
||||
std::cout << "Post GetEntityByID1" << std::endl;
|
||||
|
||||
{
|
||||
std::vector<std::string> field_names;
|
||||
|
@ -830,6 +833,7 @@ TEST_F(DBTest, GetEntityTest) {
|
|||
status = db_->GetEntityByID(collection_name, entity_ids, field_names, valid_row, get_data_chunk);
|
||||
ASSERT_TRUE(!status.ok());
|
||||
}
|
||||
std::cout << "Post GetEntityByID2" << std::endl;
|
||||
|
||||
{
|
||||
std::vector<std::string> field_names;
|
||||
|
@ -850,6 +854,7 @@ TEST_F(DBTest, GetEntityTest) {
|
|||
ASSERT_TRUE(get_data_chunk->fixed_fields_[name]->data_ == dataChunkPtr->fixed_fields_[name]->data_);
|
||||
}
|
||||
}
|
||||
std::cout << "Post GetEntityByID3" << std::endl;
|
||||
}
|
||||
|
||||
TEST_F(DBTest, CompactTest) {
|
||||
|
|
|
@ -1226,6 +1226,96 @@ TEST_F(SnapshotTest, OperationTest2) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(SnapshotTest, DropTest) {
|
||||
Status status;
|
||||
std::atomic<LSN_TYPE> lsn = 0;
|
||||
auto next_lsn = [&]() -> decltype(lsn) {
|
||||
return ++lsn;
|
||||
};
|
||||
|
||||
std::string collection_name("c1");
|
||||
auto ss = CreateCollection(collection_name, next_lsn());
|
||||
ASSERT_TRUE(ss);
|
||||
|
||||
SegmentFileContext sf_context;
|
||||
SFContextBuilder(sf_context, ss);
|
||||
|
||||
{
|
||||
OperationContext context;
|
||||
context.lsn = next_lsn();
|
||||
context.prev_partition = ss->GetResources<Partition>().begin()->second.Get();
|
||||
auto op = std::make_shared<NewSegmentOperation>(context, ss);
|
||||
SegmentPtr new_seg;
|
||||
status = op->CommitNewSegment(new_seg);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_FALSE(new_seg->ToString().empty());
|
||||
SegmentFilePtr seg_file;
|
||||
status = op->CommitNewSegmentFile(sf_context, seg_file);
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
status = Snapshots::GetInstance().DropCollection(ss->GetName(), next_lsn());
|
||||
ASSERT_TRUE(status.ok());
|
||||
status = op->Push();
|
||||
ASSERT_FALSE(status.ok()) << status.message();
|
||||
}
|
||||
|
||||
ss = CreateCollection(collection_name, next_lsn());
|
||||
ASSERT_TRUE(ss);
|
||||
|
||||
{
|
||||
PartitionContext pp_ctx;
|
||||
pp_ctx.name = "p1";
|
||||
ss = CreatePartition(ss->GetName(), pp_ctx, next_lsn());
|
||||
ASSERT_TRUE(ss);
|
||||
|
||||
OperationContext context;
|
||||
context.lsn = next_lsn();
|
||||
context.prev_partition = ss->GetPartition(pp_ctx.name);
|
||||
ASSERT_TRUE(context.prev_partition);
|
||||
auto op = std::make_shared<NewSegmentOperation>(context, ss);
|
||||
SegmentPtr new_seg;
|
||||
status = op->CommitNewSegment(new_seg);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_FALSE(new_seg->ToString().empty());
|
||||
SegmentFilePtr seg_file;
|
||||
sf_context.segment_id = new_seg->GetID();
|
||||
sf_context.partition_id = new_seg->GetPartitionId();
|
||||
sf_context.collection_id = new_seg->GetCollectionId();
|
||||
status = op->CommitNewSegmentFile(sf_context, seg_file);
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
status = Snapshots::GetInstance().DropPartition(
|
||||
context.prev_partition->GetCollectionId(), context.prev_partition->GetID(), next_lsn());
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
status = op->Push();
|
||||
ASSERT_FALSE(status.ok());
|
||||
|
||||
{
|
||||
context.lsn = next_lsn();
|
||||
auto c_op = std::make_shared<CompoundSegmentsOperation>(context, ss);
|
||||
OperationContext new_seg_ctx;
|
||||
new_seg_ctx.prev_partition = context.prev_partition;
|
||||
status = c_op->CommitNewSegment(new_seg_ctx, new_seg);
|
||||
ASSERT_TRUE(status.ok());
|
||||
status = c_op->Push();
|
||||
ASSERT_FALSE(status.ok()) << status.ToString();
|
||||
}
|
||||
|
||||
{
|
||||
status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
|
||||
ASSERT_TRUE(status.ok());
|
||||
context.lsn = next_lsn();
|
||||
auto c_op = std::make_shared<CompoundSegmentsOperation>(context, ss);
|
||||
OperationContext new_seg_ctx;
|
||||
new_seg_ctx.prev_partition = context.prev_partition;
|
||||
status = c_op->CommitNewSegment(new_seg_ctx, new_seg);
|
||||
ASSERT_FALSE(status.ok()) << status.ToString();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(SnapshotTest, CompoundTest1) {
|
||||
Status status;
|
||||
std::atomic<LSN_TYPE> lsn = 0;
|
||||
|
|
Loading…
Reference in New Issue