diff --git a/internal/core/src/common/FieldMeta.h b/internal/core/src/common/FieldMeta.h index 6b91966fe7..a4159eca57 100644 --- a/internal/core/src/common/FieldMeta.h +++ b/internal/core/src/common/FieldMeta.h @@ -26,7 +26,7 @@ namespace milvus { -inline int +inline size_t datatype_sizeof(DataType data_type, int dim = 1) { switch (data_type) { case DataType::BOOL: @@ -103,6 +103,17 @@ datatype_is_string(DataType datatype) { } } +inline bool +datatype_is_variable(DataType datatype) { + switch (datatype) { + case DataType::VARCHAR: + case DataType::STRING: + return true; + default: + return false; + } +} + inline bool datatype_is_integer(DataType datatype) { switch (datatype) { @@ -200,7 +211,7 @@ class FieldMeta { return type_; } - int64_t + size_t get_sizeof() const { if (is_vector()) { return datatype_sizeof(type_, get_dim()); diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadInfo.h index 36a5c2a3d1..319eeeec30 100644 --- a/internal/core/src/common/LoadInfo.h +++ b/internal/core/src/common/LoadInfo.h @@ -18,8 +18,8 @@ #include #include -#include "Types.h" +#include "Types.h" #include "common/CDataType.h" // NOTE: field_id can be system field @@ -29,11 +29,8 @@ struct LoadFieldDataInfo { int64_t field_id; // const void* blob = nullptr; const milvus::DataArray* field_data; - int64_t row_count = -1; - - // ~LoadFieldDataInfo() { - // delete field_data; - // } + int64_t row_count{-1}; + const char* mmap_dir_path{nullptr}; }; struct LoadDeletedRecordInfo { diff --git a/internal/core/src/common/Span.h b/internal/core/src/common/Span.h index 27d246667d..a056fd9e9f 100644 --- a/internal/core/src/common/Span.h +++ b/internal/core/src/common/Span.h @@ -17,8 +17,8 @@ #pragma once #include -#include #include +#include #include "Types.h" #include "VectorTrait.h" diff --git a/internal/core/src/common/Utils.h b/internal/core/src/common/Utils.h index d476af109e..fd4693361e 100644 --- a/internal/core/src/common/Utils.h +++ b/internal/core/src/common/Utils.h @@ -11,11 +11,18 @@ #pragma once +#include +#include #include +#include +#include #include +#include #include "common/Consts.h" +#include "common/FieldMeta.h" +#include "common/LoadInfo.h" #include "config/ConfigChunkManager.h" #include "exceptions/EasyAssert.h" #include "knowhere/dataset.h" @@ -58,8 +65,8 @@ GetDatasetLims(const DatasetPtr& dataset) { } inline bool -PrefixMatch(const std::string& str, const std::string& prefix) { - auto ret = strncmp(str.c_str(), prefix.c_str(), prefix.length()); +PrefixMatch(const std::string_view str, const std::string_view prefix) { + auto ret = strncmp(str.data(), prefix.data(), prefix.length()); if (ret != 0) { return false; } @@ -79,13 +86,13 @@ GenResultDataset(const int64_t nq, const int64_t topk, const int64_t* ids, const } inline bool -PostfixMatch(const std::string& str, const std::string& postfix) { +PostfixMatch(const std::string_view str, const std::string& postfix) { if (postfix.length() > str.length()) { return false; } int offset = str.length() - postfix.length(); - auto ret = strncmp(str.c_str() + offset, postfix.c_str(), postfix.length()); + auto ret = strncmp(str.data() + offset, postfix.c_str(), postfix.length()); if (ret != 0) { return false; } @@ -166,4 +173,210 @@ MatchKnowhereError(knowhere::Status status) { } } +inline size_t +GetDataSize(const FieldMeta& field, size_t row_count, const DataArray* data) { + auto data_type = field.get_data_type(); + if (datatype_is_variable(data_type)) { + switch (data_type) { + case DataType::VARCHAR: + case DataType::STRING: { + auto begin = data->scalars().string_data().data().begin(); + auto end = data->scalars().string_data().data().end(); + + ssize_t size{0}; + while (begin != end) { + size += begin->size(); + begin++; + } + return size; + } + + default: + PanicInfo(fmt::format("not supported data type {}", datatype_name(data_type))); + } + } + + return field.get_sizeof() * row_count; +} + +inline void* +FillField(DataType data_type, size_t size, const LoadFieldDataInfo& info, void* dst) { + auto data = info.field_data; + switch (data_type) { + case DataType::BOOL: { + return memcpy(dst, data->scalars().bool_data().data().data(), size); + } + case DataType::INT8: { + auto src_data = data->scalars().int_data().data(); + std::vector data_raw(src_data.size()); + std::copy_n(src_data.data(), src_data.size(), data_raw.data()); + return memcpy(dst, data_raw.data(), size); + } + case DataType::INT16: { + auto src_data = data->scalars().int_data().data(); + std::vector data_raw(src_data.size()); + std::copy_n(src_data.data(), src_data.size(), data_raw.data()); + return memcpy(dst, data_raw.data(), size); + } + case DataType::INT32: { + return memcpy(dst, data->scalars().int_data().data().data(), size); + } + case DataType::INT64: { + return memcpy(dst, data->scalars().long_data().data().data(), size); + } + case DataType::FLOAT: { + return memcpy(dst, data->scalars().float_data().data().data(), size); + } + case DataType::DOUBLE: { + return memcpy(dst, data->scalars().double_data().data().data(), size); + } + case DataType::VARCHAR: { + char* dest = reinterpret_cast(dst); + auto begin = data->scalars().string_data().data().begin(); + auto end = data->scalars().string_data().data().end(); + + while (begin != end) { + memcpy(dest, begin->data(), begin->size()); + dest += begin->size(); + begin++; + } + return dst; + } + case DataType::VECTOR_FLOAT: + return memcpy(dst, data->vectors().float_vector().data().data(), size); + + case DataType::VECTOR_BINARY: + return memcpy(dst, data->vectors().binary_vector().data(), size); + + default: { + PanicInfo("unsupported"); + } + } +} + +inline ssize_t +WriteFieldData(int fd, DataType data_type, const DataArray* data, size_t size) { + switch (data_type) { + case DataType::BOOL: { + return write(fd, data->scalars().bool_data().data().data(), size); + } + case DataType::INT8: { + auto src_data = data->scalars().int_data().data(); + std::vector data_raw(src_data.size()); + std::copy_n(src_data.data(), src_data.size(), data_raw.data()); + return write(fd, data_raw.data(), size); + } + case DataType::INT16: { + auto src_data = data->scalars().int_data().data(); + std::vector data_raw(src_data.size()); + std::copy_n(src_data.data(), src_data.size(), data_raw.data()); + return write(fd, data_raw.data(), size); + } + case DataType::INT32: { + return write(fd, data->scalars().int_data().data().data(), size); + } + case DataType::INT64: { + return write(fd, data->scalars().long_data().data().data(), size); + } + case DataType::FLOAT: { + return write(fd, data->scalars().float_data().data().data(), size); + } + case DataType::DOUBLE: { + return write(fd, data->scalars().double_data().data().data(), size); + } + case DataType::VARCHAR: { + auto begin = data->scalars().string_data().data().begin(); + auto end = data->scalars().string_data().data().end(); + + ssize_t total_written{0}; + while (begin != end) { + ssize_t written = write(fd, begin->data(), begin->size()); + if (written < begin->size()) { + break; + } + total_written += written; + begin++; + } + return total_written; + } + case DataType::VECTOR_FLOAT: + return write(fd, data->vectors().float_vector().data().data(), size); + + case DataType::VECTOR_BINARY: + return write(fd, data->vectors().binary_vector().data(), size); + + default: { + PanicInfo("unsupported"); + } + } +} + +// CreateMap creates a memory mapping, +// if mmap enabled, this writes field data to disk and create a map to the file, +// otherwise this just alloc memory +inline void* +CreateMap(int64_t segment_id, const FieldMeta& field_meta, const LoadFieldDataInfo& info) { + static int mmap_flags = MAP_PRIVATE; +#ifdef MAP_POPULATE + // macOS doesn't support MAP_POPULATE + mmap_flags |= MAP_POPULATE; +#endif + // Allocate memory + if (info.mmap_dir_path == nullptr) { + auto data_type = field_meta.get_data_type(); + auto data_size = GetDataSize(field_meta, info.row_count, info.field_data); + if (data_size == 0) + return nullptr; + + // Use anon mapping so we are able to free these memory with munmap only + void* map = mmap(NULL, data_size, PROT_READ | PROT_WRITE, mmap_flags | MAP_ANON, -1, 0); + AssertInfo(map != MAP_FAILED, fmt::format("failed to create anon map, err: {}", strerror(errno))); + FillField(data_type, data_size, info, map); + return map; + } + + auto filepath = + std::filesystem::path(info.mmap_dir_path) / std::to_string(segment_id) / std::to_string(info.field_id); + auto dir = filepath.parent_path(); + std::filesystem::create_directories(dir); + + int fd = open(filepath.c_str(), O_CREAT | O_TRUNC | O_RDWR, S_IRUSR | S_IWUSR); + AssertInfo(fd != -1, fmt::format("failed to create mmap file {}", filepath.c_str())); + + auto data_type = field_meta.get_data_type(); + size_t size = field_meta.get_sizeof() * info.row_count; + auto written = WriteFieldData(fd, data_type, info.field_data, size); + AssertInfo(written == size || written != -1 && datatype_is_variable(field_meta.get_data_type()), + fmt::format("failed to write data file {}, written {} but total {}, err: {}", filepath.c_str(), written, + size, strerror(errno))); + int ok = fsync(fd); + AssertInfo(ok == 0, fmt::format("failed to fsync mmap data file {}, err: {}", filepath.c_str(), strerror(errno))); + + // Empty field + if (written == 0) { + return nullptr; + } + + auto map = mmap(NULL, written, PROT_READ, mmap_flags, fd, 0); + AssertInfo(map != MAP_FAILED, + fmt::format("failed to create map for data file {}, err: {}", filepath.c_str(), strerror(errno))); + +#ifndef MAP_POPULATE + // Manually access the mapping to populate it + const size_t PAGE_SIZE = 4 << 10; // 4KiB + char* begin = (char*)map; + char* end = begin + written; + for (char* page = begin; page < end; page += PAGE_SIZE) { + char value = page[0]; + } +#endif + // unlink this data file so + // then it will be auto removed after we don't need it again + ok = unlink(filepath.c_str()); + AssertInfo(ok == 0, fmt::format("failed to unlink mmap data file {}, err: {}", filepath.c_str(), strerror(errno))); + ok = close(fd); + AssertInfo(ok == 0, fmt::format("failed to close data file {}, err: {}", filepath.c_str(), strerror(errno))); + return map; +} + } // namespace milvus diff --git a/internal/core/src/common/VectorTrait.h b/internal/core/src/common/VectorTrait.h index 301fc9ba5e..066df2d2e1 100644 --- a/internal/core/src/common/VectorTrait.h +++ b/internal/core/src/common/VectorTrait.h @@ -49,7 +49,8 @@ template constexpr bool IsVector = std::is_base_of_v; template -constexpr bool IsScalar = std::is_fundamental_v || std::is_same_v; +constexpr bool IsScalar = + std::is_fundamental_v || std::is_same_v || std::is_same_v; template struct EmbeddedTypeImpl; diff --git a/internal/core/src/common/type_c.h b/internal/core/src/common/type_c.h index 8946ab2c45..d079676223 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -74,6 +74,9 @@ typedef struct CLoadFieldDataInfo { const uint8_t* blob; uint64_t blob_size; int64_t row_count; + // Set null to disable mmap, + // mmap file path will be {mmap_dir_path}/{segment_id}/{field_id} + const char* mmap_dir_path; } CLoadFieldDataInfo; typedef struct CLoadDeletedRecordInfo { diff --git a/internal/core/src/index/ScalarIndex.h b/internal/core/src/index/ScalarIndex.h index 877151df88..a7cc52373b 100644 --- a/internal/core/src/index/ScalarIndex.h +++ b/internal/core/src/index/ScalarIndex.h @@ -16,13 +16,14 @@ #pragma once +#include #include #include #include -#include -#include "index/Index.h" + #include "common/Types.h" #include "exceptions/EasyAssert.h" +#include "index/Index.h" namespace milvus::index { diff --git a/internal/core/src/index/StringIndex.h b/internal/core/src/index/StringIndex.h index f92b9c9fca..266832e018 100644 --- a/internal/core/src/index/StringIndex.h +++ b/internal/core/src/index/StringIndex.h @@ -16,13 +16,15 @@ #pragma once -#include "index/ScalarIndex.h" -#include -#include -#include -#include "index/Meta.h" #include +#include +#include +#include + +#include "index/Meta.h" +#include "index/ScalarIndex.h" + namespace milvus::index { class StringIndex : public ScalarIndex { diff --git a/internal/core/src/index/StringIndexSort.h b/internal/core/src/index/StringIndexSort.h index ccf2b44ad9..a0e05d238d 100644 --- a/internal/core/src/index/StringIndexSort.h +++ b/internal/core/src/index/StringIndexSort.h @@ -37,7 +37,7 @@ class StringIndexSort : public ScalarIndexSort { } const TargetBitmapPtr - PrefixMatch(std::string prefix) { + PrefixMatch(std::string_view prefix) { auto data = GetData(); TargetBitmapPtr bitset = std::make_unique(data.size()); for (size_t i = 0; i < data.size(); i++) { diff --git a/internal/core/src/query/PlanProto.cpp b/internal/core/src/query/PlanProto.cpp index b30a33600d..68135f8e36 100644 --- a/internal/core/src/query/PlanProto.cpp +++ b/internal/core/src/query/PlanProto.cpp @@ -9,15 +9,16 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License +#include "PlanProto.h" + #include #include #include "ExprImpl.h" -#include "PlanProto.h" +#include "common/VectorTrait.h" #include "generated/ExtractInfoExprVisitor.h" #include "generated/ExtractInfoPlanNodeVisitor.h" -#include "common/VectorTrait.h" namespace milvus::query { namespace planpb = milvus::proto::plan; diff --git a/internal/core/src/query/SearchOnSealed.cpp b/internal/core/src/query/SearchOnSealed.cpp index 9cd17a136c..26a42986f1 100644 --- a/internal/core/src/query/SearchOnSealed.cpp +++ b/internal/core/src/query/SearchOnSealed.cpp @@ -71,7 +71,7 @@ SearchOnSealedIndex(const Schema& schema, void SearchOnSealed(const Schema& schema, - const segcore::InsertRecord& record, + const void* vec_data, const SearchInfo& search_info, const void* query_data, int64_t num_queries, @@ -83,11 +83,9 @@ SearchOnSealed(const Schema& schema, query::dataset::SearchDataset dataset{search_info.metric_type_, num_queries, search_info.topk_, search_info.round_decimal_, field.get_dim(), query_data}; - auto vec_data = record.get_field_data_base(field_id); - AssertInfo(vec_data->num_chunk() == 1, "num chunk not equal to 1 for sealed segment"); - auto chunk_data = vec_data->get_chunk_data(0); + CheckBruteForceSearchParam(field, search_info); - auto sub_qr = BruteForceSearch(dataset, chunk_data, row_count, search_info.search_params_, bitset); + auto sub_qr = BruteForceSearch(dataset, vec_data, row_count, search_info.search_params_, bitset); result.distances_ = std::move(sub_qr.mutable_distances()); result.seg_offsets_ = std::move(sub_qr.mutable_seg_offsets()); diff --git a/internal/core/src/query/SearchOnSealed.h b/internal/core/src/query/SearchOnSealed.h index 044d3d39b7..8a794632d5 100644 --- a/internal/core/src/query/SearchOnSealed.h +++ b/internal/core/src/query/SearchOnSealed.h @@ -29,7 +29,7 @@ SearchOnSealedIndex(const Schema& schema, void SearchOnSealed(const Schema& schema, - const segcore::InsertRecord& record, + const void* vec_data, const SearchInfo& search_info, const void* query_data, int64_t num_queries, diff --git a/internal/core/src/query/Utils.h b/internal/core/src/query/Utils.h index 757bb92e72..6435e2b4e6 100644 --- a/internal/core/src/query/Utils.h +++ b/internal/core/src/query/Utils.h @@ -35,4 +35,17 @@ Match(const std::string& str, const std::string& val, OpType op) { PanicInfo("not supported"); } } + +template <> +inline bool +Match(const std::string_view& str, const std::string& val, OpType op) { + switch (op) { + case OpType::PrefixMatch: + return PrefixMatch(str, val); + case OpType::PostfixMatch: + return PostfixMatch(str, val); + default: + PanicInfo("not supported"); + } +} } // namespace milvus::query diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index 06b551d0b8..c40e6d2088 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -9,17 +9,18 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License +#include "query/generated/ExecExprVisitor.h" + +#include #include #include #include #include -#include #include "query/ExprImpl.h" -#include "query/generated/ExecExprVisitor.h" -#include "segcore/SegmentGrowingImpl.h" -#include "query/Utils.h" #include "query/Relational.h" +#include "query/Utils.h" +#include "segcore/SegmentGrowingImpl.h" namespace milvus::query { // THIS CONTAINS EXTRA BODY FOR VISITOR @@ -158,9 +159,10 @@ ExecExprVisitor::ExecRangeVisitorImpl(FieldId field_id, IndexFunc index_func, El auto num_chunk = upper_div(row_count_, size_per_chunk); std::deque results; - using Index = index::ScalarIndex; + typedef std::conditional_t, std::string, T> IndexInnerType; + using Index = index::ScalarIndex; for (auto chunk_id = 0; chunk_id < indexing_barrier; ++chunk_id) { - const Index& indexing = segment_.chunk_scalar_index(field_id, chunk_id); + const Index& indexing = segment_.chunk_scalar_index(field_id, chunk_id); // NOTE: knowhere is not const-ready // This is a dirty workaround auto data = index_func(const_cast(&indexing)); @@ -173,7 +175,8 @@ ExecExprVisitor::ExecRangeVisitorImpl(FieldId field_id, IndexFunc index_func, El auto chunk = segment_.chunk_data(field_id, chunk_id); const T* data = chunk.data(); for (int index = 0; index < this_size; ++index) { - result[index] = element_func(data[index]); + auto x = data[index]; + result[index] = element_func(x); } AssertInfo(result.size() == this_size, ""); results.emplace_back(std::move(result)); @@ -215,9 +218,10 @@ ExecExprVisitor::ExecDataRangeVisitorImpl(FieldId field_id, IndexFunc index_func // if sealed segment has loaded scalar index for this field, then index_barrier = 1 and data_barrier = 0 // in this case, sealed segment execute expr plan using scalar index - using Index = index::ScalarIndex; + typedef std::conditional_t, std::string, T> IndexInnerType; + using Index = index::ScalarIndex; for (auto chunk_id = data_barrier; chunk_id < indexing_barrier; ++chunk_id) { - auto& indexing = segment_.chunk_scalar_index(field_id, chunk_id); + auto& indexing = segment_.chunk_scalar_index(field_id, chunk_id); auto this_size = const_cast(&indexing)->Count(); BitsetType result(this_size); for (int offset = 0; offset < this_size; ++offset) { @@ -236,10 +240,12 @@ ExecExprVisitor::ExecDataRangeVisitorImpl(FieldId field_id, IndexFunc index_func template auto ExecExprVisitor::ExecUnaryRangeVisitorDispatcher(UnaryRangeExpr& expr_raw) -> BitsetType { - auto& expr = static_cast&>(expr_raw); - using Index = index::ScalarIndex; + typedef std::conditional_t, std::string, T> IndexInnerType; + using Index = index::ScalarIndex; + auto& expr = static_cast&>(expr_raw); + auto op = expr.op_type_; - auto val = expr.value_; + auto val = IndexInnerType(expr.value_); switch (op) { case OpType::Equal: { auto index_func = [val](Index* index) { return index->In(1, &val); }; @@ -412,12 +418,14 @@ ExecExprVisitor::ExecBinaryArithOpEvalRangeVisitorDispatcher(BinaryArithOpEvalRa template auto ExecExprVisitor::ExecBinaryRangeVisitorDispatcher(BinaryRangeExpr& expr_raw) -> BitsetType { - auto& expr = static_cast&>(expr_raw); - using Index = index::ScalarIndex; + typedef std::conditional_t, std::string, T> IndexInnerType; + using Index = index::ScalarIndex; + auto& expr = static_cast&>(expr_raw); + bool lower_inclusive = expr.lower_inclusive_; bool upper_inclusive = expr.upper_inclusive_; - T val1 = expr.lower_value_; - T val2 = expr.upper_value_; + IndexInnerType val1 = IndexInnerType(expr.lower_value_); + IndexInnerType val2 = IndexInnerType(expr.upper_value_); auto index_func = [=](Index* index) { return index->Range(val1, lower_inclusive, val2, upper_inclusive); }; if (lower_inclusive && upper_inclusive) { @@ -472,7 +480,11 @@ ExecExprVisitor::visit(UnaryRangeExpr& expr) { break; } case DataType::VARCHAR: { - res = ExecUnaryRangeVisitorDispatcher(expr); + if (segment_.type() == SegmentType::Growing) { + res = ExecUnaryRangeVisitorDispatcher(expr); + } else { + res = ExecUnaryRangeVisitorDispatcher(expr); + } break; } default: @@ -556,7 +568,11 @@ ExecExprVisitor::visit(BinaryRangeExpr& expr) { break; } case DataType::VARCHAR: { - res = ExecBinaryRangeVisitorDispatcher(expr); + if (segment_.type() == SegmentType::Growing) { + res = ExecBinaryRangeVisitorDispatcher(expr); + } else { + res = ExecBinaryRangeVisitorDispatcher(expr); + } break; } default: @@ -676,8 +692,13 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) -> BitsetTy } case DataType::VARCHAR: { if (chunk_id < data_barrier) { - auto chunk_data = segment_.chunk_data(field_id, chunk_id).data(); - return [chunk_data](int i) -> const number { return chunk_data[i]; }; + if (segment_.type() == SegmentType::Growing) { + auto chunk_data = segment_.chunk_data(field_id, chunk_id).data(); + return [chunk_data](int i) -> const number { return chunk_data[i]; }; + } else { + auto chunk_data = segment_.chunk_data(field_id, chunk_id).data(); + return [chunk_data](int i) -> const number { return std::string(chunk_data[i]); }; + } } else { // for case, sealed segment has loaded index for scalar field instead of raw data auto& indexing = segment_.chunk_scalar_index(field_id, chunk_id); @@ -808,12 +829,19 @@ ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> BitsetT return ExecTermVisitorImplTemplate(expr_raw); } +template <> +auto +ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> BitsetType { + return ExecTermVisitorImplTemplate(expr_raw); +} + template auto ExecExprVisitor::ExecTermVisitorImplTemplate(TermExpr& expr_raw) -> BitsetType { - auto& expr = static_cast&>(expr_raw); - using Index = index::ScalarIndex; - const auto& terms = expr.terms_; + typedef std::conditional_t, std::string, T> IndexInnerType; + using Index = index::ScalarIndex; + auto& expr = static_cast&>(expr_raw); + const std::vector terms(expr.terms_.begin(), expr.terms_.end()); auto n = terms.size(); std::unordered_set term_set(expr.terms_.begin(), expr.terms_.end()); @@ -894,7 +922,11 @@ ExecExprVisitor::visit(TermExpr& expr) { break; } case DataType::VARCHAR: { - res = ExecTermVisitorImpl(expr); + if (segment_.type() == SegmentType::Growing) { + res = ExecTermVisitorImpl(expr); + } else { + res = ExecTermVisitorImpl(expr); + } break; } default: diff --git a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp index 0ca2fb3f42..b083dc9bda 100644 --- a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp +++ b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp @@ -9,12 +9,13 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License +#include "query/generated/ExecPlanNodeVisitor.h" + #include #include "query/PlanImpl.h" -#include "query/generated/ExecPlanNodeVisitor.h" -#include "query/generated/ExecExprVisitor.h" #include "query/SubSearchResult.h" +#include "query/generated/ExecExprVisitor.h" #include "segcore/SegmentGrowing.h" #include "utils/Json.h" diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index 93071a9a4a..acfd47b6db 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -11,18 +11,19 @@ #pragma once -#include -#include -#include #include +#include +#include #include #include +#include +#include "TimestampIndex.h" #include "common/Schema.h" +#include "easylogging++.h" #include "segcore/AckResponder.h" #include "segcore/ConcurrentVector.h" #include "segcore/Record.h" -#include "TimestampIndex.h" namespace milvus::segcore { @@ -310,8 +311,8 @@ struct InsertRecord { private: // std::vector> fields_data_; - std::unordered_map> fields_data_; - mutable std::shared_mutex shared_mutex_; + std::unordered_map> fields_data_{}; + mutable std::shared_mutex shared_mutex_{}; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowing.h b/internal/core/src/segcore/SegmentGrowing.h index d776bee184..12d90388f1 100644 --- a/internal/core/src/segcore/SegmentGrowing.h +++ b/internal/core/src/segcore/SegmentGrowing.h @@ -38,6 +38,11 @@ class SegmentGrowing : public SegmentInternalInterface { const Timestamp* timestamps, const InsertData* insert_data) = 0; + virtual SegmentType + type() const override { + return SegmentType::Growing; + } + // virtual int64_t // PreDelete(int64_t size) = 0; diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index 9cc4a2c4bc..b396e91e83 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -10,11 +10,13 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "SegmentInterface.h" + #include + +#include "Utils.h" #include "common/SystemProperty.h" #include "common/Types.h" #include "query/generated/ExecPlanNodeVisitor.h" -#include "Utils.h" namespace milvus::segcore { @@ -72,7 +74,6 @@ SegmentInternalInterface::Retrieve(const query::RetrievePlan* plan, Timestamp ti query::ExecPlanNodeVisitor visitor(*this, timestamp); auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_); retrieve_results.segment_ = (void*)this; - results->mutable_offset()->Add(retrieve_results.result_offsets_.begin(), retrieve_results.result_offsets_.end()); auto fields_data = results->mutable_fields_data(); diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 98c1d87960..735320c2c2 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -80,6 +80,9 @@ class SegmentInterface { virtual int64_t get_segment_id() const = 0; + + virtual SegmentType + type() const = 0; }; // internal API for DSL calculation diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index 707dc4cf8f..874a7db879 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -33,6 +33,11 @@ class SegmentSealed : public SegmentInternalInterface { DropIndex(const FieldId field_id) = 0; virtual void DropFieldData(const FieldId field_id) = 0; + + virtual SegmentType + type() const override { + return SegmentType::Sealed; + } }; using SegmentSealedPtr = std::unique_ptr; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 076d8d30dd..6e8b3b75ed 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -11,6 +11,11 @@ #include "SegmentSealedImpl.h" +#include +#include + +#include + #include "Utils.h" #include "common/Consts.h" #include "common/FieldMeta.h" @@ -195,20 +200,28 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { // Don't allow raw data and index exist at the same time AssertInfo(!get_bit(index_ready_bitset_, field_id), "field data can't be loaded when indexing exists"); - auto field_data = insert_record_.get_field_data_base(field_id); - AssertInfo(field_data->empty(), "already exists"); - // insert data to insertRecord - field_data->fill_chunk_data(size, info.field_data, field_meta); - AssertInfo(field_data->num_chunk() == 1, "num chunk not equal to 1 for sealed segment"); + void* field_data = nullptr; + size_t size = 0; + if (datatype_is_variable(data_type)) { + VariableField field(get_segment_id(), field_meta, info); + size = field.size(); + field_data = reinterpret_cast(field.data()); + variable_fields_.emplace(field_id, std::move(field)); + } else { + field_data = CreateMap(get_segment_id(), field_meta, info); + fixed_fields_[field_id] = field_data; + size = field_meta.get_sizeof() * info.row_count; + } // set pks to offset if (schema_->get_primary_field_id() == field_id) { AssertInfo(field_id.get() != -1, "Primary key is -1"); AssertInfo(insert_record_.empty_pks(), "already exists"); - std::vector pks(size); + std::vector pks(info.row_count); ParsePksFromFieldData(pks, *info.field_data); - for (int i = 0; i < size; ++i) { + + for (int i = 0; i < info.row_count; ++i) { insert_record_.insert_pk(pks[i], i); } insert_record_.seal_pks(); @@ -216,6 +229,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { set_bit(field_data_ready_bitset_, field_id, true); } + std::unique_lock lck(mutex_); update_row_count(info.row_count); } @@ -253,9 +267,7 @@ SegmentSealedImpl::num_chunk_index(FieldId field_id) const { int64_t SegmentSealedImpl::num_chunk_data(FieldId field_id) const { - auto field_data = insert_record_.get_field_data_base(field_id); - AssertInfo(field_data != nullptr, "null field data ptr"); - return field_data->num_chunk(); + return get_bit(field_data_ready_bitset_, field_id) ? 1 : 0; } int64_t @@ -275,6 +287,14 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const { "Can't get bitset element at " + std::to_string(field_id.get())); auto& field_meta = schema_->operator[](field_id); auto element_sizeof = field_meta.get_sizeof(); + if (auto it = fixed_fields_.find(field_id); it != fixed_fields_.end()) { + auto field_data = it->second; + return SpanBase(field_data, get_row_count(), element_sizeof); + } + if (auto it = variable_fields_.find(field_id); it != variable_fields_.end()) { + auto& field = it->second; + return SpanBase(field.views().data(), field.views().size(), sizeof(std::string_view)); + } auto field_data = insert_record_.get_field_data_base(field_id); AssertInfo(field_data->num_chunk() == 1, "num chunk not equal to 1 for sealed segment"); return field_data->get_span_base(0); @@ -349,8 +369,8 @@ SegmentSealedImpl::vector_search(SearchInfo& search_info, "Field Data is not loaded: " + std::to_string(field_id.get())); AssertInfo(row_count_opt_.has_value(), "Can't get row count value"); auto row_count = row_count_opt_.value(); - query::SearchOnSealed(*schema_, insert_record_, search_info, query_data, query_count, row_count, bitset, - output); + auto vec_data = fixed_fields_.at(field_id); + query::SearchOnSealed(*schema_, vec_data, search_info, query_data, query_count, row_count, bitset, output); } } @@ -458,6 +478,22 @@ SegmentSealedImpl::bulk_subscript_impl(const void* src_raw, const int64_t* seg_o } } +template +void +SegmentSealedImpl::bulk_subscript_impl(const VariableField& field, + const int64_t* seg_offsets, + int64_t count, + void* dst_raw) { + auto dst = reinterpret_cast(dst_raw); + for (int64_t i = 0; i < count; ++i) { + auto offset = seg_offsets[i]; + if (offset != INVALID_SEG_OFFSET) { + auto entry = field[offset]; + dst[i] = std::move(T(entry.data(), entry.row_count())); + } + } +} + // for vector void SegmentSealedImpl::bulk_subscript_impl( @@ -506,10 +542,22 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, const int64_t* seg_offsets, } Assert(get_bit(field_data_ready_bitset_, field_id)); - auto field_data = insert_record_.get_field_data_base(field_id); - AssertInfo(field_data->num_chunk() == 1, std::string("num chunk not equal to 1 for sealed segment, num_chunk: ") + - std::to_string(field_data->num_chunk())); - auto src_vec = field_data->get_chunk_data(0); + + if (datatype_is_variable(field_meta.get_data_type())) { + switch (field_meta.get_data_type()) { + case DataType::VARCHAR: + case DataType::STRING: { + FixedVector output(count); + bulk_subscript_impl(variable_fields_.at(field_id), seg_offsets, count, output.data()); + return CreateScalarDataArrayFrom(output.data(), count, field_meta); + } + + default: + PanicInfo(fmt::format("unsupported data type: {}", datatype_name(field_meta.get_data_type()))); + } + } + + auto src_vec = fixed_fields_.at(field_id); switch (field_meta.get_data_type()) { case DataType::BOOL: { FixedVector output(count); @@ -546,11 +594,6 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, const int64_t* seg_offsets, bulk_subscript_impl(src_vec, seg_offsets, count, output.data()); return CreateScalarDataArrayFrom(output.data(), count, field_meta); } - case DataType::VARCHAR: { - FixedVector output(count); - bulk_subscript_impl(src_vec, seg_offsets, count, output.data()); - return CreateScalarDataArrayFrom(output.data(), count, field_meta); - } case DataType::VECTOR_FLOAT: case DataType::VECTOR_BINARY: { diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 19a0e80950..8e51c70604 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -11,15 +11,16 @@ #pragma once +#include +#include + #include -#include #include #include #include +#include #include #include -#include -#include #include "ConcurrentVector.h" #include "DeletedRecord.h" @@ -27,13 +28,24 @@ #include "SealedIndexingRecord.h" #include "SegmentSealed.h" #include "TimestampIndex.h" +#include "VariableField.h" #include "index/ScalarIndex.h" +#include "sys/mman.h" namespace milvus::segcore { class SegmentSealedImpl : public SegmentSealed { public: explicit SegmentSealedImpl(SchemaPtr schema, int64_t segment_id); + ~SegmentSealedImpl() { + for (auto& [field_id, data] : fixed_fields_) { + auto field_meta = schema_->operator[](field_id); + auto data_type = field_meta.get_data_type(); + if (munmap(data, field_meta.get_sizeof() * get_row_count())) { + AssertInfo(true, "failed to unmap field " + std::to_string(field_id.get()) + " err=" + strerror(errno)); + } + } + } void LoadIndex(const LoadIndexInfo& info) override; void @@ -122,6 +134,10 @@ class SegmentSealedImpl : public SegmentSealed { static void bulk_subscript_impl(const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw); + template + static void + bulk_subscript_impl(const VariableField& field, const int64_t* seg_offsets, int64_t count, void* dst_raw); + static void bulk_subscript_impl( int64_t element_sizeof, const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw); @@ -131,11 +147,11 @@ class SegmentSealedImpl : public SegmentSealed { void update_row_count(int64_t row_count) { - if (row_count_opt_.has_value()) { - AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns"); - } else { - row_count_opt_ = row_count; - } + // if (row_count_opt_.has_value()) { + // AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns"); + // } else { + row_count_opt_ = row_count; + // } } void @@ -200,6 +216,8 @@ class SegmentSealedImpl : public SegmentSealed { SchemaPtr schema_; int64_t id_; + std::unordered_map fixed_fields_; + std::unordered_map variable_fields_; }; inline SegmentSealedPtr diff --git a/internal/core/src/segcore/VariableField.h b/internal/core/src/segcore/VariableField.h new file mode 100644 index 0000000000..19c7b1969e --- /dev/null +++ b/internal/core/src/segcore/VariableField.h @@ -0,0 +1,89 @@ +#pragma once + +#include + +#include +#include + +#include "common/LoadInfo.h" + +namespace milvus::segcore { + +struct Entry { + char* data; + uint32_t length; +}; + +// Used for string/varchar field only, +// TODO(yah01): make this generic +class VariableField { + public: + explicit VariableField(int64_t segment_id, const FieldMeta& field_meta, const LoadFieldDataInfo& info) { + auto begin = info.field_data->scalars().string_data().data().begin(); + auto end = info.field_data->scalars().string_data().data().end(); + + indices_.reserve(info.row_count); + while (begin != end) { + indices_.push_back(size_); + size_ += begin->size(); + begin++; + } + + data_ = (char*)CreateMap(segment_id, field_meta, info); + construct_views(); + } + + VariableField(VariableField&& field) + : indices_(std::move(field.indices_)), size_(field.size_), data_(field.data_), views_(std::move(field.views_)) { + field.data_ = nullptr; + } + + ~VariableField() { + if (data_ != MAP_FAILED && data_ != nullptr) { + if (munmap(data_, size_)) { + AssertInfo(true, std::string("failed to unmap variable field err=") + strerror(errno)); + } + } + } + + char* + data() { + return data_; + } + + const std::vector& + views() const { + return views_; + } + + size_t + size() const { + return size_; + } + + Span + operator[](const int i) const { + uint64_t next = (i + 1 == indices_.size()) ? size_ : indices_[i + 1]; + uint64_t offset = indices_[i]; + return Span(data_ + offset, uint32_t(next - offset)); + } + + protected: + void + construct_views() { + views_.reserve(indices_.size()); + for (size_t i = 0; i < indices_.size() - 1; i++) { + views_.emplace_back(data_ + indices_[i], indices_[i + 1] - indices_[i]); + } + views_.emplace_back(data_ + indices_.back(), size_ - indices_.back()); + } + + private: + std::vector indices_{}; + uint64_t size_{0}; + char* data_{nullptr}; + + // Compatible with current Span type + std::vector views_{}; +}; +} // namespace milvus::segcore diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index aae9bd913a..a820fbce8c 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -9,18 +9,18 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License +#include "segcore/segment_c.h" + #include "common/CGoHelper.h" #include "common/LoadInfo.h" #include "common/Types.h" #include "common/type_c.h" +#include "google/protobuf/text_format.h" +#include "index/IndexInfo.h" #include "log/Log.h" - #include "segcore/Collection.h" #include "segcore/SegmentGrowingImpl.h" #include "segcore/SegmentSealedImpl.h" -#include "segcore/segment_c.h" -#include "index/IndexInfo.h" -#include "google/protobuf/text_format.h" ////////////////////////////// common interfaces ////////////////////////////// CSegmentInterface @@ -206,8 +206,8 @@ LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_in auto field_data = std::make_unique(); auto suc = field_data->ParseFromArray(load_field_data_info.blob, load_field_data_info.blob_size); AssertInfo(suc, "unmarshal field data string failed"); - auto load_info = - LoadFieldDataInfo{load_field_data_info.field_id, field_data.get(), load_field_data_info.row_count}; + auto load_info = LoadFieldDataInfo{load_field_data_info.field_id, field_data.get(), + load_field_data_info.row_count, load_field_data_info.mmap_dir_path}; segment->LoadFieldData(load_info); return milvus::SuccessCStatus(); } catch (std::exception& e) { diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 1873af6777..a957470421 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -9,27 +9,27 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License -#include -#include #include #include + +#include +#include #include #include #include #include -#include "knowhere/comp/index_param.h" - #include "common/LoadInfo.h" +#include "index/IndexFactory.h" +#include "knowhere/comp/index_param.h" #include "pb/plan.pb.h" #include "query/ExprImpl.h" #include "segcore/Collection.h" -#include "segcore/reduce_c.h" #include "segcore/Reduce.h" +#include "segcore/reduce_c.h" #include "test_utils/DataGen.h" -#include "index/IndexFactory.h" -#include "test_utils/indexbuilder_test_utils.h" #include "test_utils/PbHelper.h" +#include "test_utils/indexbuilder_test_utils.h" namespace chrono = std::chrono; diff --git a/internal/core/unittest/test_common.cpp b/internal/core/unittest/test_common.cpp index 71d8efc88e..73bb0a5ecf 100644 --- a/internal/core/unittest/test_common.cpp +++ b/internal/core/unittest/test_common.cpp @@ -13,17 +13,18 @@ #include #include "common/Types.h" #include "common/Span.h" +#include "common/VectorTrait.h" TEST(Common, Span) { using namespace milvus; using namespace milvus::segcore; Span s1(nullptr, 100); - Span s2(nullptr, 10, 16 * sizeof(float)); + Span s2(nullptr, 10, 16 * sizeof(float)); SpanBase b1 = s1; SpanBase b2 = s2; auto r1 = static_cast>(b1); - auto r2 = static_cast>(b2); + auto r2 = static_cast>(b2); ASSERT_EQ(r1.row_count(), 100); ASSERT_EQ(r2.row_count(), 10); ASSERT_EQ(r2.element_sizeof(), 16 * sizeof(float)); diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 0406cb27e4..73751fdf51 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -352,6 +352,9 @@ TEST(Sealed, LoadFieldData) { auto double_id = schema->AddDebugField("double", DataType::DOUBLE); auto nothing_id = schema->AddDebugField("nothing", DataType::INT32); auto str_id = schema->AddDebugField("str", DataType::VARCHAR); + schema->AddDebugField("int8", DataType::INT8); + schema->AddDebugField("int16", DataType::INT16); + schema->AddDebugField("float", DataType::FLOAT); schema->set_primary_field_id(counter_id); auto dataset = DataGen(schema, N); @@ -398,7 +401,6 @@ TEST(Sealed, LoadFieldData) { ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time)); SealedLoadFieldData(dataset, *segment); - segment->DropFieldData(nothing_id); segment->Search(plan.get(), ph_group.get(), time); segment->DropFieldData(fakevec_id); @@ -415,7 +417,101 @@ TEST(Sealed, LoadFieldData) { ASSERT_EQ(segment->num_chunk_index(str_id), 0); auto chunk_span1 = segment->chunk_data(counter_id, 0); auto chunk_span2 = segment->chunk_data(double_id, 0); - auto chunk_span3 = segment->chunk_data(str_id, 0); + auto chunk_span3 = segment->chunk_data(str_id, 0); + auto ref1 = dataset.get_col(counter_id); + auto ref2 = dataset.get_col(double_id); + auto ref3 = dataset.get_col(str_id)->scalars().string_data().data(); + for (int i = 0; i < N; ++i) { + ASSERT_EQ(chunk_span1[i], ref1[i]); + ASSERT_EQ(chunk_span2[i], ref2[i]); + ASSERT_EQ(chunk_span3[i], ref3[i]); + } + + auto sr = segment->Search(plan.get(), ph_group.get(), time); + auto json = SearchResultToJson(*sr); + std::cout << json.dump(1); + + segment->DropIndex(fakevec_id); + ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time)); +} + +TEST(Sealed, LoadFieldDataMmap) { + auto dim = 16; + auto topK = 5; + auto N = ROW_COUNT; + auto metric_type = knowhere::metric::L2; + auto schema = std::make_shared(); + auto fakevec_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type); + auto counter_id = schema->AddDebugField("counter", DataType::INT64); + auto double_id = schema->AddDebugField("double", DataType::DOUBLE); + auto nothing_id = schema->AddDebugField("nothing", DataType::INT32); + auto str_id = schema->AddDebugField("str", DataType::VARCHAR); + schema->AddDebugField("int8", DataType::INT8); + schema->AddDebugField("int16", DataType::INT16); + schema->AddDebugField("float", DataType::FLOAT); + schema->set_primary_field_id(counter_id); + + auto dataset = DataGen(schema, N); + + auto fakevec = dataset.get_col(fakevec_id); + + auto indexing = GenVecIndexing(N, dim, fakevec.data()); + + auto segment = CreateSealedSegment(schema); + std::string dsl = R"({ + "bool": { + "must": [ + { + "range": { + "double": { + "GE": -1, + "LT": 1 + } + } + }, + { + "vector": { + "fakevec": { + "metric_type": "L2", + "params": { + "nprobe": 10 + }, + "query": "$0", + "topk": 5, + "round_decimal": 3 + } + } + } + ] + } + })"; + + Timestamp time = 1000000; + auto plan = CreatePlan(*schema, dsl); + auto num_queries = 5; + auto ph_group_raw = CreatePlaceholderGroup(num_queries, 16, 1024); + auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); + + ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time)); + + SealedLoadFieldData(dataset, *segment, {}, true); + segment->Search(plan.get(), ph_group.get(), time); + + segment->DropFieldData(fakevec_id); + ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time)); + + LoadIndexInfo vec_info; + vec_info.field_id = fakevec_id.get(); + vec_info.index = std::move(indexing); + vec_info.index_params["metric_type"] = knowhere::metric::L2; + segment->LoadIndex(vec_info); + + ASSERT_EQ(segment->num_chunk(), 1); + ASSERT_EQ(segment->num_chunk_index(double_id), 0); + ASSERT_EQ(segment->num_chunk_index(str_id), 0); + auto chunk_span1 = segment->chunk_data(counter_id, 0); + auto chunk_span2 = segment->chunk_data(double_id, 0); + auto chunk_span3 = segment->chunk_data(str_id, 0); auto ref1 = dataset.get_col(counter_id); auto ref2 = dataset.get_col(double_id); auto ref3 = dataset.get_col(str_id)->scalars().string_data().data(); @@ -431,36 +527,6 @@ TEST(Sealed, LoadFieldData) { segment->DropIndex(fakevec_id); ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time)); - // segment->LoadIndex(vec_info); - // auto sr2 = segment->Search(plan.get(), ph_group.get(), time); - // auto json2 = SearchResultToJson(*sr); - // ASSERT_EQ(json.dump(-2), json2.dump(-2)); - // segment->DropFieldData(double_id); - // ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time)); - // #ifdef __linux__ - // auto std_json = Json::parse(R"( - //[ - // [ - // ["982->0.000000", "25315->4.742000", "57893->4.758000", "48201->6.075000", "53853->6.223000"], - // ["41772->10.111000", "74859->11.790000", "79777->11.842000", "3785->11.983000", "35888->12.193000"], - // ["59251->2.543000", "65551->4.454000", "72204->5.332000", "96905->5.479000", "87833->5.765000"], - // ["59219->5.458000", "21995->6.078000", "97922->6.764000", "25710->7.158000", "14048->7.294000"], - // ["66353->5.696000", "30664->5.881000", "41087->5.917000", "10393->6.633000", "90215->7.202000"] - // ] - //])"); - // #else // for mac - // auto std_json = Json::parse(R"( - //[ - // [ - // ["982->0.000000", "31864->4.270000", "18916->4.651000", "71547->5.125000", "86706->5.991000"], - // ["96984->4.192000", "65514->6.011000", "89328->6.138000", "80284->6.526000", "68218->6.563000"], - // ["30119->2.464000", "82365->4.725000", "74834->5.009000", "79995->5.725000", "33359->5.816000"], - // ["99625->6.129000", "86582->6.900000", "85934->7.792000", "60450->8.087000", "19257->8.530000"], - // ["37759->3.581000", "31292->5.780000", "98124->6.216000", "63535->6.439000", "11707->6.553000"] - // ] - //])"); - // #endif - // ASSERT_EQ(std_json.dump(-2), json.dump(-2)); } TEST(Sealed, LoadScalarIndex) { diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index 4f52787a8c..336260f5f7 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -425,7 +425,10 @@ SearchResultToJson(const SearchResult& sr) { }; inline void -SealedLoadFieldData(const GeneratedData& dataset, SegmentSealed& seg, const std::set& exclude_fields = {}) { +SealedLoadFieldData(const GeneratedData& dataset, + SegmentSealed& seg, + const std::set& exclude_fields = {}, + bool with_mmap = false) { auto row_count = dataset.row_ids_.size(); { LoadFieldDataInfo info; @@ -451,6 +454,9 @@ SealedLoadFieldData(const GeneratedData& dataset, SegmentSealed& seg, const std: continue; } LoadFieldDataInfo info; + if (with_mmap) { + info.mmap_dir_path = "./data/mmap-test"; + } info.field_id = field_data.field_id(); info.row_count = row_count; info.field_data = &field_data; diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 8d8d40d29f..894452e68b 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -295,6 +295,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { indexBlobs, ) if err != nil { + log.Warn("failed to serialize index", zap.Error(err)) return err } encodeIndexFileDur := it.tr.Record("index codec serialize done") diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 66cf0503b2..7608db5326 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -780,6 +780,7 @@ func (s *Segment) segmentDelete(offset int64, entityIDs []primaryKey, timestamps } // -------------------------------------------------------------------------------------- interfaces for sealed segment + func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *schemapb.FieldData) error { /* CStatus @@ -800,11 +801,18 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche return err } + var mmapDirPath *C.char = nil + path := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() + if len(path) > 0 { + mmapDirPath = C.CString(path) + defer C.free(unsafe.Pointer(mmapDirPath)) + } loadInfo := C.CLoadFieldDataInfo{ - field_id: C.int64_t(fieldID), - blob: (*C.uint8_t)(unsafe.Pointer(&dataBlob[0])), - blob_size: C.uint64_t(len(dataBlob)), - row_count: C.int64_t(rowCount), + field_id: C.int64_t(fieldID), + blob: (*C.uint8_t)(unsafe.Pointer(&dataBlob[0])), + blob_size: C.uint64_t(len(dataBlob)), + row_count: C.int64_t(rowCount), + mmap_dir_path: mmapDirPath, } status := C.LoadFieldData(s.segmentPtr, loadInfo) diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go index 1727372604..2c4c347900 100644 --- a/internal/querynode/segment_loader_test.go +++ b/internal/querynode/segment_loader_test.go @@ -41,6 +41,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" ) func TestSegmentLoader_loadSegment(t *testing.T) { @@ -82,6 +83,40 @@ func TestSegmentLoader_loadSegment(t *testing.T) { assert.NoError(t, err) }) + t.Run("test load segment with mmap", func(t *testing.T) { + key := paramtable.Get().QueryNodeCfg.MmapDirPath.Key + paramtable.Get().Save(key, "/tmp/mmap-test") + defer paramtable.Get().Reset(key) + node, err := genSimpleQueryNode(ctx) + require.NoError(t, err) + defer node.Stop() + + node.metaReplica.removeSegment(defaultSegmentID, segmentTypeSealed) + loader := node.loader + assert.NotNil(t, loader) + + req := &querypb.LoadSegmentsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_LoadSegments, + MsgID: rand.Int63(), + }, + DstNodeID: 0, + Schema: schema, + Infos: []*querypb.SegmentLoadInfo{ + { + SegmentID: defaultSegmentID, + PartitionID: defaultPartitionID, + CollectionID: defaultCollectionID, + BinlogPaths: fieldBinlog, + Statslogs: statsLog, + }, + }, + } + + _, err = loader.LoadSegment(ctx, req, segmentTypeSealed) + assert.NoError(t, err) + }) + t.Run("test load segment error due to partial success", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 80bb81df1c..9e1ac34052 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -1305,6 +1305,7 @@ type queryNodeConfig struct { // cache limit CacheEnabled ParamItem `refreshable:"false"` CacheMemoryLimit ParamItem `refreshable:"false"` + MmapDirPath ParamItem `refreshable:"false"` GroupEnabled ParamItem `refreshable:"true"` MaxReceiveChanSize ParamItem `refreshable:"false"` @@ -1449,6 +1450,14 @@ func (p *queryNodeConfig) init(base *BaseTable) { } p.CacheEnabled.Init(base.mgr) + p.MmapDirPath = ParamItem{ + Key: "queryNode.mmapDirPath", + Version: "2.3.0", + DefaultValue: "", + Doc: "The folder that storing data files for mmap, setting to a path will enable Milvus to load data with mmap", + } + p.MmapDirPath.Init(base.mgr) + p.GroupEnabled = ParamItem{ Key: "queryNode.grouping.enabled", Version: "2.0.0",