Revert "enhance: remove timestamp_filter after retrieve (#35207)"

This reverts commit 16dd53e7cf.
revert-35207-remove_retrieve_ts
sre-ci-robot 2024-08-02 19:33:20 +08:00 committed by GitHub
parent 16dd53e7cf
commit 494ac07271
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 140 additions and 52 deletions

View File

@ -139,6 +139,7 @@ PhyTermFilterExpr::CanSkipSegment() {
if (segment_->type() == SegmentType::Sealed &&
skip_index.CanSkipBinaryRange<T>(field_id_, 0, min, max, true, true)) {
cached_bits_.resize(active_count_, false);
cached_offsets_ = std::make_shared<ColumnVector>(DataType::INT64, 0);
cached_offsets_inited_ = true;
return true;
}
@ -177,9 +178,14 @@ PhyTermFilterExpr::InitPkCacheOffset() {
auto [uids, seg_offsets] =
segment_->search_ids(*id_array, query_timestamp_);
cached_bits_.resize(active_count_, false);
cached_offsets_ =
std::make_shared<ColumnVector>(DataType::INT64, seg_offsets.size());
int64_t* cached_offsets_ptr = (int64_t*)cached_offsets_->GetRawData();
int i = 0;
for (const auto& offset : seg_offsets) {
auto _offset = (int64_t)offset.get();
cached_bits_[_offset] = true;
cached_offsets_ptr[i++] = _offset;
}
cached_offsets_inited_ = true;
}
@ -208,10 +214,7 @@ PhyTermFilterExpr::ExecPkTermImpl() {
}
if (use_cache_offsets_) {
auto cache_bits_copy = cached_bits_.clone();
std::vector<VectorPtr> vecs{
res_vec,
std::make_shared<ColumnVector>(std::move(cache_bits_copy))};
std::vector<VectorPtr> vecs{res_vec, cached_offsets_};
return std::make_shared<RowVector>(vecs);
} else {
return res_vec;

View File

@ -78,6 +78,21 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
return ret;
}
// Replaces expr_cached_pk_id_offsets_ with `offsets`. The argument is taken
// by rvalue reference and moved from, so the caller's vector is left in a
// moved-from state afterwards.
void
SetExprCacheOffsets(std::vector<int64_t>&& offsets) {
expr_cached_pk_id_offsets_ = std::move(offsets);
}
// Appends a single offset to the end of expr_cached_pk_id_offsets_.
void
AddExprCacheOffset(int64_t offset) {
expr_cached_pk_id_offsets_.push_back(offset);
}
// Returns a read-only reference to expr_cached_pk_id_offsets_. The reference
// aliases the member, so it reflects later Set/Add calls on this visitor.
const std::vector<int64_t>&
GetExprCacheOffsets() {
return expr_cached_pk_id_offsets_;
}
void
SetExprUsePkIndex(bool use_pk_index) {
expr_use_pk_index_ = use_pk_index;
@ -88,11 +103,29 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
return expr_use_pk_index_;
}
// Executes the filter plan node against `segment` and writes the outcome
// into `result`.
//
// As far as the visible part of the definition shows:
//   - `result` is cleared on entry; when the expression reports an empty
//     cached-offset vector it is resized to `active_count`.
//   - `cache_offset_getted` is an in/out flag. It is READ before it is
//     written, so the caller must initialize it (normally to false). It is
//     set to true once the cached offsets have been copied out.
//   - `cache_offset` receives the cached offsets via push_back; existing
//     elements are not removed first.
// NOTE(review): the middle of the definition is not fully visible here, so
// confirm any further effects against the implementation.
void
ExecuteExprNodeInternal(
const std::shared_ptr<milvus::plan::PlanNode>& plannode,
const milvus::segcore::SegmentInternalInterface* segment,
int64_t active_count,
BitsetType& result,
bool& cache_offset_getted,
std::vector<int64_t>& cache_offset);
// Convenience wrapper around ExecuteExprNodeInternal for callers that only
// need the result bitset. The cached-offset outputs of the internal call are
// collected into locals and discarded when this function returns.
//
// plannode      filter plan node to execute
// segment       segment the expression is evaluated against
// active_count  forwarded unchanged to ExecuteExprNodeInternal
// result        output bitset, filled by ExecuteExprNodeInternal
void
ExecuteExprNode(const std::shared_ptr<milvus::plan::PlanNode>& plannode,
                const milvus::segcore::SegmentInternalInterface* segment,
                int64_t active_count,
                BitsetType& result) {
    // Must start as false: ExecuteExprNodeInternal reads this flag before it
    // ever assigns it, so passing an uninitialized bool would be a read of an
    // indeterminate value (undefined behavior) and could silently skip the
    // cached-offset handling.
    bool get_cache_offset = false;
    std::vector<int64_t> cache_offsets;
    ExecuteExprNodeInternal(plannode,
                            segment,
                            active_count,
                            result,
                            get_cache_offset,
                            cache_offsets);
}
private:
template <typename VectorType>
@ -107,5 +140,6 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
SearchResultOpt search_result_opt_;
RetrieveResultOpt retrieve_result_opt_;
bool expr_use_pk_index_ = false;
std::vector<int64_t> expr_cached_pk_id_offsets_;
};
} // namespace milvus::query

View File

@ -2547,6 +2547,7 @@ ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> BitsetType {
// If the plan_visitor pk index cache is enabled, pass the cached offsets to it
if (plan_visitor_ != nullptr) {
plan_visitor_->SetExprUsePkIndex(true);
plan_visitor_->SetExprCacheOffsets(std::move(cached_offsets));
}
AssertInfo(bitset.size() == row_count_,
"[ExecExprVisitor]Size of results not equal row count");

View File

@ -75,11 +75,13 @@ empty_search_result(int64_t num_queries, SearchInfo& search_info) {
}
void
ExecPlanNodeVisitor::ExecuteExprNode(
ExecPlanNodeVisitor::ExecuteExprNodeInternal(
const std::shared_ptr<milvus::plan::PlanNode>& plannode,
const milvus::segcore::SegmentInternalInterface* segment,
int64_t active_count,
BitsetType& bitset_holder) {
BitsetType& bitset_holder,
bool& cache_offset_getted,
std::vector<int64_t>& cache_offset) {
bitset_holder.clear();
LOG_DEBUG("plannode: {}, active_count: {}, timestamp: {}",
plannode->ToString(),
@ -92,7 +94,6 @@ ExecPlanNodeVisitor::ExecuteExprNode(
auto task =
milvus::exec::Task::Create(DEFAULT_TASK_ID, plan, 0, query_context);
bool cache_offset_getted = false;
for (;;) {
auto result = task->Next();
if (!result) {
@ -114,17 +115,20 @@ ExecPlanNodeVisitor::ExecuteExprNode(
if (!cache_offset_getted) {
// The offset cache is fetched only once because iterator batches are not supported
auto cache_bits_vec =
auto cache_offset_vec =
std::dynamic_pointer_cast<ColumnVector>(row->child(1));
TargetBitmapView view(cache_bits_vec->GetRawData(),
cache_bits_vec->size());
// If get empty cached bits. mean no record hits in this segment
// If the cached offsets are empty, no record in this segment was hit,
// so there is no need to fetch the next batch.
if (view.count() == 0) {
if (cache_offset_vec->size() == 0) {
bitset_holder.resize(active_count);
task->RequestCancel();
break;
}
auto cache_offset_vec_ptr =
(int64_t*)(cache_offset_vec->GetRawData());
for (size_t i = 0; i < cache_offset_vec->size(); ++i) {
cache_offset.push_back(cache_offset_vec_ptr[i]);
}
cache_offset_getted = true;
}
} else {
@ -277,12 +281,17 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
bitset_holder.resize(active_count);
}
// This flag indicates whether offsets were obtained from the expr module;
// if so, they speed up the MVCC filter in the next step: "timestamp_filter"
bool get_cache_offset = false;
std::vector<int64_t> cache_offsets;
if (node.filter_plannode_.has_value()) {
ExecuteExprNode(node.filter_plannode_.value(),
segment,
active_count,
bitset_holder);
ExecuteExprNodeInternal(node.filter_plannode_.value(),
segment,
active_count,
bitset_holder,
get_cache_offset,
cache_offsets);
bitset_holder.flip();
}
@ -304,7 +313,16 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
}
retrieve_result.total_data_cnt_ = bitset_holder.size();
auto results_pair = segment->find_first(node.limit_, bitset_holder);
bool false_filtered_out = false;
if (get_cache_offset) {
segment->timestamp_filter(bitset_holder, cache_offsets, timestamp_);
} else {
bitset_holder.flip();
false_filtered_out = true;
segment->timestamp_filter(bitset_holder, timestamp_);
}
auto results_pair =
segment->find_first(node.limit_, bitset_holder, false_filtered_out);
retrieve_result.result_offsets_ = std::move(results_pair.first);
retrieve_result.has_more_result = results_pair.second;
retrieve_result_opt_ = std::move(retrieve_result);

View File

@ -63,7 +63,9 @@ class OffsetMap {
using OffsetType = int64_t;
// TODO: in fact, we can retrieve the pk here. Not sure which way is more efficient.
virtual std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit, const BitsetType& bitset) const = 0;
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const = 0;
virtual void
clear() = 0;
@ -167,7 +169,9 @@ class OffsetOrderedMap : public OffsetMap {
}
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit, const BitsetType& bitset) const override {
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const override {
std::shared_lock<std::shared_mutex> lck(mtx_);
if (limit == Unlimited || limit == NoLimit) {
@ -176,7 +180,7 @@ class OffsetOrderedMap : public OffsetMap {
// TODO: we can't retrieve pk by offset very conveniently.
// Selectivity should be done outside.
return find_first_by_index(limit, bitset);
return find_first_by_index(limit, bitset, false_filtered_out);
}
void
@ -187,10 +191,15 @@ class OffsetOrderedMap : public OffsetMap {
private:
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first_by_index(int64_t limit, const BitsetType& bitset) const {
find_first_by_index(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const {
int64_t hit_num = 0; // avoid counting the number everytime.
int64_t cnt = bitset.count();
auto size = bitset.size();
int64_t cnt = size - bitset.count();
if (!false_filtered_out) {
cnt = size - bitset.count();
}
limit = std::min(limit, cnt);
std::vector<int64_t> seg_offsets;
seg_offsets.reserve(limit);
@ -205,7 +214,7 @@ class OffsetOrderedMap : public OffsetMap {
continue;
}
if (!bitset[seg_offset]) {
if (!(bitset[seg_offset] ^ false_filtered_out)) {
seg_offsets.push_back(seg_offset);
hit_num++;
// PK hit, no need to continue traversing offsets with the same PK.
@ -337,7 +346,9 @@ class OffsetOrderedArray : public OffsetMap {
}
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit, const BitsetType& bitset) const override {
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const override {
check_search();
if (limit == Unlimited || limit == NoLimit) {
@ -346,7 +357,7 @@ class OffsetOrderedArray : public OffsetMap {
// TODO: we can't retrieve pk by offset very conveniently.
// Selectivity should be done outside.
return find_first_by_index(limit, bitset);
return find_first_by_index(limit, bitset, false_filtered_out);
}
void
@ -357,10 +368,15 @@ class OffsetOrderedArray : public OffsetMap {
private:
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first_by_index(int64_t limit, const BitsetType& bitset) const {
find_first_by_index(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const {
int64_t hit_num = 0; // avoid counting the number everytime.
int64_t cnt = bitset.count();
auto size = bitset.size();
int64_t cnt = size - bitset.count();
if (!false_filtered_out) {
cnt = size - bitset.count();
}
auto more_hit_than_limit = cnt > limit;
limit = std::min(limit, cnt);
std::vector<int64_t> seg_offsets;
@ -373,7 +389,7 @@ class OffsetOrderedArray : public OffsetMap {
continue;
}
if (!bitset[seg_offset]) {
if (!(bitset[seg_offset] ^ false_filtered_out)) {
seg_offsets.push_back(seg_offset);
hit_num++;
}

View File

@ -307,8 +307,11 @@ class SegmentGrowingImpl : public SegmentGrowing {
}
// Forwards to insert_record_.pk2offset_->find_first and returns its result
// unchanged; `limit`, `bitset` and `false_filtered_out` are passed through
// as-is.
// NOTE(review): the two-argument signature and return statement below appear
// to be the pre-revert side of this hunk, superseded by the three-argument
// form that follows — confirm against the actual header.
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit, const BitsetType& bitset) const override {
return insert_record_.pk2offset_->find_first(limit, bitset);
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const override {
return insert_record_.pk2offset_->find_first(
limit, bitset, false_filtered_out);
}
bool

View File

@ -340,7 +340,9 @@ class SegmentInternalInterface : public SegmentInterface {
* @return All candidates offsets.
*/
virtual std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit, const BitsetType& bitset) const = 0;
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const = 0;
void
FillTargetEntry(

View File

@ -151,8 +151,11 @@ class SegmentSealedImpl : public SegmentSealed {
const Timestamp* timestamps) override;
// Forwards to insert_record_.pk2offset_->find_first and returns its result
// unchanged; `limit`, `bitset` and `false_filtered_out` are passed through
// as-is.
// NOTE(review): the two-argument signature and return statement below appear
// to be the pre-revert side of this hunk, superseded by the three-argument
// form that follows — confirm against the actual header.
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit, const BitsetType& bitset) const override {
return insert_record_.pk2offset_->find_first(limit, bitset);
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const override {
return insert_record_.pk2offset_->find_first(
limit, bitset, false_filtered_out);
}
// Calculate: output[i] = Vec[seg_offset[i]]

View File

@ -66,7 +66,7 @@ TYPED_TEST_SUITE_P(TypedOffsetOrderedArrayTest);
TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
// not sealed.
ASSERT_ANY_THROW(this->map_.find_first(Unlimited, {}));
ASSERT_ANY_THROW(this->map_.find_first(Unlimited, {}, true));
// insert 10 entities.
int num = 10;
@ -81,8 +81,10 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
// all is satisfied.
{
BitsetType all(num);
all.set();
{
auto [offsets, has_more_res] = this->map_.find_first(num / 2, all);
auto [offsets, has_more_res] =
this->map_.find_first(num / 2, all, true);
ASSERT_EQ(num / 2, offsets.size());
ASSERT_TRUE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
@ -91,7 +93,7 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
}
{
auto [offsets, has_more_res] =
this->map_.find_first(Unlimited, all);
this->map_.find_first(Unlimited, all, true);
ASSERT_EQ(num, offsets.size());
ASSERT_FALSE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
@ -102,9 +104,10 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
{
// corner case, segment offset exceeds the size of bitset.
BitsetType all_minus_1(num - 1);
all_minus_1.set();
{
auto [offsets, has_more_res] =
this->map_.find_first(num / 2, all_minus_1);
this->map_.find_first(num / 2, all_minus_1, true);
ASSERT_EQ(num / 2, offsets.size());
ASSERT_TRUE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
@ -113,7 +116,7 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
}
{
auto [offsets, has_more_res] =
this->map_.find_first(Unlimited, all_minus_1);
this->map_.find_first(Unlimited, all_minus_1, true);
ASSERT_EQ(all_minus_1.size(), offsets.size());
ASSERT_FALSE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
@ -124,11 +127,11 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
{
// none is satisfied.
BitsetType none(num);
none.set();
auto result_pair = this->map_.find_first(num / 2, none);
none.reset();
auto result_pair = this->map_.find_first(num / 2, none, true);
ASSERT_EQ(0, result_pair.first.size());
ASSERT_FALSE(result_pair.second);
result_pair = this->map_.find_first(NoLimit, none);
result_pair = this->map_.find_first(NoLimit, none, true);
ASSERT_EQ(0, result_pair.first.size());
ASSERT_FALSE(result_pair.second);
}

View File

@ -62,7 +62,8 @@ TYPED_TEST_SUITE_P(TypedOffsetOrderedMapTest);
TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
// no data.
{
auto [offsets, has_more_res] = this->map_.find_first(Unlimited, {});
auto [offsets, has_more_res] =
this->map_.find_first(Unlimited, {}, true);
ASSERT_EQ(0, offsets.size());
ASSERT_FALSE(has_more_res);
}
@ -75,10 +76,11 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
// all is satisfied.
BitsetType all(num);
all.reset();
all.set();
{
auto [offsets, has_more_res] = this->map_.find_first(num / 2, all);
auto [offsets, has_more_res] =
this->map_.find_first(num / 2, all, true);
ASSERT_EQ(num / 2, offsets.size());
ASSERT_TRUE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
@ -86,7 +88,8 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
}
}
{
auto [offsets, has_more_res] = this->map_.find_first(Unlimited, all);
auto [offsets, has_more_res] =
this->map_.find_first(Unlimited, all, true);
ASSERT_EQ(num, offsets.size());
ASSERT_FALSE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
@ -96,10 +99,10 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
// corner case, segment offset exceeds the size of bitset.
BitsetType all_minus_1(num - 1);
all_minus_1.reset();
all_minus_1.set();
{
auto [offsets, has_more_res] =
this->map_.find_first(num / 2, all_minus_1);
this->map_.find_first(num / 2, all_minus_1, true);
ASSERT_EQ(num / 2, offsets.size());
ASSERT_TRUE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
@ -108,7 +111,7 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
}
{
auto [offsets, has_more_res] =
this->map_.find_first(Unlimited, all_minus_1);
this->map_.find_first(Unlimited, all_minus_1, true);
ASSERT_EQ(all_minus_1.size(), offsets.size());
ASSERT_FALSE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
@ -118,14 +121,16 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
// none is satisfied.
BitsetType none(num);
none.set();
none.reset();
{
auto [offsets, has_more_res] = this->map_.find_first(num / 2, none);
auto [offsets, has_more_res] =
this->map_.find_first(num / 2, none, true);
ASSERT_TRUE(has_more_res);
ASSERT_EQ(0, offsets.size());
}
{
auto [offsets, has_more_res] = this->map_.find_first(NoLimit, none);
auto [offsets, has_more_res] =
this->map_.find_first(NoLimit, none, true);
ASSERT_TRUE(has_more_res);
ASSERT_EQ(0, offsets.size());
}