mirror of https://github.com/milvus-io/milvus.git

Fix query too slow when insert multi repeated pk data (#18231)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
Branch: querycoordv2_refactor
Parent: e5fe46124b
Commit: 54d17bc5da
@@ -22,6 +22,7 @@
 #include <utility>
 #include <vector>
 #include <tbb/concurrent_unordered_map.h>
+#include <tbb/concurrent_unordered_set.h>
 #include <boost/align/aligned_allocator.hpp>
 #include <boost/container/vector.hpp>
 #include <boost/dynamic_bitset.hpp>
@@ -70,7 +71,9 @@ using VectorArray = proto::schema::VectorField;
 using IdArray = proto::schema::IDs;
 using InsertData = proto::segcore::InsertRecord;
 using PkType = std::variant<std::monostate, int64_t, std::string>;
-using Pk2OffsetType = tbb::concurrent_unordered_multimap<PkType, int64_t, std::hash<PkType>>;
+// tbb::concurrent_unordered_multimap::equal_range is too slow when a key is heavily repeated
+// using Pk2OffsetType = tbb::concurrent_unordered_multimap<PkType, int64_t, std::hash<PkType>>;
+using Pk2OffsetType = tbb::concurrent_unordered_map<PkType, tbb::concurrent_unordered_set<int64_t>, std::hash<PkType>>;
 
 inline bool
 IsPrimaryKeyDataType(DataType data_type) {
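Why the Pk2OffsetType change matters: with the old multimap layout, every row inserted with the same primary key becomes a separate entry under one key, and equal_range has to walk all of those entries on every lookup. The new layout keeps one map entry per distinct pk and collects the duplicate row offsets in a nested concurrent set, so a lookup is a single find plus iteration over just that pk's offsets. Below is a minimal, self-contained sketch contrasting the two access patterns; it is not the Milvus code: PkType is simplified to int64_t, and OldIndex, NewIndex, lookup_old, lookup_new are hypothetical names.

    #include <cstdint>
    #include <vector>
    #include <tbb/concurrent_unordered_map.h>
    #include <tbb/concurrent_unordered_set.h>

    using OldIndex = tbb::concurrent_unordered_multimap<int64_t, int64_t>;
    using NewIndex =
        tbb::concurrent_unordered_map<int64_t, tbb::concurrent_unordered_set<int64_t>>;

    // Old layout: every duplicate pk is a separate map entry, so equal_range
    // walks all duplicates on each lookup.
    std::vector<int64_t>
    lookup_old(const OldIndex& index, int64_t pk) {
        std::vector<int64_t> offsets;
        auto [first, last] = index.equal_range(pk);
        for (auto it = first; it != last; ++it) {
            offsets.push_back(it->second);
        }
        return offsets;
    }

    // New layout: one entry per distinct pk; find() is a single probe and
    // iteration touches only this pk's offset set.
    std::vector<int64_t>
    lookup_new(const NewIndex& index, int64_t pk) {
        std::vector<int64_t> offsets;
        auto it = index.find(pk);
        if (it != index.end()) {
            offsets.assign(it->second.begin(), it->second.end());
        }
        return offsets;
    }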
@@ -42,6 +42,21 @@ struct DeletedRecord {
         return lru_;
     }
 
+    std::shared_ptr<TmpBitmap>
+    clone_lru_entry(int64_t insert_barrier, int64_t del_barrier, int64_t& old_del_barrier, bool& hit_cache) {
+        std::shared_lock lck(shared_mutex_);
+        auto res = lru_->clone(insert_barrier);
+        old_del_barrier = lru_->del_barrier;
+
+        if (lru_->bitmap_ptr->size() == insert_barrier && lru_->del_barrier == del_barrier) {
+            hit_cache = true;
+        } else {
+            res->del_barrier = del_barrier;
+        }
+
+        return res;
+    }
+
     void
     insert_lru_entry(std::shared_ptr<TmpBitmap> new_entry, bool force = false) {
         std::lock_guard lck(shared_mutex_);
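clone_lru_entry folds what used to be two steps (get_lru_entry, then clone in the caller) into one critical section: the shared lock is held while the cached bitmap is cloned and its del_barrier is read, so the snapshot handed back is internally consistent even if a writer publishes a new entry concurrently, and the cache-hit decision is made against the same entry that was cloned. A generic sketch of this read-copy/publish pattern, with a hypothetical Cached<T> wrapper standing in for DeletedRecord's lru_ and shared_mutex_:

    #include <memory>
    #include <mutex>
    #include <shared_mutex>

    template <typename T>
    class Cached {
     public:
        // Reader path: take the shared lock, copy out a consistent snapshot.
        // Many readers can snapshot in parallel.
        std::shared_ptr<T>
        snapshot() const {
            std::shared_lock lck(mutex_);
            return std::make_shared<T>(*value_);
        }

        // Writer path: take the exclusive lock, publish the new value.
        void
        publish(std::shared_ptr<T> next) {
            std::lock_guard lck(mutex_);
            value_ = std::move(next);
        }

     private:
        mutable std::shared_mutex mutex_;
        std::shared_ptr<T> value_ = std::make_shared<T>();
    };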
@@ -59,7 +74,6 @@ struct DeletedRecord {
     AckResponder ack_responder_;
     ConcurrentVector<Timestamp> timestamps_;
     ConcurrentVector<PkType> pks_;
-    int64_t record_size_ = 0;
 
  private:
     std::shared_ptr<TmpBitmap> lru_;
@@ -43,11 +43,12 @@ struct InsertRecord {
     std::vector<SegOffset>
     search_pk(const PkType pk, Timestamp timestamp) const {
         std::vector<SegOffset> res_offsets;
-        auto [iter_b, iter_e] = pk2offset_.equal_range(pk);
-        for (auto iter = iter_b; iter != iter_e; ++iter) {
-            auto offset = SegOffset(iter->second);
-            if (timestamps_[offset.get()] <= timestamp) {
-                res_offsets.push_back(offset);
+        auto offset_iter = pk2offset_.find(pk);
+        if (offset_iter != pk2offset_.end()) {
+            for (auto offset : offset_iter->second) {
+                if (timestamps_[offset] <= timestamp) {
+                    res_offsets.push_back(SegOffset(offset));
+                }
             }
         }
 
@@ -57,11 +58,12 @@ struct InsertRecord {
     std::vector<SegOffset>
     search_pk(const PkType pk, int64_t insert_barrier) const {
         std::vector<SegOffset> res_offsets;
-        auto [iter_b, iter_e] = pk2offset_.equal_range(pk);
-        for (auto iter = iter_b; iter != iter_e; ++iter) {
-            auto offset = SegOffset(iter->second);
-            if (offset.get() < insert_barrier) {
-                res_offsets.push_back(offset);
+        auto offset_iter = pk2offset_.find(pk);
+        if (offset_iter != pk2offset_.end()) {
+            for (auto offset : offset_iter->second) {
+                if (offset < insert_barrier) {
+                    res_offsets.push_back(SegOffset(offset));
+                }
             }
         }
 
@@ -70,7 +72,7 @@ struct InsertRecord {
 
     void
     insert_pk(const PkType pk, int64_t offset) {
-        pk2offset_.insert(std::make_pair(pk, offset));
+        pk2offset_[pk].insert(offset);
     }
 
     bool
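A note on why pk2offset_[pk].insert(offset) stays safe under concurrent writers, assuming standard TBB semantics: operator[] on tbb::concurrent_unordered_map inserts the key with a default-constructed set if absent, and both that insertion and the nested set's insert support concurrent use (neither container supports concurrent erasure, which this insert-only index never needs). A small hypothetical usage sketch:

    #include <cstdint>
    #include <thread>
    #include <tbb/concurrent_unordered_map.h>
    #include <tbb/concurrent_unordered_set.h>

    using Index =
        tbb::concurrent_unordered_map<int64_t, tbb::concurrent_unordered_set<int64_t>>;

    int main() {
        Index pk2offset;
        auto writer = [&pk2offset](int64_t base) {
            for (int64_t i = 0; i < 1000; ++i) {
                // operator[] default-constructs the nested set on first use
                pk2offset[i % 10].insert(base + i);
            }
        };
        std::thread t1(writer, 0);
        std::thread t2(writer, 1000);
        t1.join();
        t2.join();
        return 0;
    }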
@@ -157,11 +157,10 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
     auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
 
     // step 2: fill pks and timestamps
-    deleted_record_.pks_.set_data_raw(0, pks.data(), size);
-    deleted_record_.timestamps_.set_data_raw(0, timestamps, size);
-    deleted_record_.ack_responder_.AddSegment(0, size);
-    deleted_record_.reserved.fetch_add(size);
-    deleted_record_.record_size_ = size;
+    auto reserved_begin = deleted_record_.reserved.fetch_add(size);
+    deleted_record_.pks_.set_data_raw(reserved_begin, pks.data(), size);
+    deleted_record_.timestamps_.set_data_raw(reserved_begin, timestamps, size);
+    deleted_record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size);
 }
 
 SpanBase
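This rewrite fixes an overwrite bug as much as it changes style: the old code always wrote each batch at offset 0, so a second LoadDeletedRecord call clobbered the pks and timestamps of the first, and record_size_ (now removed from DeletedRecord) only remembered the last batch. The new code first claims a disjoint range with an atomic fetch_add and then fills exactly that range; the identical change appears in SegmentSealedImpl::LoadDeletedRecord below. A minimal sketch of the reservation pattern with a hypothetical AppendOnlyLog type (the real code appends into ConcurrentVector fields and tracks progress through an AckResponder):

    #include <atomic>
    #include <cstdint>
    #include <cstring>
    #include <vector>

    struct AppendOnlyLog {
        std::atomic<int64_t> reserved{0};
        std::vector<int64_t> slots = std::vector<int64_t>(1 << 20);

        // Each caller first claims a disjoint range [begin, begin + size),
        // then fills only that range, so concurrent or repeated loads can
        // no longer overwrite each other. (The sketch assumes the
        // preallocated capacity is never exceeded.)
        void
        append(const int64_t* data, int64_t size) {
            int64_t begin = reserved.fetch_add(size);
            std::memcpy(slots.data() + begin, data, size * sizeof(int64_t));
        }
    };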
@@ -64,6 +64,11 @@ class SegmentGrowingImpl : public SegmentGrowing {
     std::string
     debug() const override;
 
+    int64_t
+    get_segment_id() const override {
+        return id_;
+    }
+
  public:
     const InsertRecord&
     get_insert_record() const {
@@ -69,6 +69,9 @@ class SegmentInterface {
     virtual void
     LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0;
+
+    virtual int64_t
+    get_segment_id() const = 0;
 };
 
 // internal API for DSL calculation
@@ -254,11 +254,10 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
     auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
 
     // step 2: fill pks and timestamps
-    deleted_record_.pks_.set_data_raw(0, pks.data(), size);
-    deleted_record_.timestamps_.set_data_raw(0, timestamps, size);
-    deleted_record_.ack_responder_.AddSegment(0, size);
-    deleted_record_.reserved.fetch_add(size);
-    deleted_record_.record_size_ = size;
+    auto reserved_begin = deleted_record_.reserved.fetch_add(size);
+    deleted_record_.pks_.set_data_raw(reserved_begin, pks.data(), size);
+    deleted_record_.timestamps_.set_data_raw(reserved_begin, timestamps, size);
+    deleted_record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size);
 }
 
 // internal API: support scalar index only
@@ -50,6 +50,11 @@ class SegmentSealedImpl : public SegmentSealed {
     bool
     HasFieldData(FieldId field_id) const override;
 
+    int64_t
+    get_segment_id() const override {
+        return id_;
+    }
+
  public:
     int64_t
     GetMemoryUsageInBytes() const override;
@@ -380,37 +380,43 @@ get_deleted_bitmap(int64_t del_barrier,
                    DeletedRecord& delete_record,
                    const InsertRecord& insert_record,
                    Timestamp query_timestamp) {
-    auto old = delete_record.get_lru_entry();
     // if insert_barrier and del_barrier have not changed, use cache data directly
-    if (old->bitmap_ptr->size() == insert_barrier) {
-        if (old->del_barrier == del_barrier) {
-            return old;
-        }
+    bool hit_cache = false;
+    int64_t old_del_barrier = 0;
+    auto current = delete_record.clone_lru_entry(insert_barrier, del_barrier, old_del_barrier, hit_cache);
+    if (hit_cache) {
+        return current;
     }
 
-    auto current = old->clone(insert_barrier);
-    current->del_barrier = del_barrier;
-
     auto bitmap = current->bitmap_ptr;
 
     int64_t start, end;
-    if (del_barrier < old->del_barrier) {
+    if (del_barrier < old_del_barrier) {
         // in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
         // so these deletion records do not take effect in query/search
         // so the bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
         // for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
         start = del_barrier;
-        end = old->del_barrier;
+        end = old_del_barrier;
     } else {
         // the cache is not enough, so update the bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
         // for example, current_del_barrier = 4, query_time = 300, the bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
-        start = old->del_barrier;
+        start = old_del_barrier;
         end = del_barrier;
     }
 
+    // Avoid redundant calculations when there are many repeated delete pks:
+    // keep only the newest delete timestamp per pk
+    std::unordered_map<PkType, Timestamp> delete_timestamps;
     for (auto del_index = start; del_index < end; ++del_index) {
         // get pk in delete logs
         auto pk = delete_record.pks_[del_index];
-        // find insert data which has same pk
-        auto segOffsets = insert_record.search_pk(pk, insert_barrier);
+        auto timestamp = delete_record.timestamps_[del_index];
+
+        delete_timestamps[pk] = timestamp > delete_timestamps[pk] ? timestamp : delete_timestamps[pk];
+    }
+
+    // find insert data which has the same pk
+    for (auto iter = delete_timestamps.begin(); iter != delete_timestamps.end(); iter++) {
+        auto pk = iter->first;
+        auto delete_timestamp = iter->second;
+        auto segOffsets = insert_record.search_pk(pk, insert_barrier);
         for (auto offset : segOffsets) {
             int64_t insert_row_offset = offset.get();
@@ -419,22 +425,22 @@ get_deleted_bitmap(int64_t del_barrier,
             // insert after delete with the same pk: the delete will not take effect on this insert record,
             // so reset the bitmap to 0
-            if (insert_record.timestamps_[insert_row_offset] > delete_record.timestamps_[del_index]) {
+            if (insert_record.timestamps_[insert_row_offset] > delete_timestamp) {
                 bitmap->reset(insert_row_offset);
                 continue;
             }
 
             // the deletion record does not take effect in search/query,
             // so reset the bitmap to 0
-            if (delete_record.timestamps_[del_index] > query_timestamp) {
+            if (delete_timestamp > query_timestamp) {
                 bitmap->reset(insert_row_offset);
                 continue;
             }
 
             // insert data corresponding to the insert_row_offset will be ignored in search/query
             bitmap->set(insert_row_offset);
         }
     }
 
     delete_record.insert_lru_entry(current);
     return current;
 }
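The performance fix the PR title describes lives in these two hunks: if the delete log contains D entries covering only U distinct pks, the old loop called search_pk once per log entry, while the new code first collapses the log to the newest delete timestamp per pk and probes the insert index once per distinct pk, cutting the work from roughly O(D * cost(search_pk)) to O(D + U * cost(search_pk)). When a workload repeatedly deletes and reinserts the same pk, D is much larger than U, and this step dominates the speedup. A standalone sketch of the dedup step, with pk simplified to int64_t and a hypothetical newest_delete_per_pk helper:

    #include <cstdint>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    using Timestamp = uint64_t;

    std::unordered_map<int64_t, Timestamp>
    newest_delete_per_pk(const std::vector<std::pair<int64_t, Timestamp>>& delete_log) {
        std::unordered_map<int64_t, Timestamp> newest;
        for (const auto& [pk, ts] : delete_log) {
            auto& slot = newest[pk];  // value-initialized to 0 on first touch
            if (ts > slot) {
                slot = ts;
            }
        }
        return newest;
    }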
@@ -54,7 +54,7 @@ func (q *queryTask) PreExecute(ctx context.Context) error {
 func (q *queryTask) queryOnStreaming() error {
 	// check ctx timeout
 	if !funcutil.CheckCtxValid(q.Ctx()) {
-		return errors.New("search context timeout")
+		return errors.New("query context timeout")
 	}
 
 	// check if collection has been released, check streaming since it's released first