mirror of https://github.com/milvus-io/milvus.git
enhance: Filter out non-hit delete records during load delta (#36207)
Related to #35303 This PR utilizes pk index in segment to exclude non-hit delete record during load delete records. This ability is crucial when l0/delete forward policy only replies on segment itself(without BF filtering). --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/36267/head
parent
f7d950d465
commit
58d3200986
|
@ -353,8 +353,38 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
|
|||
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
|
||||
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
|
||||
|
||||
// step 2: fill pks and timestamps
|
||||
deleted_record_.push(pks, timestamps);
|
||||
std::vector<std::tuple<Timestamp, PkType>> ordering(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
ordering[i] = std::make_tuple(timestamps[i], pks[i]);
|
||||
}
|
||||
|
||||
if (!insert_record_.empty_pks()) {
|
||||
auto end = std::remove_if(
|
||||
ordering.begin(),
|
||||
ordering.end(),
|
||||
[&](const std::tuple<Timestamp, PkType>& record) {
|
||||
return !insert_record_.contain(std::get<1>(record));
|
||||
});
|
||||
size = end - ordering.begin();
|
||||
ordering.resize(size);
|
||||
}
|
||||
|
||||
// all record filtered
|
||||
if (size == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::sort(ordering.begin(), ordering.end());
|
||||
std::vector<PkType> sort_pks(size);
|
||||
std::vector<Timestamp> sort_timestamps(size);
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
auto [t, pk] = ordering[i];
|
||||
sort_timestamps[i] = t;
|
||||
sort_pks[i] = pk;
|
||||
}
|
||||
|
||||
deleted_record_.push(sort_pks, sort_timestamps.data());
|
||||
}
|
||||
|
||||
SpanBase
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include <memory>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <tuple>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include <boost/pointer_cast.hpp>
|
||||
|
@ -606,8 +607,38 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
|
|||
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
|
||||
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
|
||||
|
||||
// step 2: fill pks and timestamps
|
||||
deleted_record_.push(pks, timestamps);
|
||||
std::vector<std::tuple<Timestamp, PkType>> ordering(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
ordering[i] = std::make_tuple(timestamps[i], pks[i]);
|
||||
}
|
||||
|
||||
if (!insert_record_.empty_pks()) {
|
||||
auto end = std::remove_if(
|
||||
ordering.begin(),
|
||||
ordering.end(),
|
||||
[&](const std::tuple<Timestamp, PkType>& record) {
|
||||
return !insert_record_.contain(std::get<1>(record));
|
||||
});
|
||||
size = end - ordering.begin();
|
||||
ordering.resize(size);
|
||||
}
|
||||
|
||||
// all record filtered
|
||||
if (size == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::sort(ordering.begin(), ordering.end());
|
||||
std::vector<PkType> sort_pks(size);
|
||||
std::vector<Timestamp> sort_timestamps(size);
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
auto [t, pk] = ordering[i];
|
||||
sort_timestamps[i] = t;
|
||||
sort_pks[i] = pk;
|
||||
}
|
||||
|
||||
deleted_record_.push(sort_pks, sort_timestamps.data());
|
||||
}
|
||||
|
||||
void
|
||||
|
|
Loading…
Reference in New Issue