mirror of https://github.com/milvus-io/milvus.git
related: #33137 adding has_more_result_tag for various level's reduce to rectify reduce_stop_for_best Signed-off-by: MrPresent-Han <chun.han@zilliz.com>pull/33490/head
parent
77637180fa
commit
416a2cf507
|
@ -228,6 +228,7 @@ struct RetrieveResult {
|
||||||
void* segment_;
|
void* segment_;
|
||||||
std::vector<int64_t> result_offsets_;
|
std::vector<int64_t> result_offsets_;
|
||||||
std::vector<DataArray> field_data_;
|
std::vector<DataArray> field_data_;
|
||||||
|
bool has_more_result = true;
|
||||||
};
|
};
|
||||||
|
|
||||||
using RetrieveResultPtr = std::shared_ptr<RetrieveResult>;
|
using RetrieveResultPtr = std::shared_ptr<RetrieveResult>;
|
||||||
|
|
|
@ -291,8 +291,10 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
|
||||||
false_filtered_out = true;
|
false_filtered_out = true;
|
||||||
segment->timestamp_filter(bitset_holder, timestamp_);
|
segment->timestamp_filter(bitset_holder, timestamp_);
|
||||||
}
|
}
|
||||||
retrieve_result.result_offsets_ =
|
auto results_pair =
|
||||||
segment->find_first(node.limit_, bitset_holder, false_filtered_out);
|
segment->find_first(node.limit_, bitset_holder, false_filtered_out);
|
||||||
|
retrieve_result.result_offsets_ = std::move(results_pair.first);
|
||||||
|
retrieve_result.has_more_result = results_pair.second;
|
||||||
retrieve_result_opt_ = std::move(retrieve_result);
|
retrieve_result_opt_ = std::move(retrieve_result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ class OffsetMap {
|
||||||
|
|
||||||
using OffsetType = int64_t;
|
using OffsetType = int64_t;
|
||||||
// TODO: in fact, we can retrieve the pk here. Not sure which way is more efficient.
|
// TODO: in fact, we can retrieve the pk here. Not sure which way is more efficient.
|
||||||
virtual std::vector<OffsetType>
|
virtual std::pair<std::vector<OffsetMap::OffsetType>, bool>
|
||||||
find_first(int64_t limit,
|
find_first(int64_t limit,
|
||||||
const BitsetType& bitset,
|
const BitsetType& bitset,
|
||||||
bool false_filtered_out) const = 0;
|
bool false_filtered_out) const = 0;
|
||||||
|
@ -109,7 +109,7 @@ class OffsetOrderedMap : public OffsetMap {
|
||||||
return map_.empty();
|
return map_.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<OffsetType>
|
std::pair<std::vector<OffsetMap::OffsetType>, bool>
|
||||||
find_first(int64_t limit,
|
find_first(int64_t limit,
|
||||||
const BitsetType& bitset,
|
const BitsetType& bitset,
|
||||||
bool false_filtered_out) const override {
|
bool false_filtered_out) const override {
|
||||||
|
@ -131,7 +131,7 @@ class OffsetOrderedMap : public OffsetMap {
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::vector<OffsetType>
|
std::pair<std::vector<OffsetMap::OffsetType>, bool>
|
||||||
find_first_by_index(int64_t limit,
|
find_first_by_index(int64_t limit,
|
||||||
const BitsetType& bitset,
|
const BitsetType& bitset,
|
||||||
bool false_filtered_out) const {
|
bool false_filtered_out) const {
|
||||||
|
@ -144,8 +144,8 @@ class OffsetOrderedMap : public OffsetMap {
|
||||||
limit = std::min(limit, cnt);
|
limit = std::min(limit, cnt);
|
||||||
std::vector<int64_t> seg_offsets;
|
std::vector<int64_t> seg_offsets;
|
||||||
seg_offsets.reserve(limit);
|
seg_offsets.reserve(limit);
|
||||||
for (auto it = map_.begin(); hit_num < limit && it != map_.end();
|
auto it = map_.begin();
|
||||||
it++) {
|
for (; hit_num < limit && it != map_.end(); it++) {
|
||||||
for (auto seg_offset : it->second) {
|
for (auto seg_offset : it->second) {
|
||||||
if (seg_offset >= size) {
|
if (seg_offset >= size) {
|
||||||
// Frequently concurrent insert/query will cause this case.
|
// Frequently concurrent insert/query will cause this case.
|
||||||
|
@ -161,7 +161,7 @@ class OffsetOrderedMap : public OffsetMap {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return seg_offsets;
|
return {seg_offsets, it != map_.end()};
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -226,7 +226,7 @@ class OffsetOrderedArray : public OffsetMap {
|
||||||
return array_.empty();
|
return array_.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<OffsetType>
|
std::pair<std::vector<OffsetMap::OffsetType>, bool>
|
||||||
find_first(int64_t limit,
|
find_first(int64_t limit,
|
||||||
const BitsetType& bitset,
|
const BitsetType& bitset,
|
||||||
bool false_filtered_out) const override {
|
bool false_filtered_out) const override {
|
||||||
|
@ -248,7 +248,7 @@ class OffsetOrderedArray : public OffsetMap {
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::vector<OffsetType>
|
std::pair<std::vector<OffsetMap::OffsetType>, bool>
|
||||||
find_first_by_index(int64_t limit,
|
find_first_by_index(int64_t limit,
|
||||||
const BitsetType& bitset,
|
const BitsetType& bitset,
|
||||||
bool false_filtered_out) const {
|
bool false_filtered_out) const {
|
||||||
|
@ -261,11 +261,11 @@ class OffsetOrderedArray : public OffsetMap {
|
||||||
limit = std::min(limit, cnt);
|
limit = std::min(limit, cnt);
|
||||||
std::vector<int64_t> seg_offsets;
|
std::vector<int64_t> seg_offsets;
|
||||||
seg_offsets.reserve(limit);
|
seg_offsets.reserve(limit);
|
||||||
for (auto it = array_.begin(); hit_num < limit && it != array_.end();
|
auto it = array_.begin();
|
||||||
it++) {
|
for (; hit_num < limit && it != array_.end(); it++) {
|
||||||
auto seg_offset = it->second;
|
auto seg_offset = it->second;
|
||||||
if (seg_offset >= size) {
|
if (seg_offset >= size) {
|
||||||
// In fact, this case won't happend on sealed segments.
|
// In fact, this case won't happen on sealed segments.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,7 +274,7 @@ class OffsetOrderedArray : public OffsetMap {
|
||||||
hit_num++;
|
hit_num++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return seg_offsets;
|
return {seg_offsets, it != array_.end()};
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|
|
@ -268,7 +268,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<OffsetMap::OffsetType>
|
std::pair<std::vector<OffsetMap::OffsetType>, bool>
|
||||||
find_first(int64_t limit,
|
find_first(int64_t limit,
|
||||||
const BitsetType& bitset,
|
const BitsetType& bitset,
|
||||||
bool false_filtered_out) const override {
|
bool false_filtered_out) const override {
|
||||||
|
|
|
@ -91,6 +91,7 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
|
||||||
query::ExecPlanNodeVisitor visitor(*this, timestamp);
|
query::ExecPlanNodeVisitor visitor(*this, timestamp);
|
||||||
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
|
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
|
||||||
retrieve_results.segment_ = (void*)this;
|
retrieve_results.segment_ = (void*)this;
|
||||||
|
results->set_has_more_result(retrieve_results.has_more_result);
|
||||||
|
|
||||||
auto result_rows = retrieve_results.result_offsets_.size();
|
auto result_rows = retrieve_results.result_offsets_.size();
|
||||||
int64_t output_data_size = 0;
|
int64_t output_data_size = 0;
|
||||||
|
@ -120,7 +121,6 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
|
||||||
retrieve_results.result_offsets_.size(),
|
retrieve_results.result_offsets_.size(),
|
||||||
ignore_non_pk,
|
ignore_non_pk,
|
||||||
true);
|
true);
|
||||||
|
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -290,7 +290,7 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||||
* @param false_filtered_out
|
* @param false_filtered_out
|
||||||
* @return All candidates offsets.
|
* @return All candidates offsets.
|
||||||
*/
|
*/
|
||||||
virtual std::vector<OffsetMap::OffsetType>
|
virtual std::pair<std::vector<OffsetMap::OffsetType>, bool>
|
||||||
find_first(int64_t limit,
|
find_first(int64_t limit,
|
||||||
const BitsetType& bitset,
|
const BitsetType& bitset,
|
||||||
bool false_filtered_out) const = 0;
|
bool false_filtered_out) const = 0;
|
||||||
|
|
|
@ -133,7 +133,7 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||||
const IdArray* pks,
|
const IdArray* pks,
|
||||||
const Timestamp* timestamps) override;
|
const Timestamp* timestamps) override;
|
||||||
|
|
||||||
std::vector<OffsetMap::OffsetType>
|
std::pair<std::vector<OffsetMap::OffsetType>, bool>
|
||||||
find_first(int64_t limit,
|
find_first(int64_t limit,
|
||||||
const BitsetType& bitset,
|
const BitsetType& bitset,
|
||||||
bool false_filtered_out) const override {
|
bool false_filtered_out) const override {
|
||||||
|
|
|
@ -65,8 +65,6 @@ using TypeOfPks = testing::Types<int64_t, std::string>;
|
||||||
TYPED_TEST_SUITE_P(TypedOffsetOrderedArrayTest);
|
TYPED_TEST_SUITE_P(TypedOffsetOrderedArrayTest);
|
||||||
|
|
||||||
TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
|
TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
|
||||||
std::vector<int64_t> offsets;
|
|
||||||
|
|
||||||
// not sealed.
|
// not sealed.
|
||||||
ASSERT_ANY_THROW(this->map_.find_first(Unlimited, {}, true));
|
ASSERT_ANY_THROW(this->map_.find_first(Unlimited, {}, true));
|
||||||
|
|
||||||
|
@ -81,40 +79,62 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
|
||||||
this->seal();
|
this->seal();
|
||||||
|
|
||||||
// all is satisfied.
|
// all is satisfied.
|
||||||
BitsetType all(num);
|
{
|
||||||
all.set();
|
BitsetType all(num);
|
||||||
offsets = this->map_.find_first(num / 2, all, true);
|
all.set();
|
||||||
ASSERT_EQ(num / 2, offsets.size());
|
{
|
||||||
for (int i = 1; i < offsets.size(); i++) {
|
auto [offsets, has_more_res] =
|
||||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
this->map_.find_first(num / 2, all, true);
|
||||||
|
ASSERT_EQ(num / 2, offsets.size());
|
||||||
|
ASSERT_TRUE(has_more_res);
|
||||||
|
for (int i = 1; i < offsets.size(); i++) {
|
||||||
|
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
auto [offsets, has_more_res] =
|
||||||
|
this->map_.find_first(Unlimited, all, true);
|
||||||
|
ASSERT_EQ(num, offsets.size());
|
||||||
|
ASSERT_FALSE(has_more_res);
|
||||||
|
for (int i = 1; i < offsets.size(); i++) {
|
||||||
|
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
offsets = this->map_.find_first(Unlimited, all, true);
|
{
|
||||||
ASSERT_EQ(num, offsets.size());
|
// corner case, segment offset exceeds the size of bitset.
|
||||||
for (int i = 1; i < offsets.size(); i++) {
|
BitsetType all_minus_1(num - 1);
|
||||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
all_minus_1.set();
|
||||||
|
{
|
||||||
|
auto [offsets, has_more_res] =
|
||||||
|
this->map_.find_first(num / 2, all_minus_1, true);
|
||||||
|
ASSERT_EQ(num / 2, offsets.size());
|
||||||
|
ASSERT_TRUE(has_more_res);
|
||||||
|
for (int i = 1; i < offsets.size(); i++) {
|
||||||
|
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
auto [offsets, has_more_res] =
|
||||||
|
this->map_.find_first(Unlimited, all_minus_1, true);
|
||||||
|
ASSERT_EQ(all_minus_1.size(), offsets.size());
|
||||||
|
ASSERT_FALSE(has_more_res);
|
||||||
|
for (int i = 1; i < offsets.size(); i++) {
|
||||||
|
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
{
|
||||||
// corner case, segment offset exceeds the size of bitset.
|
// none is satisfied.
|
||||||
BitsetType all_minus_1(num - 1);
|
BitsetType none(num);
|
||||||
all_minus_1.set();
|
none.reset();
|
||||||
offsets = this->map_.find_first(num / 2, all_minus_1, true);
|
auto result_pair = this->map_.find_first(num / 2, none, true);
|
||||||
ASSERT_EQ(num / 2, offsets.size());
|
ASSERT_EQ(0, result_pair.first.size());
|
||||||
for (int i = 1; i < offsets.size(); i++) {
|
ASSERT_TRUE(result_pair.second);
|
||||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
result_pair = this->map_.find_first(NoLimit, none, true);
|
||||||
|
ASSERT_EQ(0, result_pair.first.size());
|
||||||
|
ASSERT_TRUE(result_pair.second);
|
||||||
}
|
}
|
||||||
offsets = this->map_.find_first(Unlimited, all_minus_1, true);
|
|
||||||
ASSERT_EQ(all_minus_1.size(), offsets.size());
|
|
||||||
for (int i = 1; i < offsets.size(); i++) {
|
|
||||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// none is satisfied.
|
|
||||||
BitsetType none(num);
|
|
||||||
none.reset();
|
|
||||||
offsets = this->map_.find_first(num / 2, none, true);
|
|
||||||
ASSERT_EQ(0, offsets.size());
|
|
||||||
offsets = this->map_.find_first(NoLimit, none, true);
|
|
||||||
ASSERT_EQ(0, offsets.size());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
REGISTER_TYPED_TEST_SUITE_P(TypedOffsetOrderedArrayTest, find_first);
|
REGISTER_TYPED_TEST_SUITE_P(TypedOffsetOrderedArrayTest, find_first);
|
||||||
|
|
|
@ -60,12 +60,13 @@ using TypeOfPks = testing::Types<int64_t, std::string>;
|
||||||
TYPED_TEST_SUITE_P(TypedOffsetOrderedMapTest);
|
TYPED_TEST_SUITE_P(TypedOffsetOrderedMapTest);
|
||||||
|
|
||||||
TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
|
TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
|
||||||
std::vector<int64_t> offsets;
|
|
||||||
|
|
||||||
// no data.
|
// no data.
|
||||||
offsets = this->map_.find_first(Unlimited, {}, true);
|
{
|
||||||
ASSERT_EQ(0, offsets.size());
|
auto [offsets, has_more_res] =
|
||||||
|
this->map_.find_first(Unlimited, {}, true);
|
||||||
|
ASSERT_EQ(0, offsets.size());
|
||||||
|
ASSERT_FALSE(has_more_res);
|
||||||
|
}
|
||||||
// insert 10 entities.
|
// insert 10 entities.
|
||||||
int num = 10;
|
int num = 10;
|
||||||
auto data = this->random_generate(num);
|
auto data = this->random_generate(num);
|
||||||
|
@ -76,38 +77,63 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
|
||||||
// all is satisfied.
|
// all is satisfied.
|
||||||
BitsetType all(num);
|
BitsetType all(num);
|
||||||
all.set();
|
all.set();
|
||||||
offsets = this->map_.find_first(num / 2, all, true);
|
|
||||||
ASSERT_EQ(num / 2, offsets.size());
|
{
|
||||||
for (int i = 1; i < offsets.size(); i++) {
|
auto [offsets, has_more_res] =
|
||||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
this->map_.find_first(num / 2, all, true);
|
||||||
|
ASSERT_EQ(num / 2, offsets.size());
|
||||||
|
ASSERT_TRUE(has_more_res);
|
||||||
|
for (int i = 1; i < offsets.size(); i++) {
|
||||||
|
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
offsets = this->map_.find_first(Unlimited, all, true);
|
{
|
||||||
ASSERT_EQ(num, offsets.size());
|
auto [offsets, has_more_res] =
|
||||||
for (int i = 1; i < offsets.size(); i++) {
|
this->map_.find_first(Unlimited, all, true);
|
||||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
ASSERT_EQ(num, offsets.size());
|
||||||
|
ASSERT_FALSE(has_more_res);
|
||||||
|
for (int i = 1; i < offsets.size(); i++) {
|
||||||
|
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// corner case, segment offset exceeds the size of bitset.
|
// corner case, segment offset exceeds the size of bitset.
|
||||||
BitsetType all_minus_1(num - 1);
|
BitsetType all_minus_1(num - 1);
|
||||||
all_minus_1.set();
|
all_minus_1.set();
|
||||||
offsets = this->map_.find_first(num / 2, all_minus_1, true);
|
{
|
||||||
ASSERT_EQ(num / 2, offsets.size());
|
auto [offsets, has_more_res] =
|
||||||
for (int i = 1; i < offsets.size(); i++) {
|
this->map_.find_first(num / 2, all_minus_1, true);
|
||||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
ASSERT_EQ(num / 2, offsets.size());
|
||||||
|
ASSERT_TRUE(has_more_res);
|
||||||
|
for (int i = 1; i < offsets.size(); i++) {
|
||||||
|
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
offsets = this->map_.find_first(Unlimited, all_minus_1, true);
|
{
|
||||||
ASSERT_EQ(all_minus_1.size(), offsets.size());
|
auto [offsets, has_more_res] =
|
||||||
for (int i = 1; i < offsets.size(); i++) {
|
this->map_.find_first(Unlimited, all_minus_1, true);
|
||||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
ASSERT_EQ(all_minus_1.size(), offsets.size());
|
||||||
|
ASSERT_FALSE(has_more_res);
|
||||||
|
for (int i = 1; i < offsets.size(); i++) {
|
||||||
|
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// none is satisfied.
|
// none is satisfied.
|
||||||
BitsetType none(num);
|
BitsetType none(num);
|
||||||
none.reset();
|
none.reset();
|
||||||
offsets = this->map_.find_first(num / 2, none, true);
|
{
|
||||||
ASSERT_EQ(0, offsets.size());
|
auto [offsets, has_more_res] =
|
||||||
offsets = this->map_.find_first(NoLimit, none, true);
|
this->map_.find_first(num / 2, none, true);
|
||||||
ASSERT_EQ(0, offsets.size());
|
ASSERT_TRUE(has_more_res);
|
||||||
|
ASSERT_EQ(0, offsets.size());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
auto [offsets, has_more_res] =
|
||||||
|
this->map_.find_first(NoLimit, none, true);
|
||||||
|
ASSERT_TRUE(has_more_res);
|
||||||
|
ASSERT_EQ(0, offsets.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
REGISTER_TYPED_TEST_SUITE_P(TypedOffsetOrderedMapTest, find_first);
|
REGISTER_TYPED_TEST_SUITE_P(TypedOffsetOrderedMapTest, find_first);
|
||||||
|
|
|
@ -198,6 +198,7 @@ message RetrieveResults {
|
||||||
// query request cost
|
// query request cost
|
||||||
CostAggregation costAggregation = 13;
|
CostAggregation costAggregation = 13;
|
||||||
int64 all_retrieve_count = 14;
|
int64 all_retrieve_count = 14;
|
||||||
|
bool has_more_result = 15;
|
||||||
}
|
}
|
||||||
|
|
||||||
message LoadIndex {
|
message LoadIndex {
|
||||||
|
|
|
@ -10,6 +10,7 @@ message RetrieveResults {
|
||||||
repeated int64 offset = 2;
|
repeated int64 offset = 2;
|
||||||
repeated schema.FieldData fields_data = 3;
|
repeated schema.FieldData fields_data = 3;
|
||||||
int64 all_retrieve_count = 4;
|
int64 all_retrieve_count = 4;
|
||||||
|
bool has_more_result = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message LoadFieldMeta {
|
message LoadFieldMeta {
|
||||||
|
|
|
@ -607,9 +607,9 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
|
||||||
idSet := make(map[interface{}]struct{})
|
idSet := make(map[interface{}]struct{})
|
||||||
cursors := make([]int64, len(validRetrieveResults))
|
cursors := make([]int64, len(validRetrieveResults))
|
||||||
|
|
||||||
retrieveLimit := typeutil.Unlimited
|
|
||||||
if queryParams != nil && queryParams.limit != typeutil.Unlimited {
|
if queryParams != nil && queryParams.limit != typeutil.Unlimited {
|
||||||
retrieveLimit = queryParams.limit + queryParams.offset
|
// reduceStopForBest will try to get as many results as possible
|
||||||
|
// so loopEnd in this case will be set to the sum of all results' size
|
||||||
if !queryParams.reduceStopForBest {
|
if !queryParams.reduceStopForBest {
|
||||||
loopEnd = int(queryParams.limit)
|
loopEnd = int(queryParams.limit)
|
||||||
}
|
}
|
||||||
|
@ -618,7 +618,7 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
|
||||||
// handle offset
|
// handle offset
|
||||||
if queryParams != nil && queryParams.offset > 0 {
|
if queryParams != nil && queryParams.offset > 0 {
|
||||||
for i := int64(0); i < queryParams.offset; i++ {
|
for i := int64(0); i < queryParams.offset; i++ {
|
||||||
sel, drainOneResult := typeutil.SelectMinPK(retrieveLimit, validRetrieveResults, cursors)
|
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
|
||||||
if sel == -1 || (queryParams.reduceStopForBest && drainOneResult) {
|
if sel == -1 || (queryParams.reduceStopForBest && drainOneResult) {
|
||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
@ -626,16 +626,11 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
reduceStopForBest := false
|
|
||||||
if queryParams != nil {
|
|
||||||
reduceStopForBest = queryParams.reduceStopForBest
|
|
||||||
}
|
|
||||||
|
|
||||||
var retSize int64
|
var retSize int64
|
||||||
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
|
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
|
||||||
for j := 0; j < loopEnd; {
|
for j := 0; j < loopEnd; {
|
||||||
sel, drainOneResult := typeutil.SelectMinPK(retrieveLimit, validRetrieveResults, cursors)
|
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
|
||||||
if sel == -1 || (reduceStopForBest && drainOneResult) {
|
if sel == -1 || (queryParams.reduceStopForBest && drainOneResult) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -479,8 +479,7 @@ func TestTaskQuery_functions(t *testing.T) {
|
||||||
},
|
},
|
||||||
FieldsData: fieldDataArray2,
|
FieldsData: fieldDataArray2,
|
||||||
}
|
}
|
||||||
|
result, err := reduceRetrieveResults(context.Background(), []*internalpb.RetrieveResults{result1, result2}, &queryParams{limit: 2})
|
||||||
result, err := reduceRetrieveResults(context.Background(), []*internalpb.RetrieveResults{result1, result2}, nil)
|
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, 2, len(result.GetFieldsData()))
|
assert.Equal(t, 2, len(result.GetFieldsData()))
|
||||||
assert.Equal(t, Int64Array, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
|
assert.Equal(t, Int64Array, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
|
||||||
|
@ -488,7 +487,7 @@ func TestTaskQuery_functions(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test nil results", func(t *testing.T) {
|
t.Run("test nil results", func(t *testing.T) {
|
||||||
ret, err := reduceRetrieveResults(context.Background(), nil, nil)
|
ret, err := reduceRetrieveResults(context.Background(), nil, &queryParams{})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Empty(t, ret.GetFieldsData())
|
assert.Empty(t, ret.GetFieldsData())
|
||||||
})
|
})
|
||||||
|
@ -594,6 +593,8 @@ func TestTaskQuery_functions(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test stop reduce for best for limit", func(t *testing.T) {
|
t.Run("test stop reduce for best for limit", func(t *testing.T) {
|
||||||
|
r1.HasMoreResult = true
|
||||||
|
r2.HasMoreResult = false
|
||||||
result, err := reduceRetrieveResults(context.Background(),
|
result, err := reduceRetrieveResults(context.Background(),
|
||||||
[]*internalpb.RetrieveResults{r1, r2},
|
[]*internalpb.RetrieveResults{r1, r2},
|
||||||
&queryParams{limit: 2, reduceStopForBest: true})
|
&queryParams{limit: 2, reduceStopForBest: true})
|
||||||
|
@ -605,6 +606,8 @@ func TestTaskQuery_functions(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test stop reduce for best for limit and offset", func(t *testing.T) {
|
t.Run("test stop reduce for best for limit and offset", func(t *testing.T) {
|
||||||
|
r1.HasMoreResult = true
|
||||||
|
r2.HasMoreResult = true
|
||||||
result, err := reduceRetrieveResults(context.Background(),
|
result, err := reduceRetrieveResults(context.Background(),
|
||||||
[]*internalpb.RetrieveResults{r1, r2},
|
[]*internalpb.RetrieveResults{r1, r2},
|
||||||
&queryParams{limit: 1, offset: 1, reduceStopForBest: true})
|
&queryParams{limit: 1, offset: 1, reduceStopForBest: true})
|
||||||
|
@ -614,6 +617,8 @@ func TestTaskQuery_functions(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test stop reduce for best for limit and offset", func(t *testing.T) {
|
t.Run("test stop reduce for best for limit and offset", func(t *testing.T) {
|
||||||
|
r1.HasMoreResult = false
|
||||||
|
r2.HasMoreResult = true
|
||||||
result, err := reduceRetrieveResults(context.Background(),
|
result, err := reduceRetrieveResults(context.Background(),
|
||||||
[]*internalpb.RetrieveResults{r1, r2},
|
[]*internalpb.RetrieveResults{r1, r2},
|
||||||
&queryParams{limit: 2, offset: 1, reduceStopForBest: true})
|
&queryParams{limit: 2, offset: 1, reduceStopForBest: true})
|
||||||
|
@ -625,6 +630,8 @@ func TestTaskQuery_functions(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test stop reduce for best for unlimited set", func(t *testing.T) {
|
t.Run("test stop reduce for best for unlimited set", func(t *testing.T) {
|
||||||
|
r1.HasMoreResult = false
|
||||||
|
r2.HasMoreResult = false
|
||||||
result, err := reduceRetrieveResults(context.Background(),
|
result, err := reduceRetrieveResults(context.Background(),
|
||||||
[]*internalpb.RetrieveResults{r1, r2},
|
[]*internalpb.RetrieveResults{r1, r2},
|
||||||
&queryParams{limit: typeutil.Unlimited, reduceStopForBest: true})
|
&queryParams{limit: typeutil.Unlimited, reduceStopForBest: true})
|
||||||
|
@ -635,7 +642,7 @@ func TestTaskQuery_functions(t *testing.T) {
|
||||||
assert.InDeltaSlice(t, resultFloat[0:(len)*Dim], result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
|
assert.InDeltaSlice(t, resultFloat[0:(len)*Dim], result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test stop reduce for best for unlimited set amd pffset", func(t *testing.T) {
|
t.Run("test stop reduce for best for unlimited set amd offset", func(t *testing.T) {
|
||||||
result, err := reduceRetrieveResults(context.Background(),
|
result, err := reduceRetrieveResults(context.Background(),
|
||||||
[]*internalpb.RetrieveResults{r1, r2},
|
[]*internalpb.RetrieveResults{r1, r2},
|
||||||
&queryParams{limit: typeutil.Unlimited, offset: 3, reduceStopForBest: true})
|
&queryParams{limit: typeutil.Unlimited, offset: 3, reduceStopForBest: true})
|
||||||
|
|
|
@ -401,6 +401,7 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
|
||||||
|
|
||||||
validRetrieveResults := []*internalpb.RetrieveResults{}
|
validRetrieveResults := []*internalpb.RetrieveResults{}
|
||||||
relatedDataSize := int64(0)
|
relatedDataSize := int64(0)
|
||||||
|
hasMoreResult := false
|
||||||
for _, r := range retrieveResults {
|
for _, r := range retrieveResults {
|
||||||
ret.AllRetrieveCount += r.GetAllRetrieveCount()
|
ret.AllRetrieveCount += r.GetAllRetrieveCount()
|
||||||
relatedDataSize += r.GetCostAggregation().GetTotalRelatedDataSize()
|
relatedDataSize += r.GetCostAggregation().GetTotalRelatedDataSize()
|
||||||
|
@ -410,7 +411,9 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
|
||||||
}
|
}
|
||||||
validRetrieveResults = append(validRetrieveResults, r)
|
validRetrieveResults = append(validRetrieveResults, r)
|
||||||
loopEnd += size
|
loopEnd += size
|
||||||
|
hasMoreResult = hasMoreResult || r.GetHasMoreResult()
|
||||||
}
|
}
|
||||||
|
ret.HasMoreResult = hasMoreResult
|
||||||
|
|
||||||
if len(validRetrieveResults) == 0 {
|
if len(validRetrieveResults) == 0 {
|
||||||
return ret, nil
|
return ret, nil
|
||||||
|
@ -427,7 +430,7 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
|
||||||
var retSize int64
|
var retSize int64
|
||||||
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
|
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
|
||||||
for j := 0; j < loopEnd; {
|
for j := 0; j < loopEnd; {
|
||||||
sel, drainOneResult := typeutil.SelectMinPK(param.limit, validRetrieveResults, cursors)
|
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
|
||||||
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
|
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -515,6 +518,7 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
|
||||||
validSegments := make([]Segment, 0, len(segments))
|
validSegments := make([]Segment, 0, len(segments))
|
||||||
selectedOffsets := make([][]int64, 0, len(retrieveResults))
|
selectedOffsets := make([][]int64, 0, len(retrieveResults))
|
||||||
selectedIndexes := make([][]int64, 0, len(retrieveResults))
|
selectedIndexes := make([][]int64, 0, len(retrieveResults))
|
||||||
|
hasMoreResult := false
|
||||||
for i, r := range retrieveResults {
|
for i, r := range retrieveResults {
|
||||||
size := typeutil.GetSizeOfIDs(r.GetIds())
|
size := typeutil.GetSizeOfIDs(r.GetIds())
|
||||||
ret.AllRetrieveCount += r.GetAllRetrieveCount()
|
ret.AllRetrieveCount += r.GetAllRetrieveCount()
|
||||||
|
@ -529,7 +533,9 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
|
||||||
selectedOffsets = append(selectedOffsets, make([]int64, 0, len(r.GetOffset())))
|
selectedOffsets = append(selectedOffsets, make([]int64, 0, len(r.GetOffset())))
|
||||||
selectedIndexes = append(selectedIndexes, make([]int64, 0, len(r.GetOffset())))
|
selectedIndexes = append(selectedIndexes, make([]int64, 0, len(r.GetOffset())))
|
||||||
loopEnd += size
|
loopEnd += size
|
||||||
|
hasMoreResult = r.GetHasMoreResult() || hasMoreResult
|
||||||
}
|
}
|
||||||
|
ret.HasMoreResult = hasMoreResult
|
||||||
|
|
||||||
if len(validRetrieveResults) == 0 {
|
if len(validRetrieveResults) == 0 {
|
||||||
return ret, nil
|
return ret, nil
|
||||||
|
@ -549,7 +555,7 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
|
||||||
var retSize int64
|
var retSize int64
|
||||||
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
|
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
|
||||||
for j := 0; j < loopEnd && (limit == -1 || availableCount < limit); j++ {
|
for j := 0; j < loopEnd && (limit == -1 || availableCount < limit); j++ {
|
||||||
sel, drainOneResult := typeutil.SelectMinPK(param.limit, validRetrieveResults, cursors)
|
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
|
||||||
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
|
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -513,29 +513,46 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
|
||||||
FieldsData: fieldDataArray2,
|
FieldsData: fieldDataArray2,
|
||||||
}
|
}
|
||||||
suite.Run("merge stop finite limited", func() {
|
suite.Run("merge stop finite limited", func() {
|
||||||
|
result1.HasMoreResult = true
|
||||||
|
result2.HasMoreResult = true
|
||||||
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{result1, result2},
|
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{result1, result2},
|
||||||
NewMergeParam(3, make([]int64, 0), nil, true))
|
NewMergeParam(3, make([]int64, 0), nil, true))
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.Equal(2, len(result.GetFieldsData()))
|
suite.Equal(2, len(result.GetFieldsData()))
|
||||||
|
// has more result both, stop reduce when draining one result
|
||||||
|
// here, we can only get best result from 0 to 4 without 6, because result1 has more results
|
||||||
suite.Equal([]int64{0, 1, 2, 3, 4}, result.GetIds().GetIntId().GetData())
|
suite.Equal([]int64{0, 1, 2, 3, 4}, result.GetIds().GetIntId().GetData())
|
||||||
// here, we can only get best result from 0 to 4 without 6, because we can never know whether there is
|
|
||||||
// one potential 5 in following result1
|
|
||||||
suite.Equal([]int64{11, 22, 11, 22, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
|
suite.Equal([]int64{11, 22, 11, 22, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
|
||||||
suite.InDeltaSlice([]float32{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 11, 22, 33, 44},
|
suite.InDeltaSlice([]float32{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 11, 22, 33, 44},
|
||||||
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
|
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
|
||||||
})
|
})
|
||||||
suite.Run("merge stop unlimited", func() {
|
suite.Run("merge stop unlimited", func() {
|
||||||
|
result1.HasMoreResult = false
|
||||||
|
result2.HasMoreResult = false
|
||||||
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{result1, result2},
|
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{result1, result2},
|
||||||
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, true))
|
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, true))
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.Equal(2, len(result.GetFieldsData()))
|
suite.Equal(2, len(result.GetFieldsData()))
|
||||||
|
// as result1 and result2 don't have better results neither
|
||||||
|
// we can reduce all available result into the reduced result
|
||||||
suite.Equal([]int64{0, 1, 2, 3, 4, 6}, result.GetIds().GetIntId().GetData())
|
suite.Equal([]int64{0, 1, 2, 3, 4, 6}, result.GetIds().GetIntId().GetData())
|
||||||
// here, we can only get best result from 0 to 4 without 6, because we can never know whether there is
|
|
||||||
// one potential 5 in following result1
|
|
||||||
suite.Equal([]int64{11, 22, 11, 22, 33, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
|
suite.Equal([]int64{11, 22, 11, 22, 33, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
|
||||||
suite.InDeltaSlice([]float32{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 11, 22, 33, 44, 11, 22, 33, 44},
|
suite.InDeltaSlice([]float32{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 11, 22, 33, 44, 11, 22, 33, 44},
|
||||||
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
|
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
|
||||||
})
|
})
|
||||||
|
suite.Run("merge stop one limited", func() {
|
||||||
|
result1.HasMoreResult = true
|
||||||
|
result2.HasMoreResult = false
|
||||||
|
result, err := MergeSegcoreRetrieveResultsV1(context.Background(), []*segcorepb.RetrieveResults{result1, result2},
|
||||||
|
NewMergeParam(typeutil.Unlimited, make([]int64, 0), nil, true))
|
||||||
|
suite.NoError(err)
|
||||||
|
suite.Equal(2, len(result.GetFieldsData()))
|
||||||
|
// as result1 may have better results, stop reducing when draining it
|
||||||
|
suite.Equal([]int64{0, 1, 2, 3, 4}, result.GetIds().GetIntId().GetData())
|
||||||
|
suite.Equal([]int64{11, 22, 11, 22, 33}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
|
||||||
|
suite.InDeltaSlice([]float32{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 11, 22, 33, 44},
|
||||||
|
result.FieldsData[1].GetVectors().GetFloatVector().Data, 10e-10)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
suite.Run("test stop internal merge for best", func() {
|
suite.Run("test stop internal merge for best", func() {
|
||||||
|
@ -559,6 +576,8 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
|
||||||
},
|
},
|
||||||
FieldsData: fieldDataArray2,
|
FieldsData: fieldDataArray2,
|
||||||
}
|
}
|
||||||
|
result1.HasMoreResult = true
|
||||||
|
result2.HasMoreResult = false
|
||||||
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{result1, result2},
|
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{result1, result2},
|
||||||
NewMergeParam(3, make([]int64, 0), nil, true))
|
NewMergeParam(3, make([]int64, 0), nil, true))
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
|
@ -590,11 +609,24 @@ func (suite *ResultSuite) TestResult_MergeStopForBestResult() {
|
||||||
},
|
},
|
||||||
FieldsData: fieldDataArray2,
|
FieldsData: fieldDataArray2,
|
||||||
}
|
}
|
||||||
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{result1, result2},
|
suite.Run("test drain one result without more results", func() {
|
||||||
NewMergeParam(3, make([]int64, 0), nil, true))
|
result1.HasMoreResult = false
|
||||||
suite.NoError(err)
|
result2.HasMoreResult = false
|
||||||
suite.Equal(2, len(result.GetFieldsData()))
|
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{result1, result2},
|
||||||
suite.Equal([]int64{0, 2, 4, 7}, result.GetIds().GetIntId().GetData())
|
NewMergeParam(3, make([]int64, 0), nil, true))
|
||||||
|
suite.NoError(err)
|
||||||
|
suite.Equal(2, len(result.GetFieldsData()))
|
||||||
|
suite.Equal([]int64{0, 2, 4, 7}, result.GetIds().GetIntId().GetData())
|
||||||
|
})
|
||||||
|
suite.Run("test drain one result with more results", func() {
|
||||||
|
result1.HasMoreResult = false
|
||||||
|
result2.HasMoreResult = true
|
||||||
|
result, err := MergeInternalRetrieveResult(context.Background(), []*internalpb.RetrieveResults{result1, result2},
|
||||||
|
NewMergeParam(3, make([]int64, 0), nil, true))
|
||||||
|
suite.NoError(err)
|
||||||
|
suite.Equal(2, len(result.GetFieldsData()))
|
||||||
|
suite.Equal([]int64{0, 2}, result.GetIds().GetIntId().GetData())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -160,6 +160,7 @@ func (t *QueryTask) Execute() error {
|
||||||
TotalRelatedDataSize: relatedDataSize,
|
TotalRelatedDataSize: relatedDataSize,
|
||||||
},
|
},
|
||||||
AllRetrieveCount: reducedResult.GetAllRetrieveCount(),
|
AllRetrieveCount: reducedResult.GetAllRetrieveCount(),
|
||||||
|
HasMoreResult: reducedResult.HasMoreResult,
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1323,10 +1323,11 @@ func ComparePK(pkA, pkB interface{}) bool {
|
||||||
|
|
||||||
type ResultWithID interface {
|
type ResultWithID interface {
|
||||||
GetIds() *schemapb.IDs
|
GetIds() *schemapb.IDs
|
||||||
|
GetHasMoreResult() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectMinPK select the index of the minPK in results T of the cursors.
|
// SelectMinPK select the index of the minPK in results T of the cursors.
|
||||||
func SelectMinPK[T ResultWithID](limit int64, results []T, cursors []int64) (int, bool) {
|
func SelectMinPK[T ResultWithID](results []T, cursors []int64) (int, bool) {
|
||||||
var (
|
var (
|
||||||
sel = -1
|
sel = -1
|
||||||
drainResult = false
|
drainResult = false
|
||||||
|
@ -1336,8 +1337,9 @@ func SelectMinPK[T ResultWithID](limit int64, results []T, cursors []int64) (int
|
||||||
minStrPK string
|
minStrPK string
|
||||||
)
|
)
|
||||||
for i, cursor := range cursors {
|
for i, cursor := range cursors {
|
||||||
// if result size < limit, this means we should ignore the result from this segment
|
// if cursor has run out of all results from one result and this result has more matched results
|
||||||
if int(cursor) >= GetSizeOfIDs(results[i].GetIds()) && (GetSizeOfIDs(results[i].GetIds()) == int(limit)) {
|
// in this case we have tell reduce to stop because better results may be retrieved in the following iteration
|
||||||
|
if int(cursor) >= GetSizeOfIDs(results[i].GetIds()) && (results[i].GetHasMoreResult()) {
|
||||||
drainResult = true
|
drainResult = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue