mirror of https://github.com/milvus-io/milvus.git
parent
523ed18c6e
commit
ecc52d365c
|
@ -82,10 +82,11 @@ Operations::operator()(Store& store) {
|
|||
|
||||
void
|
||||
Operations::SetStatus(const Status& status) {
|
||||
std::unique_lock<std::mutex> lock(finish_mtx_);
|
||||
status_ = status;
|
||||
}
|
||||
|
||||
Status
|
||||
const Status&
|
||||
Operations::WaitToFinish() {
|
||||
std::unique_lock<std::mutex> lock(finish_mtx_);
|
||||
finish_cond_.wait(lock, [this] { return done_; });
|
||||
|
@ -182,7 +183,7 @@ Operations::GetSnapshot(ScopedSnapshotT& ss) const {
|
|||
return status;
|
||||
}
|
||||
|
||||
Status
|
||||
const Status&
|
||||
Operations::ApplyToStore(Store& store) {
|
||||
if (GetType() == OperationsType::W_Compound) {
|
||||
/* std::cout << ToString() << std::endl; */
|
||||
|
|
|
@ -104,10 +104,10 @@ class Operations : public std::enable_shared_from_this<Operations> {
|
|||
virtual Status
|
||||
PreCheck();
|
||||
|
||||
virtual Status
|
||||
virtual const Status&
|
||||
ApplyToStore(Store& store);
|
||||
|
||||
Status
|
||||
const Status&
|
||||
WaitToFinish();
|
||||
|
||||
void
|
||||
|
@ -116,8 +116,9 @@ class Operations : public std::enable_shared_from_this<Operations> {
|
|||
void
|
||||
SetStatus(const Status& status);
|
||||
|
||||
Status
|
||||
const Status&
|
||||
GetStatus() const {
|
||||
std::unique_lock<std::mutex> lock(finish_mtx_);
|
||||
return status_;
|
||||
}
|
||||
|
||||
|
@ -241,7 +242,7 @@ class LoadOperation : public Operations {
|
|||
: Operations(OperationContext(), ScopedSnapshotT(), OperationsType::O_Leaf), context_(context) {
|
||||
}
|
||||
|
||||
Status
|
||||
const Status&
|
||||
ApplyToStore(Store& store) override {
|
||||
if (done_) {
|
||||
Done(store);
|
||||
|
@ -290,7 +291,7 @@ class HardDeleteOperation : public Operations {
|
|||
: Operations(OperationContext(), ScopedSnapshotT(), OperationsType::W_Leaf), id_(id) {
|
||||
}
|
||||
|
||||
Status
|
||||
const Status&
|
||||
ApplyToStore(Store& store) override {
|
||||
if (done_)
|
||||
return status_;
|
||||
|
@ -311,7 +312,7 @@ class HardDeleteOperation<Collection> : public Operations {
|
|||
: Operations(OperationContext(), ScopedSnapshotT(), OperationsType::W_Leaf), id_(id) {
|
||||
}
|
||||
|
||||
Status
|
||||
const Status&
|
||||
ApplyToStore(Store& store) override {
|
||||
if (done_) {
|
||||
Done(store);
|
||||
|
|
|
@ -108,7 +108,7 @@ class LoadOperation<Collection> : public Operations {
|
|||
: Operations(OperationContext(), ScopedSnapshotT(), OperationsType::O_Leaf), context_(context) {
|
||||
}
|
||||
|
||||
Status
|
||||
const Status&
|
||||
ApplyToStore(Store& store) override {
|
||||
if (done_) {
|
||||
Done(store);
|
||||
|
|
|
@ -839,7 +839,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
|
|||
context.lsn = next_lsn();
|
||||
auto op = std::make_shared<NewSegmentOperation>(context, latest_ss);
|
||||
SegmentPtr new_seg;
|
||||
status = op->CommitNewSegment(new_seg);
|
||||
auto status = op->CommitNewSegment(new_seg);
|
||||
if (!status.ok()) {
|
||||
std::cout << status.ToString() << std::endl;
|
||||
}
|
||||
|
@ -1121,8 +1121,12 @@ TEST_F(SnapshotTest, CompoundTest2) {
|
|||
for (auto& id : seg_ids) {
|
||||
auto seg = latest_ss->GetResource<Segment>(id);
|
||||
if (!seg) {
|
||||
std::cout << "Error seg=" << id << std::endl;
|
||||
ASSERT_TRUE(seg);
|
||||
std::cout << "Stale seg=" << id << std::endl;
|
||||
std::unique_lock<std::mutex> lock(partition_mtx);
|
||||
std::cout << ((stale_partitions.find(p_id) != stale_partitions.end()) ? " due stale partition"
|
||||
: " unexpected") << std::endl;
|
||||
ASSERT_TRUE(stale_partitions.find(p_id) != stale_partitions.end());
|
||||
return;
|
||||
}
|
||||
if (!partition) {
|
||||
partition = latest_ss->GetResource<Partition>(seg->GetPartitionId());
|
||||
|
@ -1212,7 +1216,7 @@ TEST_F(SnapshotTest, CompoundTest2) {
|
|||
context.lsn = next_lsn();
|
||||
auto op = std::make_shared<NewSegmentOperation>(context, latest_ss);
|
||||
SegmentPtr new_seg;
|
||||
status = op->CommitNewSegment(new_seg);
|
||||
auto status = op->CommitNewSegment(new_seg);
|
||||
if (!status.ok()) {
|
||||
std::cout << status.ToString() << std::endl;
|
||||
std::unique_lock<std::mutex> lock(partition_mtx);
|
||||
|
@ -1229,8 +1233,13 @@ TEST_F(SnapshotTest, CompoundTest2) {
|
|||
}
|
||||
status = op->Push();
|
||||
if (!status.ok()) {
|
||||
std::cout << status.ToString() << std::endl;
|
||||
std::unique_lock<std::mutex> lock(partition_mtx);
|
||||
/* if (stale_partitions.find(partition_id) == stale_partitions.end()) { */
|
||||
/* for (auto p : stale_partitions) { */
|
||||
/* std::cout << "stale p: " << p << std::endl; */
|
||||
/* } */
|
||||
/* ASSERT_TRUE(false); */
|
||||
/* } */
|
||||
ASSERT_TRUE(stale_partitions.find(partition_id) != stale_partitions.end());
|
||||
return;
|
||||
}
|
||||
|
@ -1289,10 +1298,10 @@ TEST_F(SnapshotTest, CompoundTest2) {
|
|||
};
|
||||
|
||||
for (auto i = 0; i < loop_cnt; ++i) {
|
||||
if (RandomInt(0, 10) > 6) {
|
||||
if (RandomInt(0, 10) > 5) {
|
||||
create_partition();
|
||||
}
|
||||
if (RandomInt(0, 10) > 8) {
|
||||
if (RandomInt(0, 10) > 7) {
|
||||
drop_partition();
|
||||
}
|
||||
create_new_segment();
|
||||
|
|
Loading…
Reference in New Issue