mirror of https://github.com/milvus-io/milvus.git
fix delete entity test case (#3162)
* fix delete entity bug Signed-off-by: yhmo <yihua.mo@zilliz.com> * fix delete entity test case Signed-off-by: yhmo <yihua.mo@zilliz.com> Co-authored-by: shengjun.li <shengjun.li@zilliz.com> pull/3158/head^2
parent
1042f2e1f1
commit
ce1e528d1c
|
@ -25,7 +25,7 @@ IDGenerator::~IDGenerator() = default;
|
|||
|
||||
constexpr size_t SimpleIDGenerator::MAX_IDS_PER_MICRO;
|
||||
|
||||
IDNumber
|
||||
id_t
|
||||
SimpleIDGenerator::GetNextIDNumber() {
|
||||
auto now = std::chrono::system_clock::now();
|
||||
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
|
||||
|
@ -61,7 +61,7 @@ SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
IDNumber
|
||||
id_t
|
||||
SafeIDGenerator::GetNextIDNumber() {
|
||||
auto now = std::chrono::system_clock::now();
|
||||
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
|
||||
|
|
|
@ -23,7 +23,7 @@ namespace engine {
|
|||
|
||||
class IDGenerator {
|
||||
public:
|
||||
virtual IDNumber
|
||||
virtual id_t
|
||||
GetNextIDNumber() = 0;
|
||||
|
||||
virtual Status
|
||||
|
@ -36,7 +36,7 @@ class SimpleIDGenerator : public IDGenerator {
|
|||
public:
|
||||
~SimpleIDGenerator() override = default;
|
||||
|
||||
IDNumber
|
||||
id_t
|
||||
GetNextIDNumber() override;
|
||||
|
||||
Status
|
||||
|
@ -59,7 +59,7 @@ class SafeIDGenerator : public IDGenerator {
|
|||
|
||||
~SafeIDGenerator() override = default;
|
||||
|
||||
IDNumber
|
||||
id_t
|
||||
GetNextIDNumber() override;
|
||||
|
||||
Status
|
||||
|
|
|
@ -31,12 +31,9 @@ namespace engine {
|
|||
|
||||
using id_t = int64_t;
|
||||
using offset_t = int32_t;
|
||||
using date_t = int32_t;
|
||||
|
||||
using DateT = int;
|
||||
|
||||
using IDNumber = int64_t;
|
||||
using IDNumberPtr = IDNumber*;
|
||||
using IDNumbers = std::vector<IDNumber>;
|
||||
using IDNumbers = std::vector<id_t>;
|
||||
|
||||
using VectorDistance = faiss::Index::distance_t;
|
||||
using VectorDistances = std::vector<VectorDistance>;
|
||||
|
|
|
@ -60,7 +60,7 @@ IsBinaryMetricType(const std::string& metric_type) {
|
|||
(metric_type == knowhere::Metric::TANIMOTO);
|
||||
}
|
||||
|
||||
engine::DateT
|
||||
engine::date_t
|
||||
GetDate(const std::time_t& t, int day_delta) {
|
||||
struct tm ltm;
|
||||
localtime_r(&t, &ltm);
|
||||
|
@ -80,12 +80,12 @@ GetDate(const std::time_t& t, int day_delta) {
|
|||
return ltm.tm_year * 10000 + ltm.tm_mon * 100 + ltm.tm_mday;
|
||||
}
|
||||
|
||||
engine::DateT
|
||||
engine::date_t
|
||||
GetDateWithDelta(int day_delta) {
|
||||
return GetDate(std::time(nullptr), day_delta);
|
||||
}
|
||||
|
||||
engine::DateT
|
||||
engine::date_t
|
||||
GetDate() {
|
||||
return GetDate(std::time(nullptr), 0);
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids) {
|
|||
}
|
||||
|
||||
if (!pair->second->data_.empty()) {
|
||||
ids.resize(pair->second->data_.size() / sizeof(engine::IDNumber));
|
||||
ids.resize(pair->second->data_.size() / sizeof(engine::id_t));
|
||||
memcpy((void*)(ids.data()), pair->second->data_.data(), pair->second->data_.size());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,11 +31,11 @@ IsSameIndex(const CollectionIndex& index1, const CollectionIndex& index2);
|
|||
bool
|
||||
IsBinaryMetricType(const std::string& metric_type);
|
||||
|
||||
engine::DateT
|
||||
engine::date_t
|
||||
GetDate(const std::time_t& t, int day_delta = 0);
|
||||
engine::DateT
|
||||
engine::date_t
|
||||
GetDate();
|
||||
engine::DateT
|
||||
engine::date_t
|
||||
GetDateWithDelta(int day_delta);
|
||||
|
||||
struct MetaUriInfo {
|
||||
|
|
|
@ -171,7 +171,9 @@ MemCollection::ApplyDeletes() {
|
|||
context.lsn = lsn_;
|
||||
auto segments_op = std::make_shared<snapshot::CompoundSegmentsOperation>(context, ss);
|
||||
|
||||
int64_t segment_iterated = 0;
|
||||
auto segment_executor = [&](const snapshot::SegmentPtr& segment, snapshot::SegmentIterator* iterator) -> Status {
|
||||
segment_iterated++;
|
||||
auto seg_visitor = engine::SegmentVisitor::Build(ss, segment->GetID());
|
||||
segment::SegmentReaderPtr segment_reader =
|
||||
std::make_shared<segment::SegmentReader>(options_.meta_.path_, seg_visitor);
|
||||
|
@ -290,10 +292,15 @@ MemCollection::ApplyDeletes() {
|
|||
segment_iterator->Iterate();
|
||||
STATUS_CHECK(segment_iterator->GetStatus());
|
||||
|
||||
if (segment_iterated == 0) {
|
||||
return Status::OK(); // no segment, nothing to do
|
||||
}
|
||||
|
||||
fiu_do_on("MemCollection.ApplyDeletes.RandomSleep", {
|
||||
std::srand(std::time(nullptr));
|
||||
sleep(std::rand() % 3);
|
||||
});
|
||||
|
||||
return segments_op->Push();
|
||||
}
|
||||
|
||||
|
|
|
@ -30,10 +30,10 @@ class MemManager {
|
|||
InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, uint64_t lsn) = 0;
|
||||
|
||||
virtual Status
|
||||
DeleteEntity(int64_t collection_id, IDNumber vector_id, uint64_t lsn) = 0;
|
||||
DeleteEntity(int64_t collection_id, id_t entity_id, uint64_t lsn) = 0;
|
||||
|
||||
virtual Status
|
||||
DeleteEntities(int64_t collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) = 0;
|
||||
DeleteEntities(int64_t collection_id, int64_t length, const id_t* entity_ids, uint64_t lsn) = 0;
|
||||
|
||||
virtual Status
|
||||
Flush(int64_t collection_id) = 0;
|
||||
|
|
|
@ -148,12 +148,12 @@ MemManagerImpl::InsertEntitiesNoLock(int64_t collection_id, int64_t partition_id
|
|||
}
|
||||
|
||||
Status
|
||||
MemManagerImpl::DeleteEntity(int64_t collection_id, IDNumber engity_id, uint64_t lsn) {
|
||||
MemManagerImpl::DeleteEntity(int64_t collection_id, id_t entity_id, uint64_t lsn) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
MemCollectionPtr mem = GetMemByCollection(collection_id);
|
||||
|
||||
mem->SetLSN(lsn);
|
||||
IDNumbers ids = {engity_id};
|
||||
IDNumbers ids = {entity_id};
|
||||
auto status = mem->Delete(ids);
|
||||
if (status.ok()) {
|
||||
return status;
|
||||
|
@ -163,7 +163,7 @@ MemManagerImpl::DeleteEntity(int64_t collection_id, IDNumber engity_id, uint64_t
|
|||
}
|
||||
|
||||
Status
|
||||
MemManagerImpl::DeleteEntities(int64_t collection_id, int64_t length, const IDNumber* engity_ids, uint64_t lsn) {
|
||||
MemManagerImpl::DeleteEntities(int64_t collection_id, int64_t length, const id_t* entity_ids, uint64_t lsn) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
MemCollectionPtr mem = GetMemByCollection(collection_id);
|
||||
|
||||
|
@ -171,7 +171,7 @@ MemManagerImpl::DeleteEntities(int64_t collection_id, int64_t length, const IDNu
|
|||
|
||||
IDNumbers ids;
|
||||
ids.resize(length);
|
||||
memcpy(ids.data(), engity_ids, length * sizeof(IDNumber));
|
||||
memcpy(ids.data(), entity_ids, length * sizeof(id_t));
|
||||
|
||||
auto status = mem->Delete(ids);
|
||||
if (!status.ok()) {
|
||||
|
|
|
@ -42,10 +42,10 @@ class MemManagerImpl : public MemManager {
|
|||
InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, uint64_t lsn) override;
|
||||
|
||||
Status
|
||||
DeleteEntity(int64_t collection_id, IDNumber engity_ids, uint64_t lsn) override;
|
||||
DeleteEntity(int64_t collection_id, id_t entity_ids, uint64_t lsn) override;
|
||||
|
||||
Status
|
||||
DeleteEntities(int64_t collection_id, int64_t length, const IDNumber* engity_idss, uint64_t lsn) override;
|
||||
DeleteEntities(int64_t collection_id, int64_t length, const id_t* entity_ids, uint64_t lsn) override;
|
||||
|
||||
Status
|
||||
Flush(int64_t collection_id) override;
|
||||
|
|
|
@ -188,7 +188,7 @@ MXLogBuffer::SurplusSpace() {
|
|||
uint32_t
|
||||
MXLogBuffer::RecordSize(const MXLogRecord& record) {
|
||||
return SizeOfMXLogRecordHeader + (uint32_t)record.collection_id.size() + (uint32_t)record.partition_tag.size() +
|
||||
record.length * (uint32_t)sizeof(IDNumber) + record.data_size;
|
||||
record.length * (uint32_t)sizeof(id_t) + record.data_size;
|
||||
}
|
||||
|
||||
uint32_t
|
||||
|
@ -260,8 +260,8 @@ MXLogBuffer::Append(MXLogRecord& record) {
|
|||
current_write_offset += record.partition_tag.size();
|
||||
}
|
||||
if (record.ids != nullptr && record.length > 0) {
|
||||
memcpy(current_write_buf + current_write_offset, record.ids, record.length * sizeof(IDNumber));
|
||||
current_write_offset += record.length * sizeof(IDNumber);
|
||||
memcpy(current_write_buf + current_write_offset, record.ids, record.length * sizeof(id_t));
|
||||
current_write_offset += record.length * sizeof(id_t);
|
||||
}
|
||||
|
||||
if (record.data != nullptr && record.data_size > 0) {
|
||||
|
@ -353,8 +353,8 @@ MXLogBuffer::AppendEntity(milvus::engine::wal::MXLogRecord& record) {
|
|||
current_write_offset += record.partition_tag.size();
|
||||
}
|
||||
if (record.ids != nullptr && record.length > 0) {
|
||||
memcpy(current_write_buf + current_write_offset, record.ids, record.length * sizeof(IDNumber));
|
||||
current_write_offset += record.length * sizeof(IDNumber);
|
||||
memcpy(current_write_buf + current_write_offset, record.ids, record.length * sizeof(id_t));
|
||||
current_write_offset += record.length * sizeof(id_t);
|
||||
}
|
||||
|
||||
if (record.data != nullptr && record.data_size > 0) {
|
||||
|
@ -455,8 +455,8 @@ MXLogBuffer::Next(const uint64_t last_applied_lsn, MXLogRecord& record) {
|
|||
}
|
||||
|
||||
if (head->vector_num != 0) {
|
||||
record.ids = (IDNumber*)(current_read_buf + current_read_offset);
|
||||
current_read_offset += head->vector_num * sizeof(IDNumber);
|
||||
record.ids = (id_t*)(current_read_buf + current_read_offset);
|
||||
current_read_offset += head->vector_num * sizeof(id_t);
|
||||
} else {
|
||||
record.ids = nullptr;
|
||||
}
|
||||
|
@ -554,8 +554,8 @@ MXLogBuffer::NextEntity(const uint64_t last_applied_lsn, milvus::engine::wal::MX
|
|||
}
|
||||
|
||||
if (head->vector_num != 0) {
|
||||
record.ids = (IDNumber*)(current_read_buf + current_read_offset);
|
||||
current_read_offset += head->vector_num * sizeof(IDNumber);
|
||||
record.ids = (id_t*)(current_read_buf + current_read_offset);
|
||||
current_read_offset += head->vector_num * sizeof(id_t);
|
||||
} else {
|
||||
record.ids = nullptr;
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ struct MXLogRecord {
|
|||
std::string collection_id;
|
||||
std::string partition_tag;
|
||||
uint32_t length;
|
||||
const IDNumber* ids;
|
||||
const id_t* ids;
|
||||
uint32_t data_size;
|
||||
const void* data;
|
||||
std::vector<std::string> field_names; // will be removed
|
||||
|
|
|
@ -448,7 +448,7 @@ WalManager::Insert(const std::string& collection_id, const std::string& partitio
|
|||
return false;
|
||||
}
|
||||
size_t dim = vectors.size() / vector_num;
|
||||
size_t unit_size = dim * sizeof(T) + sizeof(IDNumber);
|
||||
size_t unit_size = dim * sizeof(T) + sizeof(id_t);
|
||||
size_t head_size = SizeOfMXLogRecordHeader + collection_id.length() + partition_tag.length();
|
||||
|
||||
MXLogRecord record;
|
||||
|
@ -522,7 +522,7 @@ WalManager::InsertEntities(const std::string& collection_id, const std::string&
|
|||
attr_unit_size += attr_it->second;
|
||||
}
|
||||
|
||||
size_t unit_size = dim * sizeof(T) + sizeof(IDNumber) + attr_unit_size;
|
||||
size_t unit_size = dim * sizeof(T) + sizeof(id_t) + attr_unit_size;
|
||||
size_t head_size = SizeOfMXLogRecordHeader + collection_id.length() + partition_tag.length();
|
||||
|
||||
// TODO(yukun): field_name put into MXLogRecord???
|
||||
|
@ -588,7 +588,7 @@ WalManager::DeleteById(const std::string& collection_id, const IDNumbers& entity
|
|||
return false;
|
||||
}
|
||||
|
||||
size_t unit_size = sizeof(IDNumber);
|
||||
size_t unit_size = sizeof(id_t);
|
||||
size_t head_size = SizeOfMXLogRecordHeader + collection_id.length();
|
||||
|
||||
MXLogRecord record;
|
||||
|
|
|
@ -144,7 +144,7 @@ Segment::AddChunk(const DataChunkPtr& chunk_ptr, int64_t from, int64_t to) {
|
|||
Status
|
||||
Segment::DeleteEntity(std::vector<offset_t>& offsets) {
|
||||
// sort offsets in descending order
|
||||
std::sort(offsets.begin(), offsets.end(), std::less<offset_t>());
|
||||
std::sort(offsets.begin(), offsets.end(), std::greater<offset_t>());
|
||||
|
||||
// delete entity data
|
||||
for (auto& pair : fixed_fields_) {
|
||||
|
|
|
@ -637,13 +637,28 @@ TEST_F(DBTest, CompactTest) {
|
|||
status = db_->Insert(collection_name, "", data_chunk);
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
status = db_->Flush();
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
milvus::engine::IDNumbers batch_entity_ids;
|
||||
milvus::engine::utils::GetIDFromChunk(data_chunk, batch_entity_ids);
|
||||
ASSERT_EQ(batch_entity_ids.size(), entity_count);
|
||||
|
||||
auto delete_entity = [&](int64_t from, int64_t to) -> void {
|
||||
int64_t delete_count = to - from;
|
||||
if (delete_count < 0) {
|
||||
return;
|
||||
}
|
||||
std::vector<milvus::engine::id_t> delete_ids;
|
||||
for (auto i = from; i < to; ++i) {
|
||||
delete_ids.push_back(batch_entity_ids[i]);
|
||||
}
|
||||
status = db_->DeleteEntityByID(collection_name, delete_ids);
|
||||
ASSERT_TRUE(status.ok());
|
||||
};
|
||||
int64_t delete_count_1 = 200;
|
||||
delete_entity(100, 100 + delete_count_1);
|
||||
|
||||
status = db_->Flush();
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
auto validate_entity_data = [&]() -> void {
|
||||
std::vector<std::string> field_names = {"field_0"};
|
||||
std::vector<bool> valid_row;
|
||||
|
@ -663,14 +678,8 @@ TEST_F(DBTest, CompactTest) {
|
|||
};
|
||||
validate_entity_data();
|
||||
|
||||
int64_t delete_count = 100;
|
||||
int64_t gap = entity_count / delete_count - 1;
|
||||
std::vector<milvus::engine::id_t> delete_ids;
|
||||
for (auto i = 1; i <= delete_count; ++i) {
|
||||
delete_ids.push_back(batch_entity_ids[i * gap]);
|
||||
}
|
||||
status = db_->DeleteEntityByID(collection_name, delete_ids);
|
||||
ASSERT_TRUE(status.ok());
|
||||
int64_t delete_count_2 = 100;
|
||||
delete_entity(700, 700 + delete_count_2);
|
||||
|
||||
status = db_->Flush();
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
@ -679,7 +688,7 @@ TEST_F(DBTest, CompactTest) {
|
|||
int64_t row_count = 0;
|
||||
status = db_->CountEntities(collection_name, row_count);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(row_count, entity_count - delete_count);
|
||||
ASSERT_EQ(row_count, entity_count - delete_count_1 - delete_count_2);
|
||||
|
||||
status = db_->Compact(dummy_context_, collection_name, threshold);
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
@ -688,7 +697,7 @@ TEST_F(DBTest, CompactTest) {
|
|||
|
||||
status = db_->CountEntities(collection_name, row_count);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(row_count, entity_count - delete_count);
|
||||
ASSERT_EQ(row_count, entity_count - delete_count_1 - delete_count_2);
|
||||
|
||||
validate_entity_data();
|
||||
};
|
||||
|
@ -953,14 +962,8 @@ TEST_F(DBTest, DeleteEntitiesTest) {
|
|||
BuildEntities(count, batch_index, data_chunk);
|
||||
STATUS_CHECK(db_->Insert(collection, partition, data_chunk));
|
||||
STATUS_CHECK(db_->Flush(collection));
|
||||
auto iter = data_chunk->fixed_fields_.find(milvus::engine::DEFAULT_UID_NAME);
|
||||
if (iter == data_chunk->fixed_fields_.end()) {
|
||||
return Status(1, "Cannot find uid field");
|
||||
}
|
||||
auto& ids_buffer = iter->second;
|
||||
ids.resize(data_chunk->count_);
|
||||
memcpy(ids.data(), ids_buffer->data_.data(), ids_buffer->Size());
|
||||
|
||||
milvus::engine::utils::GetIDFromChunk(data_chunk, ids);
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue