enhance: mark duplicated pk as deleted (#34619)

pr: #34586

Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>
pull/34633/head
zhagnlu 2024-07-12 10:27:37 +08:00 committed by GitHub
parent 86b57b7827
commit 4e02e57044
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 477 additions and 290 deletions

View File

@ -17,106 +17,84 @@
#include <tuple>
#include <utility>
#include <vector>
#include <folly/ConcurrentSkipList.h>
#include "AckResponder.h"
#include "common/Schema.h"
#include "common/Types.h"
#include "segcore/Record.h"
#include "segcore/InsertRecord.h"
#include "ConcurrentVector.h"
namespace milvus::segcore {
struct DeletedRecord {
struct TmpBitmap {
// Just for query
int64_t del_barrier = 0;
BitsetTypePtr bitmap_ptr;
struct Comparator {
bool
operator()(const std::pair<Timestamp, std::set<int64_t>>& left,
const std::pair<Timestamp, std::set<int64_t>>& right) const {
return left.first < right.first;
}
};
std::shared_ptr<TmpBitmap>
clone(int64_t capacity);
};
static constexpr int64_t deprecated_size_per_chunk = 32 * 1024;
DeletedRecord()
: lru_(std::make_shared<TmpBitmap>()),
timestamps_(deprecated_size_per_chunk),
pks_(deprecated_size_per_chunk) {
lru_->bitmap_ptr = std::make_shared<BitsetType>();
using TSkipList =
folly::ConcurrentSkipList<std::pair<Timestamp, std::set<int64_t>>,
Comparator>;
template <bool is_sealed = false>
class DeletedRecord {
public:
DeletedRecord(InsertRecord<is_sealed>* insert_record)
: insert_record_(insert_record),
deleted_pairs_(TSkipList::createInstance()) {
}
auto
get_lru_entry() {
std::shared_lock lck(shared_mutex_);
return lru_;
}
std::shared_ptr<TmpBitmap>
clone_lru_entry(int64_t insert_barrier,
int64_t del_barrier,
int64_t& old_del_barrier,
bool& hit_cache) {
std::shared_lock lck(shared_mutex_);
auto res = lru_->clone(insert_barrier);
old_del_barrier = lru_->del_barrier;
if (lru_->bitmap_ptr->size() == insert_barrier &&
lru_->del_barrier == del_barrier) {
hit_cache = true;
} else {
res->del_barrier = del_barrier;
}
return res;
}
DeletedRecord(DeletedRecord<is_sealed>&& delete_record) = delete;
DeletedRecord<is_sealed>&
operator=(DeletedRecord<is_sealed>&& delete_record) = delete;
void
insert_lru_entry(std::shared_ptr<TmpBitmap> new_entry, bool force = false) {
std::lock_guard lck(shared_mutex_);
if (new_entry->del_barrier <= lru_->del_barrier) {
if (!force ||
new_entry->bitmap_ptr->size() <= lru_->bitmap_ptr->size()) {
// DO NOTHING
return;
Push(const std::vector<PkType>& pks, const Timestamp* timestamps) {
std::unique_lock<std::shared_mutex> lck(mutex_);
int64_t removed_num = 0;
int64_t mem_add = 0;
for (size_t i = 0; i < pks.size(); ++i) {
auto offsets = insert_record_->search_pk(pks[i], timestamps[i]);
for (auto offset : offsets) {
int64_t insert_row_offset = offset.get();
// Assert(insert_record->timestamps_.size() >= insert_row_offset);
if (insert_record_->timestamps_[insert_row_offset] <
timestamps[i]) {
InsertIntoInnerPairs(timestamps[i], {insert_row_offset});
removed_num++;
mem_add += sizeof(Timestamp) + sizeof(int64_t);
}
}
}
lru_ = std::move(new_entry);
n_.fetch_add(removed_num);
mem_size_.fetch_add(mem_add);
}
void
push(const std::vector<PkType>& pks, const Timestamp* timestamps) {
std::lock_guard lck(buffer_mutex_);
auto size = pks.size();
ssize_t divide_point = 0;
auto n = n_.load();
// Truncate the overlapping prefix
if (n > 0) {
auto last = timestamps_[n - 1];
divide_point =
std::lower_bound(timestamps, timestamps + size, last + 1) -
timestamps;
}
// All these delete records have been applied
if (divide_point == size) {
Query(BitsetType& bitset, int64_t insert_barrier, Timestamp timestamp) {
Assert(bitset.size() == insert_barrier);
// TODO: add cache to bitset
if (deleted_pairs_.size() == 0) {
return;
}
auto end = deleted_pairs_.lower_bound(
std::make_pair(timestamp, std::set<int64_t>{}));
for (auto it = deleted_pairs_.begin(); it != end; it++) {
for (auto& v : it->second) {
bitset.set(v);
}
}
size -= divide_point;
pks_.set_data_raw(n, pks.data() + divide_point, size);
timestamps_.set_data_raw(n, timestamps + divide_point, size);
n_ += size;
mem_size_ += sizeof(Timestamp) * size +
CalcPksSize(pks.data() + divide_point, size);
}
const ConcurrentVector<Timestamp>&
timestamps() const {
return timestamps_;
}
const ConcurrentVector<PkType>&
pks() const {
return pks_;
// handle the case where end points to an element with the same timestamp
if (end != deleted_pairs_.end() && end->first == timestamp) {
for (auto& v : end->second) {
bitset.set(v);
}
}
}
int64_t
@ -130,26 +108,24 @@ struct DeletedRecord {
}
private:
std::shared_ptr<TmpBitmap> lru_;
std::shared_mutex shared_mutex_;
void
InsertIntoInnerPairs(Timestamp ts, std::set<int64_t> offsets) {
auto it = deleted_pairs_.find(std::make_pair(ts, std::set<int64_t>{}));
if (it == deleted_pairs_.end()) {
deleted_pairs_.insert(std::make_pair(ts, offsets));
} else {
for (auto& val : offsets) {
it->second.insert(val);
}
}
}
std::shared_mutex buffer_mutex_;
private:
std::shared_mutex mutex_;
std::atomic<int64_t> n_ = 0;
std::atomic<int64_t> mem_size_ = 0;
ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<PkType> pks_;
InsertRecord<is_sealed>* insert_record_;
TSkipList::Accessor deleted_pairs_;
};
inline auto
DeletedRecord::TmpBitmap::clone(int64_t capacity)
-> std::shared_ptr<TmpBitmap> {
auto res = std::make_shared<TmpBitmap>();
res->del_barrier = this->del_barrier;
// res->bitmap_ptr = std::make_shared<BitsetType>();
// *(res->bitmap_ptr) = *(this->bitmap_ptr);
res->bitmap_ptr = std::make_shared<BitsetType>(this->bitmap_ptr->clone());
res->bitmap_ptr->resize(capacity, false);
return res;
}
} // namespace milvus::segcore

View File

@ -68,6 +68,12 @@ class OffsetMap {
virtual void
clear() = 0;
virtual std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) = 0;
virtual void
remove_duplicate_pks(const ConcurrentVector<Timestamp>& timestamps) = 0;
};
template <typename T>
@ -96,6 +102,57 @@ class OffsetOrderedMap : public OffsetMap {
map_[std::get<T>(pk)].emplace_back(offset);
}
std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) {
std::shared_lock<std::shared_mutex> lck(mtx_);
std::vector<PkType> remove_pks;
std::vector<Timestamp> remove_timestamps;
for (auto& [pk, offsets] : map_) {
if (offsets.size() > 1) {
// find max timestamp offset
int64_t max_timestamp_offset = 0;
for (auto& offset : offsets) {
if (timestamps[offset] > timestamps[max_timestamp_offset]) {
max_timestamp_offset = offset;
}
}
remove_pks.push_back(pk);
remove_timestamps.push_back(timestamps[max_timestamp_offset]);
}
}
return std::make_pair(remove_pks, remove_timestamps);
}
void
remove_duplicate_pks(
const ConcurrentVector<Timestamp>& timestamps) override {
std::unique_lock<std::shared_mutex> lck(mtx_);
for (auto& [pk, offsets] : map_) {
if (offsets.size() > 1) {
// find max timestamp offset
int64_t max_timestamp_offset = 0;
for (auto& offset : offsets) {
if (timestamps[offset] > timestamps[max_timestamp_offset]) {
max_timestamp_offset = offset;
}
}
// remove other offsets from pk index
offsets.erase(
std::remove_if(offsets.begin(),
offsets.end(),
[max_timestamp_offset](int64_t val) {
return val != max_timestamp_offset;
}),
offsets.end());
}
}
}
void
seal() override {
PanicInfo(
@ -219,6 +276,63 @@ class OffsetOrderedArray : public OffsetMap {
std::make_pair(std::get<T>(pk), static_cast<int32_t>(offset)));
}
std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) {
std::vector<PkType> remove_pks;
std::vector<Timestamp> remove_timestamps;
// cached pks(key, max_timestamp_offset)
std::unordered_map<T, int64_t> pks;
std::unordered_set<T> need_removed_pks;
for (auto it = array_.begin(); it != array_.end(); ++it) {
const T& key = it->first;
if (pks.find(key) == pks.end()) {
pks.insert({key, it->second});
} else {
need_removed_pks.insert(key);
if (timestamps[it->second] > timestamps[pks[key]]) {
pks[key] = it->second;
}
}
}
// return max_timestamps that removed pks
for (auto& pk : need_removed_pks) {
remove_pks.push_back(pk);
remove_timestamps.push_back(timestamps[pks[pk]]);
}
return std::make_pair(remove_pks, remove_timestamps);
}
void
remove_duplicate_pks(const ConcurrentVector<Timestamp>& timestamps) {
// cached pks(key, max_timestamp_offset)
std::unordered_map<T, int64_t> pks;
std::unordered_set<T> need_removed_pks;
for (auto it = array_.begin(); it != array_.end(); ++it) {
const T& key = it->first;
if (pks.find(key) == pks.end()) {
pks.insert({key, it->second});
} else {
need_removed_pks.insert(key);
if (timestamps[it->second] > timestamps[pks[key]]) {
pks[key] = it->second;
}
}
}
// remove duplicate pks
for (auto it = array_.begin(); it != array_.end();) {
const T& key = it->first;
auto max_offset = pks[key];
if (max_offset != it->second) {
it = array_.erase(it);
} else {
it++;
}
}
}
void
seal() override {
sort(array_.begin(), array_.end());
@ -520,6 +634,26 @@ struct InsertRecord {
pk2offset_->insert(pk, offset);
}
    // Insert (pk, offset) into the pk index and report whether the pk was
    // already present before this insertion. The caller uses a `true` return
    // to mark the older record with the same pk as deleted.
    // Thread-safe: contain() + insert() run under one exclusive lock so the
    // check and the insertion are atomic with respect to other writers.
    bool
    insert_with_check_existence(const PkType& pk, int64_t offset) {
        std::lock_guard lck(shared_mutex_);
        auto exist = pk2offset_->contain(pk);
        pk2offset_->insert(pk, offset);
        return exist;
    }
    // Return the duplicated pks and their max insert timestamps, as computed
    // by the underlying OffsetMap against this record's timestamp column.
    // The result is intended to be passed to DeletedRecord::Push.
    std::pair<std::vector<PkType>, std::vector<Timestamp>>
    get_need_removed_pks() {
        std::lock_guard lck(shared_mutex_);
        return pk2offset_->get_need_removed_pks(timestamps_);
    }
    // Compact the pk index so each duplicated pk keeps only the offset with
    // the newest insert timestamp. Called after the duplicates have been
    // pushed to the DeletedRecord via get_need_removed_pks().
    void
    remove_duplicate_pks() {
        std::lock_guard lck(shared_mutex_);
        pk2offset_->remove_duplicate_pks(timestamps_);
    }
bool
empty_pks() const {
std::shared_lock lck(shared_mutex_);

View File

@ -39,6 +39,8 @@ class SegmentGrowing : public SegmentInternalInterface {
return SegmentType::Growing;
}
virtual std::vector<SegOffset>
SearchPk(const PkType& pk, Timestamp ts) const = 0;
// virtual int64_t
// PreDelete(int64_t size) = 0;

View File

@ -48,23 +48,7 @@ void
SegmentGrowingImpl::mask_with_delete(BitsetType& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}
auto bitmap_holder = get_deleted_bitmap(
del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}
void
@ -159,7 +143,14 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
ParsePksFromFieldData(
pks, insert_record_proto->fields_data(field_id_to_offset[field_id]));
for (int i = 0; i < num_rows; ++i) {
insert_record_.insert_pk(pks[i], reserved_offset + i);
auto exist_pk = insert_record_.insert_with_check_existence(
pks[i], reserved_offset + i);
// If this pk already exists, mark the previous record with the same pk as
// deleted at the current insert timestamp, i.e. the older record becomes
// invisible from this timestamp onward.
if (exist_pk) {
auto remove_timestamp = timestamps_raw[i];
deleted_record_.Push({pks[i]}, &remove_timestamp);
}
}
// step 5: update small indexes
@ -167,6 +158,18 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
reserved_offset + num_rows);
}
// Deduplicate primary keys in the growing segment's pk index.
// Two-phase: (1) find the duplicated pks and record them in deleted_record_
// (the delete timestamp is each pk's max insert timestamp, so only the older
// rows become invisible); (2) drop the older offsets from the pk index itself.
// Holds the segment's exclusive lock for the whole operation so queries do
// not observe the intermediate state.
void
SegmentGrowingImpl::RemoveDuplicatePkRecords() {
    std::unique_lock lck(mutex_);
    //Assert(!insert_record_.timestamps_.empty());
    // Phase 1: find records that need removal and mark them as deleted.
    auto removed_pks = insert_record_.get_need_removed_pks();
    deleted_record_.Push(removed_pks.first, removed_pks.second.data());
    // Phase 2: remove the duplicated pks from the pk index.
    insert_record_.remove_duplicate_pks();
}
void
SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
// schema don't include system field
@ -391,7 +394,7 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin,
}
// step 2: fill delete record
deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.Push(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}
@ -412,7 +415,7 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
// step 2: fill pks and timestamps
deleted_record_.push(pks, timestamps);
deleted_record_.Push(pks, timestamps);
}
SpanBase

View File

@ -67,6 +67,9 @@ class SegmentGrowingImpl : public SegmentGrowing {
void
LoadFieldDataV2(const LoadFieldDataInfo& info) override;
void
RemoveDuplicatePkRecords() override;
std::string
debug() const override;
@ -86,7 +89,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
return indexing_record_;
}
const DeletedRecord&
const DeletedRecord<false>&
get_deleted_record() const {
return deleted_record_;
}
@ -129,6 +132,11 @@ class SegmentGrowingImpl : public SegmentGrowing {
void
try_remove_chunks(FieldId fieldId);
std::vector<SegOffset>
SearchPk(const PkType& pk, Timestamp ts) const {
return insert_record_.search_pk(pk, ts);
}
public:
size_t
GetMemoryUsageInBytes() const override {
@ -221,6 +229,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
index_meta_(indexMeta),
insert_record_(
*schema_, segcore_config.get_chunk_rows(), mmap_descriptor_),
deleted_record_(&insert_record_),
indexing_record_(*schema_, index_meta_, segcore_config_),
id_(segment_id) {
if (mmap_descriptor_ != nullptr) {
@ -329,6 +338,11 @@ class SegmentGrowingImpl : public SegmentGrowing {
Assert(plan);
}
void
check_retrieve(const query::RetrievePlan* plan) const override {
Assert(plan);
}
const ConcurrentVector<Timestamp>&
get_timestamps() const override {
return insert_record_.timestamps_;
@ -349,7 +363,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
mutable std::shared_mutex chunk_mutex_;
// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<false> deleted_record_;
int64_t id_;

View File

@ -87,6 +87,7 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
bool ignore_non_pk) const {
std::shared_lock lck(mutex_);
tracer::AutoSpan span("Retrieve", trace_ctx, false);
check_retrieve(plan);
auto results = std::make_unique<proto::segcore::RetrieveResults>();
query::ExecPlanNodeVisitor visitor(*this, timestamp);
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
@ -220,6 +221,7 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
int64_t size) const {
std::shared_lock lck(mutex_);
tracer::AutoSpan span("RetrieveByOffsets", trace_ctx, false);
check_retrieve(Plan);
auto results = std::make_unique<proto::segcore::RetrieveResults>();
FillTargetEntry(trace_ctx, Plan, results, offsets, size, false, false);
return results;

View File

@ -118,6 +118,9 @@ class SegmentInterface {
virtual void
LoadFieldDataV2(const LoadFieldDataInfo& info) = 0;
virtual void
RemoveDuplicatePkRecords() = 0;
virtual int64_t
get_segment_id() const = 0;
@ -387,6 +390,9 @@ class SegmentInternalInterface : public SegmentInterface {
virtual void
check_search(const query::Plan* plan) const = 0;
virtual void
check_retrieve(const query::RetrievePlan* plan) const = 0;
virtual const ConcurrentVector<Timestamp>&
get_timestamps() const = 0;

View File

@ -47,6 +47,9 @@ class SegmentSealed : public SegmentInternalInterface {
type() const override {
return SegmentType::Sealed;
}
virtual std::vector<SegOffset>
SearchPk(const PkType& pk, Timestamp ts) const = 0;
};
using SegmentSealedSPtr = std::shared_ptr<SegmentSealed>;

View File

@ -330,6 +330,23 @@ SegmentSealedImpl::LoadFieldDataV2(const LoadFieldDataInfo& load_info) {
field_id.get());
}
}
// Deduplicate primary keys in the sealed segment's pk index, once, after
// field data is loaded. Guarded by is_pk_index_valid_ so repeated calls
// (e.g. from multiple load paths) are no-ops after the first.
void
SegmentSealedImpl::RemoveDuplicatePkRecords() {
    std::unique_lock lck(mutex_);
    if (!is_pk_index_valid_) {
        // Assert(!insert_record_.timestamps_.empty());
        // Phase 1: find records that need removal and mark them as deleted
        // at each pk's max insert timestamp (older rows become invisible).
        auto removed_pks = insert_record_.get_need_removed_pks();
        deleted_record_.Push(removed_pks.first, removed_pks.second.data());
        // Phase 2: remove the duplicated pks from the pk index, then seal it.
        insert_record_.remove_duplicate_pks();
        insert_record_.seal_pks();
        is_pk_index_valid_ = true;
    }
}
void
SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
auto num_rows = data.row_count;
@ -623,7 +640,7 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
// step 2: fill pks and timestamps
deleted_record_.push(pks, timestamps);
deleted_record_.Push(pks, timestamps);
}
void
@ -742,24 +759,7 @@ void
SegmentSealedImpl::mask_with_delete(BitsetType& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}
auto bitmap_holder = get_deleted_bitmap(
del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}
void
@ -1057,6 +1057,7 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
binlog_index_bitset_(schema->size()),
scalar_indexings_(schema->size()),
insert_record_(*schema, MAX_ROW_COUNT),
deleted_record_(&insert_record_),
schema_(schema),
id_(segment_id),
col_index_meta_(index_meta),
@ -1553,7 +1554,7 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
sort_pks[i] = pk;
}
deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.Push(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}

View File

@ -28,6 +28,7 @@
#include "SegmentSealed.h"
#include "TimestampIndex.h"
#include "common/EasyAssert.h"
#include "common/Types.h"
#include "google/protobuf/message_lite.h"
#include "mmap/Column.h"
#include "index/ScalarIndex.h"
@ -51,6 +52,9 @@ class SegmentSealedImpl : public SegmentSealed {
LoadFieldData(const LoadFieldDataInfo& info) override;
void
LoadFieldDataV2(const LoadFieldDataInfo& info) override;
// erase duplicate records when sealed segment loaded done
void
RemoveDuplicatePkRecords() override;
void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) override;
void
@ -110,6 +114,11 @@ class SegmentSealedImpl : public SegmentSealed {
std::unique_ptr<DataArray>
get_vector(FieldId field_id, const int64_t* ids, int64_t count) const;
std::vector<SegOffset>
SearchPk(const PkType& pk, Timestamp ts) const {
return insert_record_.search_pk(pk, ts);
}
public:
int64_t
num_chunk_index(FieldId field_id) const override;
@ -183,6 +192,11 @@ class SegmentSealedImpl : public SegmentSealed {
void
check_search(const query::Plan* plan) const override;
void
check_retrieve(const query::RetrievePlan* plan) const override {
Assert(plan);
}
int64_t
get_active_count(Timestamp ts) const override;
@ -267,7 +281,7 @@ class SegmentSealedImpl : public SegmentSealed {
return system_ready_count_ == 2;
}
const DeletedRecord&
const DeletedRecord<true>&
get_deleted_record() const {
return deleted_record_;
}
@ -312,7 +326,7 @@ class SegmentSealedImpl : public SegmentSealed {
InsertRecord<true> insert_record_;
// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<true> deleted_record_;
LoadFieldDataInfo field_data_info_;
@ -332,6 +346,9 @@ class SegmentSealedImpl : public SegmentSealed {
// for sparse vector unit test only! Once a type of sparse index that
// doesn't has raw data is added, this should be removed.
bool TEST_skip_index_for_retrieve_ = false;
// for pk index, when loaded done, need to compact to erase duplicate records
bool is_pk_index_valid_ = false;
};
inline SegmentSealedUPtr

View File

@ -107,76 +107,6 @@ std::unique_ptr<DataArray>
MergeDataArray(std::vector<MergeBase>& merge_bases,
const FieldMeta& field_meta);
template <bool is_sealed>
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
const InsertRecord<is_sealed>& insert_record,
Timestamp query_timestamp) {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
auto current = delete_record.clone_lru_entry(
insert_barrier, del_barrier, old_del_barrier, hit_cache);
if (hit_cache) {
return current;
}
auto bitmap = current->bitmap_ptr;
int64_t start, end;
if (del_barrier < old_del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
// so these deletion records do not take effect in query/search
// so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
// for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old_del_barrier;
} else {
// the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
// for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
start = old_del_barrier;
end = del_barrier;
}
// Avoid invalid calculations when there are a lot of repeated delete pks
std::unordered_map<PkType, Timestamp> delete_timestamps;
for (auto del_index = start; del_index < end; ++del_index) {
auto pk = delete_record.pks()[del_index];
auto timestamp = delete_record.timestamps()[del_index];
delete_timestamps[pk] = timestamp > delete_timestamps[pk]
? timestamp
: delete_timestamps[pk];
}
for (auto& [pk, timestamp] : delete_timestamps) {
auto segOffsets = insert_record.search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();
// The deletion record do not take effect in search/query,
// and reset bitmap to 0
if (timestamp > query_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// Insert after delete with same pk, delete will not task effect on this insert record,
// and reset bitmap to 0
if (insert_record.timestamps_[insert_row_offset] >= timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// insert data corresponding to the insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}
delete_record.insert_lru_entry(current);
return current;
}
std::unique_ptr<DataArray>
ReverseDataFromIndex(const index::IndexBase* index,
const int64_t* seg_offsets,

View File

@ -326,6 +326,19 @@ LoadFieldData(CSegmentInterface c_segment,
}
}
// C ABI wrapper: forward RemoveDuplicatePkRecords() to the segment behind the
// opaque handle. Returns a success CStatus, or a failure CStatus carrying the
// exception message if the segment pointer is invalid or the call throws.
CStatus
RemoveDuplicatePkRecords(CSegmentInterface c_segment) {
    try {
        auto segment =
            reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
        AssertInfo(segment != nullptr, "segment conversion failed");
        segment->RemoveDuplicatePkRecords();
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}
CStatus
LoadFieldDataV2(CSegmentInterface c_segment,
CLoadFieldDataInfo c_load_field_data_info) {

View File

@ -106,6 +106,9 @@ CStatus
LoadFieldDataV2(CSegmentInterface c_segment,
CLoadFieldDataInfo load_field_data_info);
CStatus
RemoveDuplicatePkRecords(CSegmentInterface c_segment);
CStatus
LoadFieldRawData(CSegmentInterface c_segment,
int64_t field_id,

View File

@ -359,4 +359,4 @@ TEST_P(TaskTest, CompileInputs_or_with_and) {
"PhyUnaryRangeFilterExpr");
}
}
}
}

View File

@ -6667,4 +6667,4 @@ TEST_P(ExprTest, TestJsonContainsDiffType) {
ASSERT_EQ(ans, testcase.res);
}
}
}
}

View File

@ -49,6 +49,66 @@ TEST(Growing, DeleteCount) {
ASSERT_EQ(cnt, c);
}
// Growing segment: insert rows with randomly repeated pks, then verify that
// (a) mask_with_delete hides every occurrence except the last one per pk, and
// (b) RemoveDuplicatePkRecords() compacts the pk index down to one entry per pk.
TEST(Growing, RemoveDuplicatedRecords) {
    {
        auto schema = std::make_shared<Schema>();
        auto pk = schema->AddDebugField("pk", DataType::INT64);
        schema->set_primary_field_id(pk);
        auto segment = CreateGrowingSegment(schema, empty_index_meta);
        int64_t c = 1000;
        auto offset = 0;
        // random_pk=true: pks are random in [0, c), so duplicates are likely.
        auto dataset = DataGen(schema, c, 42, 0, 1, 10, true);
        auto pks = dataset.get_col<int64_t>(pk);
        segment->Insert(offset,
                        c,
                        dataset.row_ids_.data(),
                        dataset.timestamps_.data(),
                        dataset.raw_);
        // Expected deletion bitmap: for each duplicated pk, every occurrence
        // except the last is expected to be masked out. (Presumably DataGen
        // timestamps grow with the row index, so the last occurrence is the
        // newest — TODO confirm against DataGen.)
        BitsetType bits(c);
        // pk value -> all row offsets where it occurs, in insertion order.
        std::map<int64_t, std::vector<int64_t>> different_pks;
        for (int i = 0; i < pks.size(); i++) {
            if (different_pks.find(pks[i]) != different_pks.end()) {
                different_pks[pks[i]].push_back(i);
            } else {
                different_pks[pks[i]] = {i};
            }
        }
        for (auto& [k, v] : different_pks) {
            if (v.size() > 1) {
                for (int i = 0; i < v.size() - 1; i++) {
                    bits.set(v[i]);
                }
            }
        }
        BitsetType bitset(c);
        std::cout << "start to search delete" << std::endl;
        // The duplicates were marked deleted at insert time
        // (insert_with_check_existence), so the mask must already match.
        segment->mask_with_delete(bitset, c, 1003);
        for (int i = 0; i < bitset.size(); i++) {
            ASSERT_EQ(bitset[i], bits[i]) << "index:" << i << std::endl;
        }
        // Before compaction the pk index still holds every occurrence.
        for (auto& [k, v] : different_pks) {
            //std::cout << "k:" << k << "v:" << join(v, ",") << std::endl;
            auto res = segment->SearchPk(k, Timestamp(1003));
            ASSERT_EQ(res.size(), v.size());
        }
        segment->RemoveDuplicatePkRecords();
        // After compaction each pk resolves to exactly one offset.
        for (auto& [k, v] : different_pks) {
            //std::cout << "k:" << k << "v:" << join(v, ",") << std::endl;
            auto res = segment->SearchPk(k, Timestamp(1003));
            ASSERT_EQ(res.size(), 1);
        }
    }
}
TEST(Growing, RealCount) {
auto schema = std::make_shared<Schema>();
auto pk = schema->AddDebugField("pk", DataType::INT64);

View File

@ -1064,6 +1064,7 @@ TEST(Sealed, OverlapDelete) {
LoadDeletedRecordInfo info = {timestamps.data(), ids.get(), row_count};
segment->LoadDeletedRecord(info);
auto deleted_record1 = pks.size();
ASSERT_EQ(segment->get_deleted_count(), pks.size())
<< "deleted_count=" << segment->get_deleted_count()
<< " pks_count=" << pks.size() << std::endl;
@ -1079,10 +1080,10 @@ TEST(Sealed, OverlapDelete) {
segment->LoadDeletedRecord(overlap_info);
BitsetType bitset(N, false);
// NOTE: need to change delete timestamp, so not to hit the cache
ASSERT_EQ(segment->get_deleted_count(), pks.size())
auto deleted_record2 = pks.size();
ASSERT_EQ(segment->get_deleted_count(), deleted_record1 + deleted_record2)
<< "deleted_count=" << segment->get_deleted_count()
<< " pks_count=" << pks.size() << std::endl;
<< " pks_count=" << deleted_record1 + deleted_record2 << std::endl;
segment->mask_with_delete(bitset, 10, 12);
ASSERT_EQ(bitset.count(), pks.size())
<< "bitset_count=" << bitset.count() << " pks_count=" << pks.size()
@ -1231,6 +1232,63 @@ TEST(Sealed, BF_Overflow) {
}
}
// Sealed segment: load rows with randomly repeated pks, run
// RemoveDuplicatePkRecords(), then verify the deleted count, the deletion
// mask, and that the pk index resolves each pk to a single offset.
TEST(Sealed, DeleteDuplicatedRecords) {
    {
        auto schema = std::make_shared<Schema>();
        auto pk = schema->AddDebugField("pk", DataType::INT64);
        schema->set_primary_field_id(pk);
        auto segment = CreateSealedSegment(schema);
        auto offset = segment->get_deleted_count();
        ASSERT_EQ(offset, 0);
        int64_t c = 1000;
        // generate random pk that may have duplicated records
        auto dataset = DataGen(schema, c, 42, 0, 1, 10, true);
        auto pks = dataset.get_col<int64_t>(pk);
        // current insert record: { pk: random(0 - 999) timestamp: (0 - 999) }
        SealedLoadFieldData(dataset, *segment);
        segment->RemoveDuplicatePkRecords();
        // Expected deletion bitmap: for each duplicated pk, every occurrence
        // except the last is expected to be masked out. (Presumably DataGen
        // timestamps grow with the row index, so the last occurrence is the
        // newest — TODO confirm against DataGen.)
        BitsetType bits(c);
        // pk value -> all row offsets where it occurs, in insertion order.
        std::map<int64_t, std::vector<int64_t>> different_pks;
        for (int i = 0; i < pks.size(); i++) {
            if (different_pks.find(pks[i]) != different_pks.end()) {
                different_pks[pks[i]].push_back(i);
            } else {
                different_pks[pks[i]] = {i};
            }
        }
        for (auto& [k, v] : different_pks) {
            if (v.size() > 1) {
                for (int i = 0; i < v.size() - 1; i++) {
                    bits.set(v[i]);
                }
            }
        }
        // Exactly one row per pk survives, so (total - distinct) are deleted.
        ASSERT_EQ(segment->get_deleted_count(), c - different_pks.size())
            << "deleted_count=" << segment->get_deleted_count()
            << "duplicate_pks " << c - different_pks.size() << std::endl;
        BitsetType bitset(c);
        std::cout << "start to search delete" << std::endl;
        segment->mask_with_delete(bitset, c, 1003);
        for (int i = 0; i < bitset.size(); i++) {
            ASSERT_EQ(bitset[i], bits[i]) << "index:" << i << std::endl;
        }
        // After compaction each pk resolves to exactly one offset.
        for (auto& [k, v] : different_pks) {
            //std::cout << "k:" << k << "v:" << join(v, ",") << std::endl;
            auto res = segment->SearchPk(k, Timestamp(1003));
            ASSERT_EQ(res.size(), 1);
        }
    }
}
TEST(Sealed, DeleteCount) {
{
auto schema = std::make_shared<Schema>();
@ -1238,14 +1296,17 @@ TEST(Sealed, DeleteCount) {
schema->set_primary_field_id(pk);
auto segment = CreateSealedSegment(schema);
int64_t c = 10;
auto offset = segment->get_deleted_count();
ASSERT_EQ(offset, 0);
int64_t c = 10;
auto dataset = DataGen(schema, c);
auto pks = dataset.get_col<int64_t>(pk);
SealedLoadFieldData(dataset, *segment);
Timestamp begin_ts = 100;
auto tss = GenTss(c, begin_ts);
auto pks = GenPKs(c, 0);
auto status = segment->Delete(offset, c, pks.get(), tss.data());
auto delete_pks = GenPKs(c, 0);
auto status = segment->Delete(offset, c, delete_pks.get(), tss.data());
ASSERT_TRUE(status.ok());
// shouldn't be filtered for empty segment.

View File

@ -51,71 +51,6 @@ TEST(Util, StringMatch) {
ASSERT_FALSE(PostfixMatch("dontmatch", "postfix"));
}
TEST(Util, GetDeleteBitmap) {
using namespace milvus;
using namespace milvus::query;
using namespace milvus::segcore;
auto schema = std::make_shared<Schema>();
auto vec_fid = schema->AddDebugField(
"fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
auto i64_fid = schema->AddDebugField("age", DataType::INT64);
schema->set_primary_field_id(i64_fid);
auto N = 10;
uint64_t seg_id = 101;
InsertRecord insert_record(*schema, N);
DeletedRecord delete_record;
// fill insert record, all insert records has same pk = 1, timestamps= {1 ... N}
std::vector<int64_t> age_data(N);
std::vector<Timestamp> tss(N);
for (int i = 0; i < N; ++i) {
age_data[i] = 1;
tss[i] = i + 1;
insert_record.insert_pk(1, i);
}
auto insert_offset = insert_record.reserved.fetch_add(N);
insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N);
auto field_data = insert_record.get_field_data_base(i64_fid);
field_data->set_data_raw(insert_offset, age_data.data(), N);
insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N);
// test case delete pk1(ts = 0) -> insert repeated pk1 (ts = {1 ... N}) -> query (ts = N)
std::vector<Timestamp> delete_ts = {0};
std::vector<PkType> delete_pk = {1};
delete_record.push(delete_pk, delete_ts.data());
auto query_timestamp = tss[N - 1];
auto del_barrier = get_barrier(delete_record, query_timestamp);
auto insert_barrier = get_barrier(insert_record, query_timestamp);
auto res_bitmap = get_deleted_bitmap(del_barrier,
insert_barrier,
delete_record,
insert_record,
query_timestamp);
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0);
// test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N)
delete_ts = {uint64_t(N)};
delete_pk = {1};
delete_record.push(delete_pk, delete_ts.data());
del_barrier = get_barrier(delete_record, query_timestamp);
res_bitmap = get_deleted_bitmap(del_barrier,
insert_barrier,
delete_record,
insert_record,
query_timestamp);
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), N - 1);
// test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N/2)
query_timestamp = tss[N - 1] / 2;
del_barrier = get_barrier(delete_record, query_timestamp);
res_bitmap = get_deleted_bitmap(
del_barrier, N, delete_record, insert_record, query_timestamp);
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0);
}
TEST(Util, OutOfRange) {
using milvus::query::out_of_range;

View File

@ -289,13 +289,14 @@ GenerateRandomSparseFloatVector(size_t rows,
return tensor;
}
inline GeneratedData DataGen(SchemaPtr schema,
int64_t N,
uint64_t seed = 42,
uint64_t ts_offset = 0,
int repeat_count = 1,
int array_len = 10,
bool random_pk = false) {
inline GeneratedData
DataGen(SchemaPtr schema,
int64_t N,
uint64_t seed = 42,
uint64_t ts_offset = 0,
int repeat_count = 1,
int array_len = 10,
bool random_pk = false) {
using std::vector;
std::default_random_engine random(seed);
std::normal_distribution<> distr(0, 1);
@ -392,7 +393,7 @@ inline GeneratedData DataGen(SchemaPtr schema,
for (int i = 0; i < N; i++) {
if (random_pk && schema->get_primary_field_id()->get() ==
field_id.get()) {
data[i] = random();
data[i] = random() % N;
} else {
data[i] = i / repeat_count;
}

View File

@ -953,6 +953,18 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
return err
}
GetDynamicPool().Submit(func() (any, error) {
status = C.RemoveDuplicatePkRecords(s.ptr)
return nil, nil
}).Await()
if err := HandleCStatus(ctx, &status, "RemoveDuplicatePkRecords failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.Type().String())); err != nil {
return err
}
log.Info("load mutil field done",
zap.Int64("row count", rowCount),
zap.Int64("segmentID", s.ID()))

View File

@ -20,6 +20,7 @@ package segments
#cgo pkg-config: milvus_segcore
#include "segcore/load_index_c.h"
#include "segcore/segment_c.h"
*/
import "C"
@ -1220,6 +1221,19 @@ func loadSealedSegmentFields(ctx context.Context, collection *Collection, segmen
return err
}
var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
status = C.RemoveDuplicatePkRecords(segment.ptr)
return nil, nil
}).Await()
if err := HandleCStatus(ctx, &status, "RemoveDuplicatePkRecords failed",
zap.Int64("collectionID", segment.Collection()),
zap.Int64("segmentID", segment.ID()),
zap.String("segmentType", segment.Type().String())); err != nil {
return err
}
log.Ctx(ctx).Info("load field binlogs done for sealed segment",
zap.Int64("collection", segment.Collection()),
zap.Int64("segment", segment.ID()),