diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadInfo.h index 372a5527ac..40a387f476 100644 --- a/internal/core/src/common/LoadInfo.h +++ b/internal/core/src/common/LoadInfo.h @@ -33,7 +33,7 @@ struct FieldBinlogInfo { struct LoadFieldDataInfo { std::map field_infos; - // Set null to disable mmap, + // Set empty to disable mmap, // mmap file path will be {mmap_dir_path}/{segment_id}/{field_id} std::string mmap_dir_path = ""; }; diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index f0e0ff76d7..65f9f8c875 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -16,29 +16,76 @@ #pragma once #include - +#include #include -#include -#include -#include -#include -#include -#include +#include +#include +#include "common/FieldMeta.h" +#include "common/Span.h" +#include "exceptions/EasyAssert.h" +#include "fmt/format.h" #include "mmap/Utils.h" namespace milvus { -struct Entry { - char* data; - uint32_t length; -}; +#ifdef MAP_POPULATE +static int mmap_flags = MAP_PRIVATE | MAP_POPULATE; +#else +static int mmap_flags = MAP_PRIVATE; +#endif class ColumnBase { public: - ColumnBase() = default; + // memory mode ctor + ColumnBase(size_t num_rows, const FieldMeta& field_meta) { + // simdjson requires a padding following the json data + padding_ = field_meta.get_data_type() == DataType::JSON + ? simdjson::SIMDJSON_PADDING + : 0; + + if (datatype_is_variable(field_meta.get_data_type())) { + return; + } + + size_ = field_meta.get_sizeof() * num_rows + padding_; + auto data_type = field_meta.get_data_type(); + + // use anon mapping so we are able to free these memory with munmap only + data_ = static_cast(mmap(nullptr, + size_, + PROT_READ | PROT_WRITE, + mmap_flags | MAP_ANON, + -1, + 0)); + AssertInfo( + data_ != MAP_FAILED, + fmt::format("failed to create anon map, err: {}", strerror(errno))); + } + + // mmap mode ctor + ColumnBase(int fd, size_t size, const FieldMeta& field_meta) { + padding_ = field_meta.get_data_type() == DataType::JSON + ? simdjson::SIMDJSON_PADDING + : 0; + + len_ = size; + size_ = size + padding_; + data_ = static_cast( + mmap(nullptr, size_, PROT_READ, mmap_flags, fd, 0)); +#ifndef MAP_POPULATE + // Manually access the mapping to populate it + const size_t page_size = getpagesize(); + char* begin = (char*)data_; + char* end = begin + len_; + for (char* page = begin; page < end; page += page_size) { + char value = page[0]; + } +#endif + } + virtual ~ColumnBase() { - if (data_ != nullptr && data_ != MAP_FAILED) { + if (data_ != nullptr) { if (munmap(data_, size_)) { AssertInfo(true, fmt::format("failed to unmap variable field, err={}", @@ -54,47 +101,93 @@ class ColumnBase { } const char* - data() const { + Data() const { return data_; } - [[nodiscard]] size_t - size() const { + size_t + Size() const { return size_; } virtual SpanBase - span() const = 0; + Span() const = 0; + + // build only + void + Append(const char* data, size_t size) { + size_t required_size = len_ + size; + if (required_size + padding_ > size_) { + Expand(required_size * 2 + padding_); + } + + std::copy_n(data, size, data_ + len_); + len_ += size; + } protected: + // only for memory mode, not mmap + void + Expand(size_t size) { + auto data = static_cast(mmap(nullptr, + size, + PROT_READ | PROT_WRITE, + mmap_flags | MAP_ANON, + -1, + 0)); + + AssertInfo(data != MAP_FAILED, + fmt::format("failed to create map: {}", strerror(errno))); + + if (data_ != nullptr) { + std::memcpy(data, data_, len_); + if (munmap(data_, size_)) { + AssertInfo( + false, + fmt::format("failed to unmap while expanding, err={}", + strerror(errno))); + } + } + + data_ = data; + size_ = size; + } + char* data_{nullptr}; - uint64_t size_{0}; + size_t size_{0}; + size_t padding_{0}; + + // build only + size_t len_{0}; }; class Column : public ColumnBase { public: - Column(int64_t segment_id, - const FieldMeta& field_meta, - const FieldDataInfo& info) { - data_ = static_cast(CreateMap(segment_id, field_meta, info)); - size_ = field_meta.get_sizeof() * info.row_count; - row_count_ = info.row_count; + // memory mode ctor + Column(size_t num_rows, const FieldMeta& field_meta) + : ColumnBase(num_rows, field_meta), num_rows_(num_rows) { + } + + // mmap mode ctor + Column(int fd, size_t size, const FieldMeta& field_meta) + : ColumnBase(fd, size, field_meta), + num_rows_(size / field_meta.get_sizeof()) { } Column(Column&& column) noexcept - : ColumnBase(std::move(column)), row_count_(column.row_count_) { - column.row_count_ = 0; + : ColumnBase(std::move(column)), num_rows_(column.num_rows_) { + column.num_rows_ = 0; } ~Column() override = default; SpanBase - span() const override { - return SpanBase(data_, row_count_, size_ / row_count_); + Span() const override { + return SpanBase(data_, num_rows_, size_ / num_rows_); } private: - int64_t row_count_{}; + size_t num_rows_{}; }; template @@ -103,37 +196,31 @@ class VariableColumn : public ColumnBase { using ViewType = std::conditional_t, std::string_view, T>; - VariableColumn(int64_t segment_id, - const FieldMeta& field_meta, - const FieldDataInfo& info) { - indices_.reserve(info.row_count); - for (auto data : info.datas) { - for (ssize_t idx = 0; idx < data->get_num_rows(); ++idx) { - indices_.emplace_back(size_); - size_ += data->Size(idx); - } - } - - data_ = static_cast(CreateMap(segment_id, field_meta, info)); - construct_views(); + // memory mode ctor + VariableColumn(size_t num_rows, const FieldMeta& field_meta) + : ColumnBase(num_rows, field_meta) { } - VariableColumn(VariableColumn&& field) noexcept - : indices_(std::move(field.indices_)), views_(std::move(field.views_)) { - data_ = field.data(); - size_ = field.size(); - field.data_ = nullptr; + // mmap mode ctor + VariableColumn(int fd, size_t size, const FieldMeta& field_meta) + : ColumnBase(fd, size, field_meta) { + } + + VariableColumn(VariableColumn&& column) noexcept + : ColumnBase(std::move(column)), + indices_(std::move(column.indices_)), + views_(std::move(column.views_)) { } ~VariableColumn() override = default; SpanBase - span() const override { + Span() const override { return SpanBase(views_.data(), views_.size(), sizeof(ViewType)); } [[nodiscard]] const std::vector& - views() const { + Views() const { return views_; } @@ -143,21 +230,35 @@ class VariableColumn : public ColumnBase { } std::string_view - raw_at(const int i) const { - size_t len = (i == indices_.size() - 1) ? size_ - indices_.back() + RawAt(const int i) const { + size_t len = (i == indices_.size() - 1) ? len_ - indices_.back() : indices_[i + 1] - indices_[i]; return std::string_view(data_ + indices_[i], len); } + void + Append(const char* data, size_t size) { + indices_.emplace_back(len_); + ColumnBase::Append(data, size); + } + + void + Seal(std::vector indices = {}) { + if (!indices.empty()) { + indices_ = std::move(indices); + } + ConstructViews(); + } + protected: void - construct_views() { + ConstructViews() { views_.reserve(indices_.size()); for (size_t i = 0; i < indices_.size() - 1; i++) { views_.emplace_back(data_ + indices_[i], indices_[i + 1] - indices_[i]); } - views_.emplace_back(data_ + indices_.back(), size_ - indices_.back()); + views_.emplace_back(data_ + indices_.back(), len_ - indices_.back()); } private: diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 97ccae7894..e526ff9b7c 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -14,13 +14,16 @@ #include #include +#include #include #include #include #include +#include #include "Utils.h" #include "Types.h" +#include "common/Json.h" #include "mmap/Column.h" #include "common/Consts.h" #include "common/FieldMeta.h" @@ -178,9 +181,15 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) { int64_t num_rows = storage::GetTotalNumRowsForFieldDatas(field_datas); AssertInfo(num_rows == info.row_count, "inconsistent field data row count with meta"); + auto field_data_info = FieldDataInfo{ field_id.get(), num_rows, field_datas, load_info.mmap_dir_path}; - LoadFieldData(field_id, field_data_info); + if (load_info.mmap_dir_path.empty() || + SystemProperty::Instance().IsSystem(field_id)) { + LoadFieldData(field_id, field_data_info); + } else { + MapFieldData(field_id, field_data_info); + } } } @@ -251,13 +260,37 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, switch (data_type) { case milvus::DataType::STRING: case milvus::DataType::VARCHAR: { - column = std::make_unique>( - get_segment_id(), field_meta, data_info); + auto var_column = + std::make_unique>( + num_rows, field_meta); + for (auto& data : data_info.datas) { + for (auto i = 0; i < data->get_num_rows(); i++) { + auto str = static_cast( + data->RawValue(i)); + var_column->Append(str->data(), str->size()); + } + } + var_column->Seal(); + column = std::move(var_column); break; } case milvus::DataType::JSON: { - column = std::make_unique>( - get_segment_id(), field_meta, data_info); + auto var_column = + std::make_unique>( + num_rows, field_meta); + for (auto& data : data_info.datas) { + for (auto i = 0; i < data->get_num_rows(); i++) { + auto padded_string = + static_cast( + data->RawValue(i)) + ->data(); + var_column->Append(padded_string.data(), + padded_string.size()); + } + } + var_column->Seal(); + column = std::move(var_column); + break; } default: { } @@ -266,8 +299,11 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, std::unique_lock lck(mutex_); variable_fields_.emplace(field_id, std::move(column)); } else { - auto column = Column(get_segment_id(), field_meta, data_info); - + auto column = Column(num_rows, field_meta); + for (auto& data : data_info.datas) { + column.Append(static_cast(data->Data()), + data->Size()); + } std::unique_lock lck(mutex_); fixed_fields_.emplace(field_id, std::move(column)); } @@ -287,6 +323,106 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, update_row_count(num_rows); } +void +SegmentSealedImpl::MapFieldData(const FieldId field_id, + const FieldDataInfo& data_info) { + auto filepath = std::filesystem::path(data_info.mmap_dir_path) / + std::to_string(get_segment_id()) / + std::to_string(field_id.get()); + auto dir = filepath.parent_path(); + std::filesystem::create_directories(dir); + + int fd = + open(filepath.c_str(), O_CREAT | O_TRUNC | O_RDWR, S_IRUSR | S_IWUSR); + AssertInfo(fd != -1, + fmt::format("failed to create mmap file {}", filepath.c_str())); + + auto& field_meta = (*schema_)[field_id]; + auto data_type = field_meta.get_data_type(); + + // write the field data to disk + ssize_t total_written{0}; + auto data_size = GetDataSize(data_info.datas); + std::vector indices{}; + for (auto& data : data_info.datas) { + auto written = WriteFieldData(fd, data_type, data); + if (written != data->Size()) { + break; + } + indices.emplace_back(total_written); + total_written += written; + } + AssertInfo( + total_written == data_size || + total_written != -1 && + datatype_is_variable(field_meta.get_data_type()), + fmt::format( + "failed to write data file {}, written {} but total {}, err: {}", + filepath.c_str(), + total_written, + data_size, + strerror(errno))); + int ok = fsync(fd); + AssertInfo(ok == 0, + fmt::format("failed to fsync mmap data file {}, err: {}", + filepath.c_str(), + strerror(errno))); + + auto num_rows = data_info.row_count; + if (datatype_is_variable(data_type)) { + std::unique_ptr column{}; + switch (data_type) { + case milvus::DataType::STRING: + case milvus::DataType::VARCHAR: { + auto var_column = std::make_unique>( + fd, total_written, field_meta); + var_column->Seal(std::move(indices)); + column = std::move(var_column); + break; + } + case milvus::DataType::JSON: { + auto var_column = + std::make_unique>( + fd, total_written, field_meta); + var_column->Seal(std::move(indices)); + column = std::move(var_column); + break; + } + default: { + } + } + + std::unique_lock lck(mutex_); + variable_fields_.emplace(field_id, std::move(column)); + } else { + auto column = Column(fd, total_written, field_meta); + std::unique_lock lck(mutex_); + fixed_fields_.emplace(field_id, std::move(column)); + } + + ok = unlink(filepath.c_str()); + AssertInfo(ok == 0, + fmt::format("failed to unlink mmap data file {}, err: {}", + filepath.c_str(), + strerror(errno))); + ok = close(fd); + AssertInfo(ok == 0, + fmt::format("failed to close data file {}, err: {}", + filepath.c_str(), + strerror(errno))); + + // set pks to offset + if (schema_->get_primary_field_id() == field_id) { + AssertInfo(field_id.get() != -1, "Primary key is -1"); + AssertInfo(insert_record_.empty_pks(), "already exists"); + insert_record_.insert_pks(data_info.datas); + insert_record_.seal_pks(); + } + + std::unique_lock lck(mutex_); + set_bit(field_data_ready_bitset_, field_id, true); +} + void SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { AssertInfo(info.row_count > 0, "The row count of deleted record is 0"); @@ -340,12 +476,12 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const { auto element_sizeof = field_meta.get_sizeof(); if (auto it = fixed_fields_.find(field_id); it != fixed_fields_.end()) { auto& field_data = it->second; - return field_data.span(); + return field_data.Span(); } if (auto it = variable_fields_.find(field_id); it != variable_fields_.end()) { auto& field = it->second; - return field->span(); + return field->Span(); } auto field_data = insert_record_.get_field_data_base(field_id); AssertInfo(field_data->num_chunk() == 1, @@ -439,7 +575,7 @@ SegmentSealedImpl::vector_search(SearchInfo& search_info, auto row_count = row_count_opt_.value(); auto& vec_data = fixed_fields_.at(field_id); query::SearchOnSealed(*schema_, - vec_data.data(), + vec_data.Data(), search_info, query_data, query_count, @@ -614,7 +750,7 @@ SegmentSealedImpl::bulk_subscript_impl(const ColumnBase* column, for (int64_t i = 0; i < count; ++i) { auto offset = seg_offsets[i]; if (offset != INVALID_SEG_OFFSET) { - dst[i] = std::move(T(field->raw_at(offset))); + dst[i] = std::move(T(field->RawAt(offset))); } } } @@ -706,7 +842,7 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, } } - auto src_vec = fixed_fields_.at(field_id).data(); + auto src_vec = fixed_fields_.at(field_id).Data(); switch (field_meta.get_data_type()) { case DataType::BOOL: { FixedVector output(count); diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 9fdd70d6f8..f4a93e3c78 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -55,9 +55,13 @@ class SegmentSealedImpl : public SegmentSealed { HasIndex(FieldId field_id) const override; bool HasFieldData(FieldId field_id) const override; + void LoadFieldData(FieldId field_id, const FieldDataInfo& data_info) override; + void + MapFieldData(const FieldId field_id, const FieldDataInfo& data); + int64_t get_segment_id() const override { return id_; diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 77e63d0da7..c624b18443 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -552,12 +552,12 @@ LoadFieldDatasFromRemote(std::vector& remote_files) { std::sort(remote_files.begin(), remote_files.end(), [](const std::string& a, const std::string& b) { - return std::stol(a.substr(a.find_last_of("/") + 1)) < - std::stol(b.substr(b.find_last_of("/") + 1)); + return std::stol(a.substr(a.find_last_of('/') + 1)) < + std::stol(b.substr(b.find_last_of('/') + 1)); }); auto parallel_degree = - uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); + static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); std::vector batch_files; std::vector field_datas;