From e7dd30a88489188eac43b2109c49a2f06db8c5e8 Mon Sep 17 00:00:00 2001 From: FluorineDog Date: Wed, 18 Nov 2020 17:32:52 +0800 Subject: [PATCH] Add framework of ExecExprVisitor Signed-off-by: FluorineDog --- configs/advanced/flow_graph.yaml | 14 +++ configs/advanced/reader.yaml | 23 ++++ internal/core/src/query/CMakeLists.txt | 1 + internal/core/src/query/Expr.h | 12 +- internal/core/src/query/Plan.cpp | 12 +- internal/core/src/query/PlanImpl.h | 2 +- internal/core/src/query/PlanNode.h | 4 - .../src/query/generated/ExecExprVisitor.cpp | 25 ++++ .../src/query/generated/ExecExprVisitor.h | 41 +++++++ .../src/query/generated/ExecPlanNodeVisitor.h | 4 +- .../src/query/generated/ShowExprVisitor.h | 8 +- .../src/query/generated/ShowPlanNodeVisitor.h | 4 +- .../src/query/visitors/ExecExprVisitor.cpp | 51 +++++++++ internal/core/src/segcore/CMakeLists.txt | 1 - internal/core/src/segcore/Collection.cpp | 3 +- .../core/src/segcore/ConcurrentVector.cpp | 5 - internal/core/src/segcore/IndexingEntry.cpp | 3 +- internal/core/src/segcore/SegmentNaive.cpp | 6 +- internal/core/src/segcore/SegmentNaive.h | 6 +- .../core/src/segcore/SegmentSmallIndex.cpp | 31 ++--- internal/core/src/segcore/SegmentSmallIndex.h | 10 +- internal/core/unittest/test_c_api.cpp | 4 +- internal/core/unittest/test_query.cpp | 2 +- internal/reader/data_sync_service.go | 1 + internal/reader/flow_graph_delete_node.go | 5 + internal/reader/flow_graph_filter_dm_node.go | 4 + internal/reader/flow_graph_insert_node.go | 4 + internal/reader/flow_graph_key2seg_node.go | 5 + .../flow_graph_msg_stream_input_nodes.go | 6 +- internal/reader/flow_graph_node.go | 3 - .../reader/flow_graph_schema_update_node.go | 5 + .../reader/flow_graph_service_time_node.go | 5 + internal/reader/param_table.go | 108 +++++++++++++++++- internal/reader/param_table_test.go | 59 +++++++++- internal/reader/search_service.go | 7 +- internal/reader/stats_service.go | 10 +- internal/util/flowgraph/flow_graph_test.go | 3 + internal/util/flowgraph/input_node.go | 7 +- internal/util/flowgraph/param_table.go | 45 ++++++++ internal/util/flowgraph/param_table_test.go | 19 +++ internal/util/flowgraph/type_def.go | 3 - tools/core_gen/all_generate.py | 14 ++- tools/core_gen/templates/visitor_derived.h | 4 +- 43 files changed, 489 insertions(+), 100 deletions(-) create mode 100644 configs/advanced/flow_graph.yaml create mode 100644 configs/advanced/reader.yaml create mode 100644 internal/core/src/query/generated/ExecExprVisitor.cpp create mode 100644 internal/core/src/query/generated/ExecExprVisitor.h create mode 100644 internal/core/src/query/visitors/ExecExprVisitor.cpp delete mode 100644 internal/core/src/segcore/ConcurrentVector.cpp create mode 100644 internal/util/flowgraph/param_table.go create mode 100644 internal/util/flowgraph/param_table_test.go diff --git a/configs/advanced/flow_graph.yaml b/configs/advanced/flow_graph.yaml new file mode 100644 index 0000000000..d6590177df --- /dev/null +++ b/configs/advanced/flow_graph.yaml @@ -0,0 +1,14 @@ +# Copyright (C) 2019-2020 Zilliz. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. 
See the License for the specific language governing permissions and limitations under the License. + +flowGraph: + maxQueueLength: 1024 + maxParallelism: 1024 diff --git a/configs/advanced/reader.yaml b/configs/advanced/reader.yaml new file mode 100644 index 0000000000..f8d1c15905 --- /dev/null +++ b/configs/advanced/reader.yaml @@ -0,0 +1,23 @@ +# Copyright (C) 2019-2020 Zilliz. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under the License. + +service: + statsServiceTimeInterval: 1000 # milliseconds + +msgStream: + receiveBufSize: # msgPack chan buffer size + statsMsgStream: 64 + dmMsgStream: 1024 + searchMsgStream: 512 + searchResultMsgStream: 64 + pulsarBufSize: # pulsar chan buffer size + search: 512 + dm: 1024 diff --git a/internal/core/src/query/CMakeLists.txt b/internal/core/src/query/CMakeLists.txt index 887f18e843..395bb345d4 100644 --- a/internal/core/src/query/CMakeLists.txt +++ b/internal/core/src/query/CMakeLists.txt @@ -6,6 +6,7 @@ set(MILVUS_QUERY_SRCS visitors/ShowPlanNodeVisitor.cpp visitors/ExecPlanNodeVisitor.cpp visitors/ShowExprVisitor.cpp + visitors/ExecExprVisitor.cpp Plan.cpp ) add_library(milvus_query ${MILVUS_QUERY_SRCS}) diff --git a/internal/core/src/query/Expr.h b/internal/core/src/query/Expr.h index 5851594539..d0d0fa7ec3 100644 --- a/internal/core/src/query/Expr.h +++ b/internal/core/src/query/Expr.h @@ -22,18 +22,10 @@ using ExprPtr = std::unique_ptr<Expr>; struct BinaryExpr : Expr { ExprPtr left_; ExprPtr right_; - - public: - void - accept(ExprVisitor&) = 0; }; struct UnaryExpr : Expr { ExprPtr child_; - - public: - void - accept(ExprVisitor&) = 0; }; // TODO: not enabled in sprint 1 @@ -60,7 +52,7 @@ using FieldId = std::string; struct TermExpr : Expr { FieldId field_id_; - DataType data_type_; + DataType data_type_ = DataType::NONE; // std::vector terms_; protected: @@ -74,7 +66,7 @@ struct TermExpr : Expr { struct RangeExpr : Expr { FieldId field_id_; - DataType data_type_; + DataType data_type_ = DataType::NONE; enum class OpType { LessThan, LessEqual, GreaterThan, GreaterEqual, Equal, NotEqual }; static const std::map<std::string, OpType> mapping_; // op_name -> op diff --git a/internal/core/src/query/Plan.cpp b/internal/core/src/query/Plan.cpp index 882e37e46a..d4bde83788 100644 --- a/internal/core/src/query/Plan.cpp +++ b/internal/core/src/query/Plan.cpp @@ -52,8 +52,10 @@ ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Js expr->data_type_ = data_type; expr->field_id_ = field_name; for (auto& item : body.items()) { - auto& op_name = item.key(); - auto op = RangeExpr::mapping_.at(to_lower(op_name)); + auto op_name = to_lower(item.key()); + + AssertInfo(RangeExpr::mapping_.count(op_name), "op(" + op_name + ") not found"); + auto op = RangeExpr::mapping_.at(op_name); T value = item.value(); expr->conditions_.emplace_back(op, value); } @@ -130,7 +132,6 @@ CreatePlan(const Schema& schema, const std::string& dsl_str) { std::unique_ptr<PlaceholderGroup> ParsePlaceholderGroup(const Plan* plan, const std::string& blob) { - (void)plan; namespace ser = milvus::proto::service; auto 
result = std::make_unique<PlaceholderGroup>(); ser::PlaceholderGroup ph_group; @@ -139,9 +140,14 @@ ParsePlaceholderGroup(const Plan* plan, const std::string& blob) { for (auto& info : ph_group.placeholders()) { Placeholder element; element.tag_ = info.tag(); + Assert(plan->tag2field_.count(element.tag_)); + auto field_id = plan->tag2field_.at(element.tag_); + auto& field_meta = plan->schema_[field_id]; element.num_of_queries_ = info.values_size(); AssertInfo(element.num_of_queries_, "must have queries"); + Assert(element.num_of_queries_ > 0); element.line_sizeof_ = info.values().Get(0).size(); + Assert(field_meta.get_sizeof() == element.line_sizeof_); auto& target = element.blob_; target.reserve(element.line_sizeof_ * element.num_of_queries_); for (auto& line : info.values()) { diff --git a/internal/core/src/query/PlanImpl.h b/internal/core/src/query/PlanImpl.h index 81b6daf497..e1d738114a 100644 --- a/internal/core/src/query/PlanImpl.h +++ b/internal/core/src/query/PlanImpl.h @@ -14,7 +14,7 @@ using Json = nlohmann::json; // class definitions struct Plan { public: - Plan(const Schema& schema) : schema_(schema) { + explicit Plan(const Schema& schema) : schema_(schema) { } public: diff --git a/internal/core/src/query/PlanNode.h b/internal/core/src/query/PlanNode.h index 0cb1d06efc..f164e7ee85 100644 --- a/internal/core/src/query/PlanNode.h +++ b/internal/core/src/query/PlanNode.h @@ -38,10 +38,6 @@ struct VectorPlanNode : PlanNode { std::optional<ExprPtr> predicate_; QueryInfo query_info_; std::string placeholder_tag_; - - public: - virtual void - accept(PlanNodeVisitor&) = 0; }; struct FloatVectorANNS : VectorPlanNode { diff --git a/internal/core/src/query/generated/ExecExprVisitor.cpp b/internal/core/src/query/generated/ExecExprVisitor.cpp new file mode 100644 index 0000000000..47ef676a2a --- /dev/null +++ b/internal/core/src/query/generated/ExecExprVisitor.cpp @@ -0,0 +1,25 @@ +#error TODO: copy this file out, and modify the content. 
+#include "query/generated/ExecExprVisitor.h" + +namespace milvus::query { +void +ExecExprVisitor::visit(BoolUnaryExpr& expr) { + // TODO +} + +void +ExecExprVisitor::visit(BoolBinaryExpr& expr) { + // TODO +} + +void +ExecExprVisitor::visit(TermExpr& expr) { + // TODO +} + +void +ExecExprVisitor::visit(RangeExpr& expr) { + // TODO +} + +} // namespace milvus::query diff --git a/internal/core/src/query/generated/ExecExprVisitor.h b/internal/core/src/query/generated/ExecExprVisitor.h new file mode 100644 index 0000000000..4a38bbb06c --- /dev/null +++ b/internal/core/src/query/generated/ExecExprVisitor.h @@ -0,0 +1,41 @@ +#pragma once +// Generated File +// DO NOT EDIT +#include "segcore/SegmentNaive.h" +#include +#include "ExprVisitor.h" + +namespace milvus::query { +class ExecExprVisitor : ExprVisitor { + public: + void + visit(BoolUnaryExpr& expr) override; + + void + visit(BoolBinaryExpr& expr) override; + + void + visit(TermExpr& expr) override; + + void + visit(RangeExpr& expr) override; + + public: + using RetType = faiss::ConcurrentBitsetPtr; + explicit ExecExprVisitor(segcore::SegmentNaive& segment) : segment_(segment) { + } + RetType + call_child(Expr& expr) { + Assert(!ret_.has_value()); + expr.accept(*this); + Assert(ret_.has_value()); + auto ret = std::move(ret_); + ret_ = std::nullopt; + return std::move(ret.value()); + } + + private: + segcore::SegmentNaive& segment_; + std::optional ret_; +}; +} // namespace milvus::query diff --git a/internal/core/src/query/generated/ExecPlanNodeVisitor.h b/internal/core/src/query/generated/ExecPlanNodeVisitor.h index b1a1017960..58785ca0e7 100644 --- a/internal/core/src/query/generated/ExecPlanNodeVisitor.h +++ b/internal/core/src/query/generated/ExecPlanNodeVisitor.h @@ -9,10 +9,10 @@ namespace milvus::query { class ExecPlanNodeVisitor : PlanNodeVisitor { public: - virtual void + void visit(FloatVectorANNS& node) override; - virtual void + void visit(BinaryVectorANNS& node) override; public: diff --git a/internal/core/src/query/generated/ShowExprVisitor.h b/internal/core/src/query/generated/ShowExprVisitor.h index 53053480ae..b76844417e 100644 --- a/internal/core/src/query/generated/ShowExprVisitor.h +++ b/internal/core/src/query/generated/ShowExprVisitor.h @@ -9,16 +9,16 @@ namespace milvus::query { class ShowExprVisitor : ExprVisitor { public: - virtual void + void visit(BoolUnaryExpr& expr) override; - virtual void + void visit(BoolBinaryExpr& expr) override; - virtual void + void visit(TermExpr& expr) override; - virtual void + void visit(RangeExpr& expr) override; public: diff --git a/internal/core/src/query/generated/ShowPlanNodeVisitor.h b/internal/core/src/query/generated/ShowPlanNodeVisitor.h index 1835cb5547..03afe8ec85 100644 --- a/internal/core/src/query/generated/ShowPlanNodeVisitor.h +++ b/internal/core/src/query/generated/ShowPlanNodeVisitor.h @@ -10,10 +10,10 @@ namespace milvus::query { class ShowPlanNodeVisitor : PlanNodeVisitor { public: - virtual void + void visit(FloatVectorANNS& node) override; - virtual void + void visit(BinaryVectorANNS& node) override; public: diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp new file mode 100644 index 0000000000..055762511b --- /dev/null +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -0,0 +1,51 @@ +#include "segcore/SegmentNaive.h" +#include +#include "query/generated/ExecExprVisitor.h" + +namespace milvus::query { +#if 1 +// THIS CONTAINS EXTRA BODY FOR VISITOR +// WILL BE USED BY GENERATOR 
+namespace impl { +class ExecExprVisitor : ExprVisitor { + public: + using RetType = faiss::ConcurrentBitsetPtr; + explicit ExecExprVisitor(segcore::SegmentNaive& segment) : segment_(segment) { + } + RetType + call_child(Expr& expr) { + Assert(!ret_.has_value()); + expr.accept(*this); + Assert(ret_.has_value()); + auto ret = std::move(ret_); + ret_ = std::nullopt; + return std::move(ret.value()); + } + + private: + segcore::SegmentNaive& segment_; + std::optional<RetType> ret_; +}; +} // namespace impl +#endif + +void +ExecExprVisitor::visit(BoolUnaryExpr& expr) { + PanicInfo("unimplemented"); +} + +void +ExecExprVisitor::visit(BoolBinaryExpr& expr) { + PanicInfo("unimplemented"); +} + +void +ExecExprVisitor::visit(TermExpr& expr) { + PanicInfo("unimplemented"); +} + +void +ExecExprVisitor::visit(RangeExpr& expr) { +} + +} // namespace milvus::query diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index a339390105..82d628cd21 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -3,7 +3,6 @@ set(SEGCORE_FILES SegmentNaive.cpp SegmentSmallIndex.cpp IndexMeta.cpp - ConcurrentVector.cpp Collection.cpp collection_c.cpp segment_c.cpp diff --git a/internal/core/src/segcore/Collection.cpp b/internal/core/src/segcore/Collection.cpp index fb3a1df0ea..c7084df73d 100644 --- a/internal/core/src/segcore/Collection.cpp +++ b/internal/core/src/segcore/Collection.cpp @@ -4,7 +4,6 @@ #include "pb/etcd_meta.pb.h" #include #include -#include namespace milvus::segcore { @@ -134,7 +133,7 @@ Collection::parse() { auto schema = std::make_shared<Schema>(); for (const milvus::proto::schema::FieldSchema& child : collection_meta.schema().fields()) { const auto& type_params = child.type_params(); - int dim = 16; + int64_t dim = 16; for (const auto& type_param : type_params) { if (type_param.key() == "dim") { dim = strtoll(type_param.value().c_str(), nullptr, 10); diff --git a/internal/core/src/segcore/ConcurrentVector.cpp b/internal/core/src/segcore/ConcurrentVector.cpp deleted file mode 100644 index 6be1cb74f0..0000000000 --- a/internal/core/src/segcore/ConcurrentVector.cpp +++ /dev/null @@ -1,5 +0,0 @@ - -#include -#include "segcore/ConcurrentVector.h" - -namespace milvus::segcore {} diff --git a/internal/core/src/segcore/IndexingEntry.cpp b/internal/core/src/segcore/IndexingEntry.cpp index b06fc69716..44bc2a43d2 100644 --- a/internal/core/src/segcore/IndexingEntry.cpp +++ b/internal/core/src/segcore/IndexingEntry.cpp @@ -11,7 +11,8 @@ IndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBas assert(field_meta_.get_data_type() == DataType::VECTOR_FLOAT); auto dim = field_meta_.get_dim(); - auto source = static_cast<const ConcurrentVector<FloatVector>*>(vec_base); + auto source = dynamic_cast<const ConcurrentVector<FloatVector>*>(vec_base); + Assert(source); auto chunk_size = source->chunk_size(); assert(ack_end <= chunk_size); auto conf = get_build_conf(); diff --git a/internal/core/src/segcore/SegmentNaive.cpp b/internal/core/src/segcore/SegmentNaive.cpp index 6e79976f00..917b4b524a 100644 --- a/internal/core/src/segcore/SegmentNaive.cpp +++ b/internal/core/src/segcore/SegmentNaive.cpp @@ -451,10 +451,10 @@ SegmentNaive::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp ti Status SegmentNaive::Close() { if (this->record_.reserved != this->record_.ack_responder_.GetAck()) { - std::runtime_error("insert not ready"); + PanicInfo("insert not ready"); } - if (this->deleted_record_.reserved != this->record_.ack_responder_.GetAck()) { - std::runtime_error("delete not ready"); 
+ if (this->deleted_record_.reserved != this->deleted_record_.ack_responder_.GetAck()) { + PanicInfo("delete not ready"); } state_ = SegmentState::Closed; return Status::OK(); diff --git a/internal/core/src/segcore/SegmentNaive.h b/internal/core/src/segcore/SegmentNaive.h index 07bdf9e451..a4e35afb56 100644 --- a/internal/core/src/segcore/SegmentNaive.h +++ b/internal/core/src/segcore/SegmentNaive.h @@ -20,8 +20,6 @@ namespace milvus::segcore { class SegmentNaive : public SegmentBase { public: - virtual ~SegmentNaive() = default; - // SegmentBase(std::shared_ptr collection); int64_t @@ -47,7 +45,7 @@ class SegmentNaive : public SegmentBase { Status QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override; - virtual Status + Status Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_groups[], const Timestamp timestamps[], @@ -106,7 +104,7 @@ class SegmentNaive : public SegmentBase { friend std::unique_ptr CreateSegment(SchemaPtr schema); - explicit SegmentNaive(SchemaPtr schema) : schema_(schema), record_(*schema) { + explicit SegmentNaive(const SchemaPtr& schema) : schema_(schema), record_(*schema) { } private: diff --git a/internal/core/src/segcore/SegmentSmallIndex.cpp b/internal/core/src/segcore/SegmentSmallIndex.cpp index 272ecb157e..140f55d50f 100644 --- a/internal/core/src/segcore/SegmentSmallIndex.cpp +++ b/internal/core/src/segcore/SegmentSmallIndex.cpp @@ -50,7 +50,7 @@ SegmentSmallIndex::get_deleted_bitmap(int64_t del_barrier, for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) { // get uid in delete logs auto uid = deleted_record_.uids_[del_index]; - // map uid to corrensponding offsets, select the max one, which should be the target + // map uid to corresponding offsets, select the max one, which should be the target // the max one should be closest to query_timestamp, so the delete log should refer to it int64_t the_offset = -1; auto [iter_b, iter_e] = uid2offset_.equal_range(uid); @@ -73,7 +73,7 @@ SegmentSmallIndex::get_deleted_bitmap(int64_t del_barrier, for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) { // get uid in delete logs auto uid = deleted_record_.uids_[del_index]; - // map uid to corrensponding offsets, select the max one, which should be the target + // map uid to corresponding offsets, select the max one, which should be the target // the max one should be closest to query_timestamp, so the delete log should refer to it int64_t the_offset = -1; auto [iter_b, iter_e] = uid2offset_.equal_range(uid); @@ -228,7 +228,7 @@ SegmentSmallIndex::QueryBruteForceImpl(const query::QueryInfo& info, QueryResult& results) { // step 1: binary search to find the barrier of the snapshot auto ins_barrier = get_barrier(record_, timestamp); - auto del_barrier = get_barrier(deleted_record_, timestamp); + // auto del_barrier = get_barrier(deleted_record_, timestamp); #if 0 auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier); Assert(bitmap_holder); @@ -321,7 +321,6 @@ SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timesta x = dis(e); } } - int64_t inferred_dim = query_info->query_raw_data.size() / query_info->num_queries; // TODO query::QueryInfo info{ query_info->topK, @@ -338,10 +337,10 @@ SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timesta Status SegmentSmallIndex::Close() { if (this->record_.reserved != this->record_.ack_responder_.GetAck()) { - std::runtime_error("insert not 
ready"); + PanicInfo("insert not ready"); } - if (this->deleted_record_.reserved != this->record_.ack_responder_.GetAck()) { - std::runtime_error("delete not ready"); + if (this->deleted_record_.reserved != this->deleted_record_.ack_responder_.GetAck()) { + PanicInfo("delete not ready"); } state_ = SegmentState::Closed; return Status::OK(); @@ -357,8 +356,6 @@ SegmentSmallIndex::BuildVecIndexImpl(const IndexMeta::Entry& entry) { auto dim = field.get_dim(); auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode); - auto chunk_size = record_.uids_.chunk_size(); - auto& uids = record_.uids_; auto entities = record_.get_vec_entity(offset); @@ -398,7 +395,7 @@ SegmentSmallIndex::BuildIndex(IndexMetaPtr remote_index_meta) { auto index_meta = std::make_shared(schema_); // TODO: this is merge of query conf and insert conf - // TODO: should be splitted into multiple configs + // TODO: should be split into multiple configs auto conf = milvus::knowhere::Config{ {milvus::knowhere::meta::DIM, dim}, {milvus::knowhere::IndexParams::nlist, 100}, {milvus::knowhere::IndexParams::nprobe, 4}, {milvus::knowhere::IndexParams::m, 4}, @@ -431,8 +428,16 @@ SegmentSmallIndex::BuildIndex(IndexMetaPtr remote_index_meta) { } index_ready_ = true; -#endif return Status::OK(); +#endif +} + +static uint64_t +upper_align(int64_t value, int64_t align) { + Assert(align > 0); + Assert((align & (align - 1)) == 0); + auto groups = (value + align - 1) / align; + return groups * align; } int64_t @@ -448,9 +453,9 @@ SegmentSmallIndex::GetMemoryUsageInBytes() { } } #endif - int64_t ins_n = (record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1); + int64_t ins_n = upper_align(record_.reserved, DefaultElementPerChunk); total_bytes += ins_n * (schema_->get_total_sizeof() + 16 + 1); - int64_t del_n = (deleted_record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1); + int64_t del_n = upper_align(deleted_record_.reserved, DefaultElementPerChunk); total_bytes += del_n * (16 * 2); return total_bytes; } diff --git a/internal/core/src/segcore/SegmentSmallIndex.h b/internal/core/src/segcore/SegmentSmallIndex.h index 0a4282c08c..99d1a25d1d 100644 --- a/internal/core/src/segcore/SegmentSmallIndex.h +++ b/internal/core/src/segcore/SegmentSmallIndex.h @@ -46,10 +46,6 @@ namespace milvus::segcore { class SegmentSmallIndex : public SegmentBase { public: - virtual ~SegmentSmallIndex() = default; - - // SegmentBase(std::shared_ptr collection); - int64_t PreInsert(int64_t size) override; @@ -111,6 +107,9 @@ class SegmentSmallIndex : public SegmentBase { GetMemoryUsageInBytes() override; public: + void + get_insert_record(); + ssize_t get_row_count() const override { return record_.ack_responder_.GetAck(); @@ -130,7 +129,8 @@ class SegmentSmallIndex : public SegmentBase { friend std::unique_ptr CreateSegment(SchemaPtr schema); - explicit SegmentSmallIndex(SchemaPtr schema) : schema_(schema), record_(*schema_), indexing_record_(*schema_) { + explicit SegmentSmallIndex(SchemaPtr schema) + : schema_(std::move(schema)), record_(*schema_), indexing_record_(*schema_) { } public: diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index de59eef953..62dff9c310 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -141,7 +141,7 @@ TEST(CApiTest, SearchTest) { auto blob = raw_group.SerializeAsString(); auto plan = CreatePlan(collection, dsl_string); - auto placeholderGroup = 
ParsePlaceholderGroup(nullptr, blob.data(), blob.length()); + auto placeholderGroup = ParsePlaceholderGroup(plan, blob.data(), blob.length()); std::vector placeholderGroups; placeholderGroups.push_back(placeholderGroup); timestamps.clear(); @@ -228,7 +228,7 @@ TEST(CApiTest, BuildIndexTest) { auto blob = raw_group.SerializeAsString(); auto plan = CreatePlan(collection, dsl_string); - auto placeholderGroup = ParsePlaceholderGroup(nullptr, blob.data(), blob.length()); + auto placeholderGroup = ParsePlaceholderGroup(plan, blob.data(), blob.length()); std::vector placeholderGroups; placeholderGroups.push_back(placeholderGroup); timestamps.clear(); diff --git a/internal/core/unittest/test_query.cpp b/internal/core/unittest/test_query.cpp index b29fe10dfa..d3d97def8c 100644 --- a/internal/core/unittest/test_query.cpp +++ b/internal/core/unittest/test_query.cpp @@ -133,7 +133,7 @@ TEST(Query, ParsePlaceholderGroup) { { "bool": { "vector": { - "Vec": { + "fakevec": { "metric_type": "L2", "params": { "nprobe": 10 diff --git a/internal/reader/data_sync_service.go b/internal/reader/data_sync_service.go index ffc24e382c..c67f4e58f3 100644 --- a/internal/reader/data_sync_service.go +++ b/internal/reader/data_sync_service.go @@ -37,6 +37,7 @@ func (dsService *dataSyncService) initNodes() { // TODO: add delete pipeline support dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) + flowgraph.Params.Init() var dmStreamNode Node = newDmInputNode(dsService.ctx) var filterDmNode Node = newFilteredDmNode() diff --git a/internal/reader/flow_graph_delete_node.go b/internal/reader/flow_graph_delete_node.go index 56929539ad..b0a81e9d80 100644 --- a/internal/reader/flow_graph_delete_node.go +++ b/internal/reader/flow_graph_delete_node.go @@ -1,5 +1,7 @@ package reader +import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" + type deleteNode struct { BaseNode deleteMsg deleteMsg @@ -14,6 +16,9 @@ func (dNode *deleteNode) Operate(in []*Msg) []*Msg { } func newDeleteNode() *deleteNode { + maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() + maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) diff --git a/internal/reader/flow_graph_filter_dm_node.go b/internal/reader/flow_graph_filter_dm_node.go index 87caa68722..f32dd087ad 100644 --- a/internal/reader/flow_graph_filter_dm_node.go +++ b/internal/reader/flow_graph_filter_dm_node.go @@ -5,6 +5,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/msgstream" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) type filterDmNode struct { @@ -54,6 +55,9 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { } func newFilteredDmNode() *filterDmNode { + maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() + maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) diff --git a/internal/reader/flow_graph_insert_node.go b/internal/reader/flow_graph_insert_node.go index cb8c9b29ab..2ec7c2dc54 100644 --- a/internal/reader/flow_graph_insert_node.go +++ b/internal/reader/flow_graph_insert_node.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) type insertNode struct { @@ 
-126,6 +127,9 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn } func newInsertNode(replica *collectionReplica) *insertNode { + maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() + maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) diff --git a/internal/reader/flow_graph_key2seg_node.go b/internal/reader/flow_graph_key2seg_node.go index fba215aa42..399533f88d 100644 --- a/internal/reader/flow_graph_key2seg_node.go +++ b/internal/reader/flow_graph_key2seg_node.go @@ -1,5 +1,7 @@ package reader +import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" + type key2SegNode struct { BaseNode key2SegMsg key2SegMsg @@ -14,6 +16,9 @@ func (ksNode *key2SegNode) Operate(in []*Msg) []*Msg { } func newKey2SegNode() *key2SegNode { + maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() + maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) diff --git a/internal/reader/flow_graph_msg_stream_input_nodes.go b/internal/reader/flow_graph_msg_stream_input_nodes.go index 5eec6306c0..18edc92ea1 100644 --- a/internal/reader/flow_graph_msg_stream_input_nodes.go +++ b/internal/reader/flow_graph_msg_stream_input_nodes.go @@ -9,10 +9,8 @@ import ( ) func newDmInputNode(ctx context.Context) *flowgraph.InputNode { - const ( - receiveBufSize = 1024 - pulsarBufSize = 1024 - ) + receiveBufSize := Params.dmMsgStreamReceiveBufSize() + pulsarBufSize := Params.dmPulsarBufSize() msgStreamURL, err := Params.PulsarAddress() if err != nil { diff --git a/internal/reader/flow_graph_node.go b/internal/reader/flow_graph_node.go index 88568cfc5e..c87f7a2151 100644 --- a/internal/reader/flow_graph_node.go +++ b/internal/reader/flow_graph_node.go @@ -2,9 +2,6 @@ package reader import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" -const maxQueueLength = flowgraph.MaxQueueLength -const maxParallelism = flowgraph.MaxQueueLength - type BaseNode = flowgraph.BaseNode type Node = flowgraph.Node type InputNode = flowgraph.InputNode diff --git a/internal/reader/flow_graph_schema_update_node.go b/internal/reader/flow_graph_schema_update_node.go index 7c280c15a4..3262b0b1f6 100644 --- a/internal/reader/flow_graph_schema_update_node.go +++ b/internal/reader/flow_graph_schema_update_node.go @@ -1,5 +1,7 @@ package reader +import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" + type schemaUpdateNode struct { BaseNode schemaUpdateMsg schemaUpdateMsg @@ -14,6 +16,9 @@ func (suNode *schemaUpdateNode) Operate(in []*Msg) []*Msg { } func newSchemaUpdateNode() *schemaUpdateNode { + maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() + maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) diff --git a/internal/reader/flow_graph_service_time_node.go b/internal/reader/flow_graph_service_time_node.go index 4fe720540c..8271df117d 100644 --- a/internal/reader/flow_graph_service_time_node.go +++ b/internal/reader/flow_graph_service_time_node.go @@ -2,6 +2,8 @@ package reader import ( "log" + + "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) type serviceTimeNode struct { @@ -33,6 +35,9 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { } func 
newServiceTimeNode(replica *collectionReplica) *serviceTimeNode { + maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength() + maxParallelism := flowgraph.Params.FlowGraphMaxParallelism() + baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) diff --git a/internal/reader/param_table.go b/internal/reader/param_table.go index ba13e99391..9e3a048fb1 100644 --- a/internal/reader/param_table.go +++ b/internal/reader/param_table.go @@ -12,8 +12,12 @@ type ParamTable struct { var Params ParamTable -func (p *ParamTable) InitParamTable() { - p.Init() +func (p *ParamTable) Init() { + p.BaseTable.Init() + err := p.LoadYaml("advanced/reader.yaml") + if err != nil { + panic(err) + } } func (p *ParamTable) PulsarAddress() (string, error) { @@ -25,7 +29,10 @@ func (p *ParamTable) PulsarAddress() (string, error) { } func (p *ParamTable) QueryNodeID() int { - queryNodeID, _ := p.Load("reader.clientid") + queryNodeID, err := p.Load("reader.clientid") + if err != nil { + panic(err) + } id, err := strconv.Atoi(queryNodeID) if err != nil { panic(err) @@ -34,7 +41,10 @@ func (p *ParamTable) QueryNodeID() int { } func (p *ParamTable) TopicStart() int { - topicStart, _ := p.Load("reader.topicstart") + topicStart, err := p.Load("reader.topicstart") + if err != nil { + panic(err) + } topicStartNum, err := strconv.Atoi(topicStart) if err != nil { panic(err) @@ -43,10 +53,98 @@ func (p *ParamTable) TopicStart() int { } func (p *ParamTable) TopicEnd() int { - topicEnd, _ := p.Load("reader.topicend") + topicEnd, err := p.Load("reader.topicend") + if err != nil { + panic(err) + } topicEndNum, err := strconv.Atoi(topicEnd) if err != nil { panic(err) } return topicEndNum } + +// private advanced params +func (p *ParamTable) statsServiceTimeInterval() int { + timeInterval, err := p.Load("service.statsServiceTimeInterval") + if err != nil { + panic(err) + } + interval, err := strconv.Atoi(timeInterval) + if err != nil { + panic(err) + } + return interval +} + +func (p *ParamTable) statsMsgStreamReceiveBufSize() int64 { + revBufSize, err := p.Load("msgStream.receiveBufSize.statsMsgStream") + if err != nil { + panic(err) + } + bufSize, err := strconv.Atoi(revBufSize) + if err != nil { + panic(err) + } + return int64(bufSize) +} + +func (p *ParamTable) dmMsgStreamReceiveBufSize() int64 { + revBufSize, err := p.Load("msgStream.receiveBufSize.dmMsgStream") + if err != nil { + panic(err) + } + bufSize, err := strconv.Atoi(revBufSize) + if err != nil { + panic(err) + } + return int64(bufSize) +} + +func (p *ParamTable) searchMsgStreamReceiveBufSize() int64 { + revBufSize, err := p.Load("msgStream.receiveBufSize.searchMsgStream") + if err != nil { + panic(err) + } + bufSize, err := strconv.Atoi(revBufSize) + if err != nil { + panic(err) + } + return int64(bufSize) +} + +func (p *ParamTable) searchResultMsgStreamReceiveBufSize() int64 { + revBufSize, err := p.Load("msgStream.receiveBufSize.searchResultMsgStream") + if err != nil { + panic(err) + } + bufSize, err := strconv.Atoi(revBufSize) + if err != nil { + panic(err) + } + return int64(bufSize) +} + +func (p *ParamTable) searchPulsarBufSize() int64 { + pulsarBufSize, err := p.Load("msgStream.pulsarBufSize.search") + if err != nil { + panic(err) + } + bufSize, err := strconv.Atoi(pulsarBufSize) + if err != nil { + panic(err) + } + return int64(bufSize) +} + +func (p *ParamTable) dmPulsarBufSize() int64 { + pulsarBufSize, err := p.Load("msgStream.pulsarBufSize.dm") + if err != nil { + panic(err) + } + bufSize, err := 
strconv.Atoi(pulsarBufSize) + if err != nil { + panic(err) + } + return int64(bufSize) +} diff --git a/internal/reader/param_table_test.go b/internal/reader/param_table_test.go index e8acc50487..7580aadd10 100644 --- a/internal/reader/param_table_test.go +++ b/internal/reader/param_table_test.go @@ -6,20 +6,73 @@ import ( "github.com/stretchr/testify/assert" ) +func TestParamTable_Init(t *testing.T) { + Params.Init() +} + +func TestParamTable_PulsarAddress(t *testing.T) { + Params.Init() + address, err := Params.PulsarAddress() + assert.NoError(t, err) + assert.Equal(t, address, "pulsar://localhost:6650") +} + func TestParamTable_QueryNodeID(t *testing.T) { - Params.InitParamTable() + Params.Init() id := Params.QueryNodeID() assert.Equal(t, id, 0) } func TestParamTable_TopicStart(t *testing.T) { - Params.InitParamTable() + Params.Init() topicStart := Params.TopicStart() assert.Equal(t, topicStart, 0) } func TestParamTable_TopicEnd(t *testing.T) { - Params.InitParamTable() + Params.Init() topicEnd := Params.TopicEnd() assert.Equal(t, topicEnd, 128) } + +func TestParamTable_statsServiceTimeInterval(t *testing.T) { + Params.Init() + interval := Params.statsServiceTimeInterval() + assert.Equal(t, interval, 1000) +} + +func TestParamTable_statsMsgStreamReceiveBufSize(t *testing.T) { + Params.Init() + bufSize := Params.statsMsgStreamReceiveBufSize() + assert.Equal(t, bufSize, int64(64)) +} + +func TestParamTable_dmMsgStreamReceiveBufSize(t *testing.T) { + Params.Init() + bufSize := Params.dmMsgStreamReceiveBufSize() + assert.Equal(t, bufSize, int64(1024)) +} + +func TestParamTable_searchMsgStreamReceiveBufSize(t *testing.T) { + Params.Init() + bufSize := Params.searchMsgStreamReceiveBufSize() + assert.Equal(t, bufSize, int64(512)) +} + +func TestParamTable_searchResultMsgStreamReceiveBufSize(t *testing.T) { + Params.Init() + bufSize := Params.searchResultMsgStreamReceiveBufSize() + assert.Equal(t, bufSize, int64(64)) +} + +func TestParamTable_searchPulsarBufSize(t *testing.T) { + Params.Init() + bufSize := Params.searchPulsarBufSize() + assert.Equal(t, bufSize, int64(512)) +} + +func TestParamTable_dmPulsarBufSize(t *testing.T) { + Params.Init() + bufSize := Params.dmPulsarBufSize() + assert.Equal(t, bufSize, int64(1024)) +} diff --git a/internal/reader/search_service.go b/internal/reader/search_service.go index c74735f4ad..56464d0043 100644 --- a/internal/reader/search_service.go +++ b/internal/reader/search_service.go @@ -35,11 +35,8 @@ type SearchResult struct { } func newSearchService(ctx context.Context, replica *collectionReplica) *searchService { - const ( - //TODO:: read config file - receiveBufSize = 1024 - pulsarBufSize = 1024 - ) + receiveBufSize := Params.searchMsgStreamReceiveBufSize() + pulsarBufSize := Params.searchPulsarBufSize() msgStreamURL, err := Params.PulsarAddress() if err != nil { diff --git a/internal/reader/stats_service.go b/internal/reader/stats_service.go index 9f118e1dd1..9fb1b3a1c3 100644 --- a/internal/reader/stats_service.go +++ b/internal/reader/stats_service.go @@ -28,10 +28,8 @@ func newStatsService(ctx context.Context, replica *collectionReplica) *statsServ } func (sService *statsService) start() { - const ( - receiveBufSize = 1024 - sleepMillisecondTime = 1000 - ) + sleepTimeInterval := Params.statsServiceTimeInterval() + receiveBufSize := Params.statsMsgStreamReceiveBufSize() // start pulsar msgStreamURL, err := Params.PulsarAddress() @@ -50,12 +48,12 @@ func (sService *statsService) start() { (*sService.statsStream).Start() // start service - 
fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms") + fmt.Println("do segments statistic in ", strconv.Itoa(sleepTimeInterval), "ms") for { select { case <-sService.ctx.Done(): return - case <-time.After(sleepMillisecondTime * time.Millisecond): + case <-time.After(time.Duration(sleepTimeInterval) * time.Millisecond): sService.sendSegmentStatistic() } } diff --git a/internal/util/flowgraph/flow_graph_test.go b/internal/util/flowgraph/flow_graph_test.go index 27f363ff86..63cd91fa2c 100644 --- a/internal/util/flowgraph/flow_graph_test.go +++ b/internal/util/flowgraph/flow_graph_test.go @@ -175,6 +175,9 @@ func receiveResult(ctx context.Context, fg *TimeTickedFlowGraph) (float64, bool) } func TestTimeTickedFlowGraph_Start(t *testing.T) { + const MaxQueueLength = 1024 + const MaxParallelism = 1024 + duration := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), duration) defer cancel() diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index c6882a2036..7251f59044 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -46,9 +46,12 @@ func (inNode *InputNode) Operate(in []*Msg) []*Msg { } func NewInputNode(inStream *msgstream.MsgStream, nodeName string) *InputNode { + maxQueueLength := Params.FlowGraphMaxQueueLength() + maxParallelism := Params.FlowGraphMaxParallelism() + baseNode := BaseNode{} - baseNode.SetMaxQueueLength(MaxQueueLength) - baseNode.SetMaxParallelism(MaxParallelism) + baseNode.SetMaxQueueLength(maxQueueLength) + baseNode.SetMaxParallelism(maxParallelism) return &InputNode{ BaseNode: baseNode, diff --git a/internal/util/flowgraph/param_table.go b/internal/util/flowgraph/param_table.go new file mode 100644 index 0000000000..5378932d9b --- /dev/null +++ b/internal/util/flowgraph/param_table.go @@ -0,0 +1,45 @@ +package flowgraph + +import ( + "strconv" + + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable +} + +var Params ParamTable + +func (p *ParamTable) Init() { + p.BaseTable.Init() + err := p.LoadYaml("advanced/flow_graph.yaml") + if err != nil { + panic(err) + } +} + +func (p *ParamTable) FlowGraphMaxQueueLength() int32 { + queueLength, err := p.Load("flowGraph.maxQueueLength") + if err != nil { + panic(err) + } + length, err := strconv.Atoi(queueLength) + if err != nil { + panic(err) + } + return int32(length) +} + +func (p *ParamTable) FlowGraphMaxParallelism() int32 { + maxParallelism, err := p.Load("flowGraph.maxParallelism") + if err != nil { + panic(err) + } + maxPara, err := strconv.Atoi(maxParallelism) + if err != nil { + panic(err) + } + return int32(maxPara) +} diff --git a/internal/util/flowgraph/param_table_test.go b/internal/util/flowgraph/param_table_test.go new file mode 100644 index 0000000000..136c8854c9 --- /dev/null +++ b/internal/util/flowgraph/param_table_test.go @@ -0,0 +1,19 @@ +package flowgraph + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParamTable_flowGraphMaxQueueLength(t *testing.T) { + Params.Init() + length := Params.FlowGraphMaxQueueLength() + assert.Equal(t, length, int32(1024)) +} + +func TestParamTable_flowGraphMaxParallelism(t *testing.T) { + Params.Init() + maxParallelism := Params.FlowGraphMaxParallelism() + assert.Equal(t, maxParallelism, int32(1024)) +} diff --git a/internal/util/flowgraph/type_def.go b/internal/util/flowgraph/type_def.go index 
d12c568024..3db30f0190 100644 --- a/internal/util/flowgraph/type_def.go +++ b/internal/util/flowgraph/type_def.go @@ -4,6 +4,3 @@ import "github.com/zilliztech/milvus-distributed/internal/util/typeutil" type Timestamp = typeutil.Timestamp type NodeName = string - -const MaxQueueLength = 1024 -const MaxParallelism = 1024 diff --git a/tools/core_gen/all_generate.py b/tools/core_gen/all_generate.py index 3ce44022f0..fec35bf2da 100755 --- a/tools/core_gen/all_generate.py +++ b/tools/core_gen/all_generate.py @@ -48,10 +48,16 @@ if __name__ == "__main__": node_names = ["Expr", "PlanNode"] visitor_info = { - 'Expr': [{ - 'visitor_name': "ShowExprVisitor", - "parameter_name": 'expr', - }], + 'Expr': [ + { + 'visitor_name': "ShowExprVisitor", + "parameter_name": 'expr', + }, + { + 'visitor_name': "ExecExprVisitor", + "parameter_name": 'expr', + }, + ], 'PlanNode': [ { 'visitor_name': "ShowPlanNodeVisitor", diff --git a/tools/core_gen/templates/visitor_derived.h b/tools/core_gen/templates/visitor_derived.h index 49d31fa44c..cda1ea1a74 100644 --- a/tools/core_gen/templates/visitor_derived.h +++ b/tools/core_gen/templates/visitor_derived.h @@ -2,7 +2,7 @@ @@root_base@@Visitor #### @@@@body@struct_name - virtual void + void visit(@@struct_name@@& @@parameter_name@@) override; #### @@@@main @@ -21,4 +21,4 @@ class @@visitor_name@@ : @@base_visitor@@ { }; } // namespace @@namespace@@ -#### \ No newline at end of file +####
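
Reviewer notes (appended; not part of the patch):

The call_child() helper in ExecExprVisitor is the heart of this framework: the generated visitor interface returns void, so each visit() overload deposits its result in an optional member, and call_child() moves it back out. A minimal, self-contained sketch of that plumbing — every type here is an illustrative stand-in (int in place of faiss::ConcurrentBitsetPtr, a two-class hierarchy in place of the generated one), not the actual Milvus API:

#include <cassert>
#include <optional>
#include <utility>

struct Visitor;
struct Expr {
    virtual void
    accept(Visitor& v) = 0;
    virtual ~Expr() = default;
};
struct Visitor {
    virtual void
    visit(Expr&) = 0;
    virtual ~Visitor() = default;
};
struct LeafExpr : Expr {
    void
    accept(Visitor& v) override {
        v.visit(*this);
    }
};

class SketchExecVisitor : Visitor {  // private inheritance, as in the patch
 public:
    using RetType = int;  // stands in for faiss::ConcurrentBitsetPtr

    void
    visit(Expr&) override {
        ret_ = 42;  // a real visitor would compute a filter bitset here
    }

    // Same shape as ExecExprVisitor::call_child: run the child through
    // accept(), then move the result out and reset the slot so the next
    // child starts clean.
    RetType
    call_child(Expr& expr) {
        assert(!ret_.has_value());
        expr.accept(*this);
        assert(ret_.has_value());
        auto ret = std::move(ret_);
        ret_ = std::nullopt;
        return std::move(ret.value());
    }

 private:
    std::optional<RetType> ret_;
};

int
main() {
    LeafExpr leaf;
    SketchExecVisitor visitor;
    assert(visitor.call_child(leaf) == 42);
}

This is what will let BoolUnaryExpr and BoolBinaryExpr be evaluated recursively later: each operand is reduced to a bitset via call_child and the parent combines the results.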
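
Note on the Plan.cpp change: an unknown range operator used to surface as an anonymous std::out_of_range thrown by map::at(); the patch lowercases the key once and checks membership first, so the failure names the offending operator. A sketch of the hardened lookup — the key set below is illustrative, since mapping_'s initializer is not part of this diff:

#include <algorithm>
#include <cctype>
#include <map>
#include <stdexcept>
#include <string>

enum class OpType { LessThan, LessEqual, GreaterThan, GreaterEqual, Equal, NotEqual };

// Illustrative stand-in for RangeExpr::mapping_ (op_name -> op).
static const std::map<std::string, OpType> mapping = {
    {"lt", OpType::LessThan},     {"le", OpType::LessEqual}, {"gt", OpType::GreaterThan},
    {"ge", OpType::GreaterEqual}, {"eq", OpType::Equal},     {"ne", OpType::NotEqual},
};

static std::string
to_lower(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return std::tolower(c); });
    return s;
}

OpType
parse_op(const std::string& raw_name) {
    auto op_name = to_lower(raw_name);
    if (mapping.count(op_name) == 0) {
        // Stand-in for AssertInfo: fail with a descriptive message instead of
        // the bare std::out_of_range that map::at() alone would throw.
        throw std::invalid_argument("op(" + op_name + ") not found");
    }
    return mapping.at(op_name);
}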
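
Note on the Close() changes in SegmentNaive.cpp and SegmentSmallIndex.cpp: these fix two latent bugs at once. First, the old statement std::runtime_error("insert not ready"); constructs an exception object and immediately discards it — there is no throw — so the readiness checks never fired; PanicInfo actually aborts. Second, the delete-readiness check compared deleted_record_.reserved against the insert record's acknowledger (record_.ack_responder_); it now compares against deleted_record_'s own ack_responder_.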
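
Note on GetMemoryUsageInBytes in SegmentSmallIndex.cpp: the patch replaces the bit-mask rounding idiom with an explicit upper_align helper. The two are equivalent when the alignment is a power of two, which the helper now asserts instead of silently assuming. A small standalone check — DefaultElementPerChunk's actual value is not shown in this diff, so 32 * 1024 below is an assumption, and plain assert stands in for the project's Assert macro:

#include <cassert>
#include <cstdint>

// Copy of the helper added in SegmentSmallIndex.cpp: round value up to the
// next multiple of align via integer division.
static uint64_t
upper_align(int64_t value, int64_t align) {
    assert(align > 0);
    assert((align & (align - 1)) == 0);  // power of two, same contract as the old mask
    auto groups = (value + align - 1) / align;
    return groups * align;
}

int
main() {
    const int64_t chunk = 32 * 1024;  // assumed DefaultElementPerChunk
    // Old idiom from GetMemoryUsageInBytes, valid only for power-of-two align:
    int64_t old_style = (100000 + chunk - 1) & ~(chunk - 1);
    assert(upper_align(100000, chunk) == static_cast<uint64_t>(old_style));  // both 131072
    assert(upper_align(0, chunk) == 0);
}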