mirror of https://github.com/milvus-io/milvus.git
Adapt Expr Executor to SegmentInternalInterface, fix row_count consistency
Signed-off-by: FluorineDog <guilin.gou@zilliz.com>pull/4973/head^2
parent
4af3914836
commit
9ff1bc0f99
|
@ -39,14 +39,13 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment,
|
|||
const query::QueryInfo& info,
|
||||
const float* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
int64_t ins_barrier,
|
||||
const BitsetView& bitset,
|
||||
QueryResult& results) {
|
||||
auto& schema = segment.get_schema();
|
||||
auto& indexing_record = segment.get_indexing_record();
|
||||
auto& record = segment.get_insert_record();
|
||||
// step 1: binary search to find the barrier of the snapshot
|
||||
auto ins_barrier = get_barrier(record, timestamp);
|
||||
// auto del_barrier = get_barrier(deleted_record_, timestamp);
|
||||
|
||||
#if 0
|
||||
|
@ -131,14 +130,14 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment,
|
|||
const query::QueryInfo& info,
|
||||
const uint8_t* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
int64_t ins_barrier,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results) {
|
||||
auto& schema = segment.get_schema();
|
||||
auto& indexing_record = segment.get_indexing_record();
|
||||
auto& record = segment.get_insert_record();
|
||||
// step 1: binary search to find the barrier of the snapshot
|
||||
auto ins_barrier = get_barrier(record, timestamp);
|
||||
// auto ins_barrier = get_barrier(record, timestamp);
|
||||
auto metric_type = GetMetricType(info.metric_type_);
|
||||
// auto del_barrier = get_barrier(deleted_record_, timestamp);
|
||||
|
||||
|
@ -199,33 +198,33 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment,
|
|||
template <typename VectorType>
|
||||
void
|
||||
SearchOnGrowing(const segcore::SegmentGrowingImpl& segment,
|
||||
int64_t ins_barrier,
|
||||
const query::QueryInfo& info,
|
||||
const EmbeddedType<VectorType>* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results) {
|
||||
static_assert(IsVector<VectorType>);
|
||||
if constexpr (std::is_same_v<VectorType, FloatVector>) {
|
||||
FloatSearch(segment, info, query_data, num_queries, timestamp, bitset, results);
|
||||
FloatSearch(segment, info, query_data, num_queries, ins_barrier, bitset, results);
|
||||
} else {
|
||||
BinarySearch(segment, info, query_data, num_queries, timestamp, bitset, results);
|
||||
BinarySearch(segment, info, query_data, num_queries, ins_barrier, bitset, results);
|
||||
}
|
||||
}
|
||||
template void
|
||||
SearchOnGrowing<FloatVector>(const segcore::SegmentGrowingImpl& segment,
|
||||
int64_t ins_barrier,
|
||||
const query::QueryInfo& info,
|
||||
const EmbeddedType<FloatVector>* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results);
|
||||
template void
|
||||
SearchOnGrowing<BinaryVector>(const segcore::SegmentGrowingImpl& segment,
|
||||
int64_t ins_barrier,
|
||||
const query::QueryInfo& info,
|
||||
const EmbeddedType<BinaryVector>* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results);
|
||||
|
||||
|
|
|
@ -23,10 +23,10 @@ using BitmapSimple = std::deque<BitmapChunk>;
|
|||
template <typename VectorType>
|
||||
void
|
||||
SearchOnGrowing(const segcore::SegmentGrowingImpl& segment,
|
||||
int64_t ins_barrier,
|
||||
const query::QueryInfo& info,
|
||||
const EmbeddedType<VectorType>* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& results);
|
||||
} // namespace milvus::query
|
||||
|
|
|
@ -52,7 +52,6 @@ SearchOnSealed(const Schema& schema,
|
|||
const QueryInfo& query_info,
|
||||
const void* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& bitset,
|
||||
QueryResult& result) {
|
||||
auto topK = query_info.topK_;
|
||||
|
|
|
@ -26,7 +26,6 @@ SearchOnSealed(const Schema& schema,
|
|||
const QueryInfo& query_info,
|
||||
const void* query_data,
|
||||
int64_t num_queries,
|
||||
Timestamp timestamp,
|
||||
const faiss::BitsetView& view,
|
||||
QueryResult& result);
|
||||
|
||||
|
|
|
@ -37,7 +37,8 @@ class ExecExprVisitor : public ExprVisitor {
|
|||
|
||||
public:
|
||||
using RetType = std::deque<boost::dynamic_bitset<>>;
|
||||
explicit ExecExprVisitor(const segcore::SegmentGrowingImpl& segment) : segment_(segment) {
|
||||
ExecExprVisitor(const segcore::SegmentGrowingImpl& segment, int64_t row_count)
|
||||
: segment_(segment), row_count_(row_count) {
|
||||
}
|
||||
RetType
|
||||
call_child(Expr& expr) {
|
||||
|
@ -63,7 +64,8 @@ class ExecExprVisitor : public ExprVisitor {
|
|||
ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType;
|
||||
|
||||
private:
|
||||
const segcore::SegmentGrowingImpl& segment_;
|
||||
const segcore::SegmentInternalInterface& segment_;
|
||||
int64_t row_count_;
|
||||
std::optional<RetType> ret_;
|
||||
};
|
||||
} // namespace milvus::query
|
||||
|
|
|
@ -25,7 +25,8 @@ namespace impl {
|
|||
class ExecExprVisitor : ExprVisitor {
|
||||
public:
|
||||
using RetType = std::deque<boost::dynamic_bitset<>>;
|
||||
explicit ExecExprVisitor(const segcore::SegmentGrowingImpl& segment) : segment_(segment) {
|
||||
ExecExprVisitor(const segcore::SegmentGrowingImpl& segment, int64_t row_count)
|
||||
: segment_(segment), row_count_(row_count) {
|
||||
}
|
||||
RetType
|
||||
call_child(Expr& expr) {
|
||||
|
@ -51,7 +52,8 @@ class ExecExprVisitor : ExprVisitor {
|
|||
ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType;
|
||||
|
||||
private:
|
||||
const segcore::SegmentGrowingImpl& segment_;
|
||||
const segcore::SegmentInternalInterface& segment_;
|
||||
int64_t row_count_;
|
||||
std::optional<RetType> ret_;
|
||||
};
|
||||
} // namespace impl
|
||||
|
@ -118,36 +120,42 @@ template <typename T, typename IndexFunc, typename ElementFunc>
|
|||
auto
|
||||
ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, IndexFunc index_func, ElementFunc element_func)
|
||||
-> RetType {
|
||||
auto& records = segment_.get_insert_record();
|
||||
auto data_type = expr.data_type_;
|
||||
auto& schema = segment_.get_schema();
|
||||
auto field_offset = expr.field_offset_;
|
||||
auto& field_meta = schema[field_offset];
|
||||
auto vec_ptr = records.get_entity<T>(field_offset);
|
||||
auto& vec = *vec_ptr;
|
||||
auto& indexing_record = segment_.get_indexing_record();
|
||||
const segcore::ScalarIndexingEntry<T>& entry = indexing_record.get_scalar_entry<T>(field_offset);
|
||||
// auto vec_ptr = records.get_entity<T>(field_offset);
|
||||
// auto& vec = *vec_ptr;
|
||||
// const segcore::ScalarIndexingEntry<T>& entry = indexing_record.get_scalar_entry<T>(field_offset);
|
||||
|
||||
RetType results(vec.num_chunk());
|
||||
auto indexing_barrier = indexing_record.get_finished_ack();
|
||||
auto chunk_size = vec.get_chunk_size();
|
||||
// RetType results(vec.num_chunk());
|
||||
auto indexing_barrier = segment_.num_chunk_index_safe(field_offset);
|
||||
auto chunk_size = segment_.chunk_size();
|
||||
auto num_chunk = upper_div(row_count_, chunk_size);
|
||||
RetType results;
|
||||
|
||||
using Index = knowhere::scalar::StructuredIndex<T>;
|
||||
for (auto chunk_id = 0; chunk_id < indexing_barrier; ++chunk_id) {
|
||||
auto& result = results[chunk_id];
|
||||
auto indexing = entry.get_indexing(chunk_id);
|
||||
auto data = index_func(indexing);
|
||||
result = std::move(*data);
|
||||
Assert(result.size() == chunk_size);
|
||||
// auto& result = results[chunk_id];
|
||||
const Index& indexing = segment_.chunk_scalar_index<T>(field_offset, chunk_id);
|
||||
// NOTE: knowhere is not const-ready
|
||||
// This is a dirty workaround
|
||||
auto data = index_func(const_cast<Index*>(&indexing));
|
||||
Assert(data->size() == chunk_size);
|
||||
results.emplace_back(std::move(*data));
|
||||
}
|
||||
|
||||
for (auto chunk_id = indexing_barrier; chunk_id < vec.num_chunk(); ++chunk_id) {
|
||||
auto& result = results[chunk_id];
|
||||
for (auto chunk_id = indexing_barrier; chunk_id < num_chunk; ++chunk_id) {
|
||||
boost::dynamic_bitset<> result(chunk_size);
|
||||
// auto& result = results[chunk_id];
|
||||
result.resize(chunk_size);
|
||||
auto chunk = vec.get_chunk(chunk_id);
|
||||
auto chunk = segment_.chunk_data<T>(field_offset, chunk_id);
|
||||
const T* data = chunk.data();
|
||||
for (int index = 0; index < chunk_size; ++index) {
|
||||
result[index] = element_func(data[index]);
|
||||
}
|
||||
Assert(result.size() == chunk_size);
|
||||
results.emplace_back(std::move(result));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
@ -274,29 +282,29 @@ template <typename T>
|
|||
auto
|
||||
ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType {
|
||||
auto& expr = static_cast<TermExprImpl<T>&>(expr_raw);
|
||||
auto& records = segment_.get_insert_record();
|
||||
// auto& records = segment_.get_insert_record();
|
||||
auto data_type = expr.data_type_;
|
||||
auto& schema = segment_.get_schema();
|
||||
|
||||
auto field_offset = expr_raw.field_offset_;
|
||||
auto& field_meta = schema[field_offset];
|
||||
auto vec_ptr = records.get_entity<T>(field_offset);
|
||||
auto& vec = *vec_ptr;
|
||||
auto num_chunk = vec.num_chunk();
|
||||
// auto vec_ptr = records.get_entity<T>(field_offset);
|
||||
// auto& vec = *vec_ptr;
|
||||
auto chunk_size = segment_.chunk_size();
|
||||
auto num_chunk = upper_div(row_count_, chunk_size);
|
||||
RetType bitsets;
|
||||
|
||||
auto N = records.ack_responder_.GetAck();
|
||||
// auto N = records.ack_responder_.GetAck();
|
||||
// TODO: enable index for term
|
||||
|
||||
// small batch
|
||||
auto chunk_size = vec.get_chunk_size();
|
||||
for (int64_t chunk_id = 0; chunk_id < num_chunk; ++chunk_id) {
|
||||
auto& chunk = vec.get_chunk(chunk_id);
|
||||
Span<T> chunk = segment_.chunk_data<T>(field_offset, chunk_id);
|
||||
|
||||
auto size = chunk_id == num_chunk - 1 ? N - chunk_id * chunk_size : chunk_size;
|
||||
auto size = chunk_id == num_chunk - 1 ? row_count_ - chunk_id * chunk_size : chunk_size;
|
||||
|
||||
boost::dynamic_bitset<> bitset(chunk_size);
|
||||
for (int i = 0; i < size; ++i) {
|
||||
auto value = chunk[i];
|
||||
auto value = chunk.data()[i];
|
||||
bool is_in = std::binary_search(expr.terms_.begin(), expr.terms_.end(), value);
|
||||
bitset[i] = is_in;
|
||||
}
|
||||
|
|
|
@ -75,18 +75,20 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
|
|||
|
||||
aligned_vector<uint8_t> bitset_holder;
|
||||
BitsetView view;
|
||||
// TODO: add API to unify row_count
|
||||
auto row_count = segcore::get_barrier(segment->get_insert_record(), timestamp_);
|
||||
|
||||
if (node.predicate_.has_value()) {
|
||||
ExecExprVisitor::RetType expr_ret = ExecExprVisitor(*segment).call_child(*node.predicate_.value());
|
||||
ExecExprVisitor::RetType expr_ret = ExecExprVisitor(*segment, row_count).call_child(*node.predicate_.value());
|
||||
bitset_holder = AssembleNegBitmap(expr_ret);
|
||||
view = BitsetView(bitset_holder.data(), bitset_holder.size() * 8);
|
||||
}
|
||||
|
||||
auto& sealed_indexing = segment->get_sealed_indexing_record();
|
||||
if (sealed_indexing.is_ready(node.query_info_.field_offset_)) {
|
||||
SearchOnSealed(segment->get_schema(), sealed_indexing, node.query_info_, src_data, num_queries, timestamp_,
|
||||
view, ret);
|
||||
SearchOnSealed(segment->get_schema(), sealed_indexing, node.query_info_, src_data, num_queries, view, ret);
|
||||
} else {
|
||||
SearchOnGrowing<VectorType>(*segment, node.query_info_, src_data, num_queries, timestamp_, view, ret);
|
||||
SearchOnGrowing<VectorType>(*segment, row_count, node.query_info_, src_data, num_queries, view, ret);
|
||||
}
|
||||
|
||||
ret_ = ret;
|
||||
|
|
|
@ -306,7 +306,7 @@ TEST(Expr, TestRange) {
|
|||
}
|
||||
|
||||
auto seg_promote = dynamic_cast<SegmentGrowingImpl*>(seg.get());
|
||||
ExecExprVisitor visitor(*seg_promote);
|
||||
ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count());
|
||||
for (auto [clause, ref_func] : testcases) {
|
||||
auto loc = dsl_string_tmp.find("@@@@");
|
||||
auto dsl_string = dsl_string_tmp;
|
||||
|
@ -390,7 +390,7 @@ TEST(Expr, TestTerm) {
|
|||
}
|
||||
|
||||
auto seg_promote = dynamic_cast<SegmentGrowingImpl*>(seg.get());
|
||||
ExecExprVisitor visitor(*seg_promote);
|
||||
ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count());
|
||||
for (auto [clause, ref_func] : testcases) {
|
||||
auto loc = dsl_string_tmp.find("@@@@");
|
||||
auto dsl_string = dsl_string_tmp;
|
||||
|
@ -492,7 +492,7 @@ TEST(Expr, TestSimpleDsl) {
|
|||
}
|
||||
|
||||
auto seg_promote = dynamic_cast<SegmentGrowingImpl*>(seg.get());
|
||||
ExecExprVisitor visitor(*seg_promote);
|
||||
ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count());
|
||||
for (auto [clause, ref_func] : testcases) {
|
||||
Json dsl;
|
||||
dsl["bool"] = clause;
|
||||
|
|
|
@ -1,6 +1,16 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include "segcore/SegmentSealed.h"
|
||||
|
||||
TEST(Load, Naive) {
|
||||
|
||||
}
|
Loading…
Reference in New Issue