From 9ff1bc0f9951d60d896be74b5bfff16b9f642645 Mon Sep 17 00:00:00 2001 From: FluorineDog Date: Sat, 16 Jan 2021 18:12:58 +0800 Subject: [PATCH] Adapt Expr Executor to SegmentInternalInterface, fix row_count consistency Signed-off-by: FluorineDog --- internal/core/src/query/SearchOnGrowing.cpp | 17 +++-- internal/core/src/query/SearchOnGrowing.h | 2 +- internal/core/src/query/SearchOnSealed.cpp | 1 - internal/core/src/query/SearchOnSealed.h | 1 - .../src/query/generated/ExecExprVisitor.h | 6 +- .../src/query/visitors/ExecExprVisitor.cpp | 64 +++++++++++-------- .../query/visitors/ExecPlanNodeVisitor.cpp | 10 +-- internal/core/unittest/test_expr.cpp | 6 +- internal/core/unittest/test_load.cpp | 12 +++- 9 files changed, 69 insertions(+), 50 deletions(-) diff --git a/internal/core/src/query/SearchOnGrowing.cpp b/internal/core/src/query/SearchOnGrowing.cpp index 46aecb3431..1389b71fd0 100644 --- a/internal/core/src/query/SearchOnGrowing.cpp +++ b/internal/core/src/query/SearchOnGrowing.cpp @@ -39,14 +39,13 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment, const query::QueryInfo& info, const float* query_data, int64_t num_queries, - Timestamp timestamp, + int64_t ins_barrier, const BitsetView& bitset, QueryResult& results) { auto& schema = segment.get_schema(); auto& indexing_record = segment.get_indexing_record(); auto& record = segment.get_insert_record(); // step 1: binary search to find the barrier of the snapshot - auto ins_barrier = get_barrier(record, timestamp); // auto del_barrier = get_barrier(deleted_record_, timestamp); #if 0 @@ -131,14 +130,14 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment, const query::QueryInfo& info, const uint8_t* query_data, int64_t num_queries, - Timestamp timestamp, + int64_t ins_barrier, const faiss::BitsetView& bitset, QueryResult& results) { auto& schema = segment.get_schema(); auto& indexing_record = segment.get_indexing_record(); auto& record = segment.get_insert_record(); // step 1: binary search to find the barrier of the snapshot - auto ins_barrier = get_barrier(record, timestamp); + // auto ins_barrier = get_barrier(record, timestamp); auto metric_type = GetMetricType(info.metric_type_); // auto del_barrier = get_barrier(deleted_record_, timestamp); @@ -199,33 +198,33 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment, template void SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, + int64_t ins_barrier, const query::QueryInfo& info, const EmbeddedType* query_data, int64_t num_queries, - Timestamp timestamp, const faiss::BitsetView& bitset, QueryResult& results) { static_assert(IsVector); if constexpr (std::is_same_v) { - FloatSearch(segment, info, query_data, num_queries, timestamp, bitset, results); + FloatSearch(segment, info, query_data, num_queries, ins_barrier, bitset, results); } else { - BinarySearch(segment, info, query_data, num_queries, timestamp, bitset, results); + BinarySearch(segment, info, query_data, num_queries, ins_barrier, bitset, results); } } template void SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, + int64_t ins_barrier, const query::QueryInfo& info, const EmbeddedType* query_data, int64_t num_queries, - Timestamp timestamp, const faiss::BitsetView& bitset, QueryResult& results); template void SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, + int64_t ins_barrier, const query::QueryInfo& info, const EmbeddedType* query_data, int64_t num_queries, - Timestamp timestamp, const faiss::BitsetView& bitset, QueryResult& results); diff --git a/internal/core/src/query/SearchOnGrowing.h b/internal/core/src/query/SearchOnGrowing.h index 003bdcf35b..f1251d5124 100644 --- a/internal/core/src/query/SearchOnGrowing.h +++ b/internal/core/src/query/SearchOnGrowing.h @@ -23,10 +23,10 @@ using BitmapSimple = std::deque; template void SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, + int64_t ins_barrier, const query::QueryInfo& info, const EmbeddedType* query_data, int64_t num_queries, - Timestamp timestamp, const faiss::BitsetView& bitset, QueryResult& results); } // namespace milvus::query diff --git a/internal/core/src/query/SearchOnSealed.cpp b/internal/core/src/query/SearchOnSealed.cpp index 83b265a721..2aec8b41e8 100644 --- a/internal/core/src/query/SearchOnSealed.cpp +++ b/internal/core/src/query/SearchOnSealed.cpp @@ -52,7 +52,6 @@ SearchOnSealed(const Schema& schema, const QueryInfo& query_info, const void* query_data, int64_t num_queries, - Timestamp timestamp, const faiss::BitsetView& bitset, QueryResult& result) { auto topK = query_info.topK_; diff --git a/internal/core/src/query/SearchOnSealed.h b/internal/core/src/query/SearchOnSealed.h index 227f1a15ce..1d836c719a 100644 --- a/internal/core/src/query/SearchOnSealed.h +++ b/internal/core/src/query/SearchOnSealed.h @@ -26,7 +26,6 @@ SearchOnSealed(const Schema& schema, const QueryInfo& query_info, const void* query_data, int64_t num_queries, - Timestamp timestamp, const faiss::BitsetView& view, QueryResult& result); diff --git a/internal/core/src/query/generated/ExecExprVisitor.h b/internal/core/src/query/generated/ExecExprVisitor.h index 981be05a16..96e9439ed2 100644 --- a/internal/core/src/query/generated/ExecExprVisitor.h +++ b/internal/core/src/query/generated/ExecExprVisitor.h @@ -37,7 +37,8 @@ class ExecExprVisitor : public ExprVisitor { public: using RetType = std::deque>; - explicit ExecExprVisitor(const segcore::SegmentGrowingImpl& segment) : segment_(segment) { + ExecExprVisitor(const segcore::SegmentGrowingImpl& segment, int64_t row_count) + : segment_(segment), row_count_(row_count) { } RetType call_child(Expr& expr) { @@ -63,7 +64,8 @@ class ExecExprVisitor : public ExprVisitor { ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType; private: - const segcore::SegmentGrowingImpl& segment_; + const segcore::SegmentInternalInterface& segment_; + int64_t row_count_; std::optional ret_; }; } // namespace milvus::query diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index 37353faa5a..5d2b44c96f 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -25,7 +25,8 @@ namespace impl { class ExecExprVisitor : ExprVisitor { public: using RetType = std::deque>; - explicit ExecExprVisitor(const segcore::SegmentGrowingImpl& segment) : segment_(segment) { + ExecExprVisitor(const segcore::SegmentGrowingImpl& segment, int64_t row_count) + : segment_(segment), row_count_(row_count) { } RetType call_child(Expr& expr) { @@ -51,7 +52,8 @@ class ExecExprVisitor : ExprVisitor { ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType; private: - const segcore::SegmentGrowingImpl& segment_; + const segcore::SegmentInternalInterface& segment_; + int64_t row_count_; std::optional ret_; }; } // namespace impl @@ -118,36 +120,42 @@ template auto ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl& expr, IndexFunc index_func, ElementFunc element_func) -> RetType { - auto& records = segment_.get_insert_record(); auto data_type = expr.data_type_; auto& schema = segment_.get_schema(); auto field_offset = expr.field_offset_; auto& field_meta = schema[field_offset]; - auto vec_ptr = records.get_entity(field_offset); - auto& vec = *vec_ptr; - auto& indexing_record = segment_.get_indexing_record(); - const segcore::ScalarIndexingEntry& entry = indexing_record.get_scalar_entry(field_offset); + // auto vec_ptr = records.get_entity(field_offset); + // auto& vec = *vec_ptr; + // const segcore::ScalarIndexingEntry& entry = indexing_record.get_scalar_entry(field_offset); - RetType results(vec.num_chunk()); - auto indexing_barrier = indexing_record.get_finished_ack(); - auto chunk_size = vec.get_chunk_size(); + // RetType results(vec.num_chunk()); + auto indexing_barrier = segment_.num_chunk_index_safe(field_offset); + auto chunk_size = segment_.chunk_size(); + auto num_chunk = upper_div(row_count_, chunk_size); + RetType results; + + using Index = knowhere::scalar::StructuredIndex; for (auto chunk_id = 0; chunk_id < indexing_barrier; ++chunk_id) { - auto& result = results[chunk_id]; - auto indexing = entry.get_indexing(chunk_id); - auto data = index_func(indexing); - result = std::move(*data); - Assert(result.size() == chunk_size); + // auto& result = results[chunk_id]; + const Index& indexing = segment_.chunk_scalar_index(field_offset, chunk_id); + // NOTE: knowhere is not const-ready + // This is a dirty workaround + auto data = index_func(const_cast(&indexing)); + Assert(data->size() == chunk_size); + results.emplace_back(std::move(*data)); } - for (auto chunk_id = indexing_barrier; chunk_id < vec.num_chunk(); ++chunk_id) { - auto& result = results[chunk_id]; + for (auto chunk_id = indexing_barrier; chunk_id < num_chunk; ++chunk_id) { + boost::dynamic_bitset<> result(chunk_size); + // auto& result = results[chunk_id]; result.resize(chunk_size); - auto chunk = vec.get_chunk(chunk_id); + auto chunk = segment_.chunk_data(field_offset, chunk_id); const T* data = chunk.data(); for (int index = 0; index < chunk_size; ++index) { result[index] = element_func(data[index]); } Assert(result.size() == chunk_size); + results.emplace_back(std::move(result)); } return results; } @@ -274,29 +282,29 @@ template auto ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType { auto& expr = static_cast&>(expr_raw); - auto& records = segment_.get_insert_record(); + // auto& records = segment_.get_insert_record(); auto data_type = expr.data_type_; auto& schema = segment_.get_schema(); auto field_offset = expr_raw.field_offset_; auto& field_meta = schema[field_offset]; - auto vec_ptr = records.get_entity(field_offset); - auto& vec = *vec_ptr; - auto num_chunk = vec.num_chunk(); + // auto vec_ptr = records.get_entity(field_offset); + // auto& vec = *vec_ptr; + auto chunk_size = segment_.chunk_size(); + auto num_chunk = upper_div(row_count_, chunk_size); RetType bitsets; - auto N = records.ack_responder_.GetAck(); + // auto N = records.ack_responder_.GetAck(); + // TODO: enable index for term - // small batch - auto chunk_size = vec.get_chunk_size(); for (int64_t chunk_id = 0; chunk_id < num_chunk; ++chunk_id) { - auto& chunk = vec.get_chunk(chunk_id); + Span chunk = segment_.chunk_data(field_offset, chunk_id); - auto size = chunk_id == num_chunk - 1 ? N - chunk_id * chunk_size : chunk_size; + auto size = chunk_id == num_chunk - 1 ? row_count_ - chunk_id * chunk_size : chunk_size; boost::dynamic_bitset<> bitset(chunk_size); for (int i = 0; i < size; ++i) { - auto value = chunk[i]; + auto value = chunk.data()[i]; bool is_in = std::binary_search(expr.terms_.begin(), expr.terms_.end(), value); bitset[i] = is_in; } diff --git a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp index e43837a060..62cfeb56e3 100644 --- a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp +++ b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp @@ -75,18 +75,20 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) { aligned_vector bitset_holder; BitsetView view; + // TODO: add API to unify row_count + auto row_count = segcore::get_barrier(segment->get_insert_record(), timestamp_); + if (node.predicate_.has_value()) { - ExecExprVisitor::RetType expr_ret = ExecExprVisitor(*segment).call_child(*node.predicate_.value()); + ExecExprVisitor::RetType expr_ret = ExecExprVisitor(*segment, row_count).call_child(*node.predicate_.value()); bitset_holder = AssembleNegBitmap(expr_ret); view = BitsetView(bitset_holder.data(), bitset_holder.size() * 8); } auto& sealed_indexing = segment->get_sealed_indexing_record(); if (sealed_indexing.is_ready(node.query_info_.field_offset_)) { - SearchOnSealed(segment->get_schema(), sealed_indexing, node.query_info_, src_data, num_queries, timestamp_, - view, ret); + SearchOnSealed(segment->get_schema(), sealed_indexing, node.query_info_, src_data, num_queries, view, ret); } else { - SearchOnGrowing(*segment, node.query_info_, src_data, num_queries, timestamp_, view, ret); + SearchOnGrowing(*segment, row_count, node.query_info_, src_data, num_queries, view, ret); } ret_ = ret; diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index a979ac929a..7d104400fd 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -306,7 +306,7 @@ TEST(Expr, TestRange) { } auto seg_promote = dynamic_cast(seg.get()); - ExecExprVisitor visitor(*seg_promote); + ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count()); for (auto [clause, ref_func] : testcases) { auto loc = dsl_string_tmp.find("@@@@"); auto dsl_string = dsl_string_tmp; @@ -390,7 +390,7 @@ TEST(Expr, TestTerm) { } auto seg_promote = dynamic_cast(seg.get()); - ExecExprVisitor visitor(*seg_promote); + ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count()); for (auto [clause, ref_func] : testcases) { auto loc = dsl_string_tmp.find("@@@@"); auto dsl_string = dsl_string_tmp; @@ -492,7 +492,7 @@ TEST(Expr, TestSimpleDsl) { } auto seg_promote = dynamic_cast(seg.get()); - ExecExprVisitor visitor(*seg_promote); + ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count()); for (auto [clause, ref_func] : testcases) { Json dsl; dsl["bool"] = clause; diff --git a/internal/core/unittest/test_load.cpp b/internal/core/unittest/test_load.cpp index 9d6f2b1fd9..3b02aae9b5 100644 --- a/internal/core/unittest/test_load.cpp +++ b/internal/core/unittest/test_load.cpp @@ -1,6 +1,16 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + #include #include "segcore/SegmentSealed.h" TEST(Load, Naive) { - } \ No newline at end of file