From 127c23d999a224b22a20331ed2372b29c56b3697 Mon Sep 17 00:00:00 2001 From: yah01 Date: Mon, 14 Aug 2023 09:01:32 +0800 Subject: [PATCH] Check data consistency after loading (#26312) Signed-off-by: yah01 --- internal/core/src/mmap/Column.h | 61 ++++++++++--------- .../core/src/segcore/SegmentSealedImpl.cpp | 48 +++++++++------ internal/core/src/segcore/SegmentSealedImpl.h | 4 +- 3 files changed, 62 insertions(+), 51 deletions(-) diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index 233e2a16d2..0af354713e 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -35,7 +35,7 @@ static int mmap_flags = MAP_SHARED; class ColumnBase { public: // memory mode ctor - ColumnBase(size_t num_rows, const FieldMeta& field_meta) { + ColumnBase(size_t capacity, const FieldMeta& field_meta) { // simdjson requires a padding following the json data padding_ = field_meta.get_data_type() == DataType::JSON ? simdjson::SIMDJSON_PADDING @@ -45,12 +45,12 @@ class ColumnBase { return; } - size_ = field_meta.get_sizeof() * num_rows + padding_; + cap_ = field_meta.get_sizeof() * capacity; auto data_type = field_meta.get_data_type(); // use anon mapping so we are able to free these memory with munmap only data_ = static_cast(mmap(nullptr, - size_, + cap_ + padding_, PROT_READ | PROT_WRITE, mmap_flags | MAP_ANON, -1, @@ -67,23 +67,18 @@ class ColumnBase { : 0; len_ = size; - size_ = size + padding_; - data_ = static_cast( - mmap(nullptr, size_, PROT_READ, mmap_flags, file.Descriptor(), 0)); -#ifndef MAP_POPULATE - // Manually access the mapping to populate it - const size_t page_size = getpagesize(); - char* begin = (char*)data_; - char* end = begin + len_; - for (char* page = begin; page < end; page += page_size) { - char value = page[0]; - } -#endif + cap_ = size; + data_ = static_cast(mmap(nullptr, + cap_ + padding_, + PROT_READ, + mmap_flags, + file.Descriptor(), + 0)); } virtual ~ColumnBase() { if (data_ != nullptr) { - if 
(munmap(data_, size_)) { + if (munmap(data_, cap_)) { AssertInfo(true, fmt::format("failed to unmap variable field, err={}", strerror(errno))); @@ -92,9 +87,9 @@ class ColumnBase { } ColumnBase(ColumnBase&& column) noexcept - : data_(column.data_), size_(column.size_) { + : data_(column.data_), cap_(column.cap_), padding_(column.padding_) { column.data_ = nullptr; - column.size_ = 0; + column.cap_ = 0; } const char* @@ -102,19 +97,27 @@ class ColumnBase { return data_; } + virtual size_t + NumRows() const = 0; + + size_t + Capacity() const { + return cap_; + } + virtual SpanBase Span() const = 0; // build only void - Append(const char* data, size_t size) { - size_t required_size = len_ + size; - if (required_size + padding_ > size_) { + Append(const char* data, size_t num_rows) { + size_t required_size = len_ + num_rows; + if (required_size > cap_) { Expand(required_size * 2 + padding_); } - std::copy_n(data, size, data_ + len_); - len_ += size; + std::copy_n(data, num_rows, data_ + len_); + len_ += num_rows; } protected: @@ -133,7 +136,7 @@ class ColumnBase { if (data_ != nullptr) { std::memcpy(data, data_, len_); - if (munmap(data_, size_)) { + if (munmap(data_, cap_)) { AssertInfo( false, fmt::format("failed to unmap while expanding, err={}", @@ -142,11 +145,11 @@ class ColumnBase { } data_ = data; - size_ = size; + cap_ = size; } char* data_{nullptr}; - size_t size_{0}; + size_t cap_{0}; size_t padding_{0}; // build only @@ -174,13 +177,13 @@ class Column : public ColumnBase { ~Column() override = default; size_t - NumRows() const { + NumRows() const override { return num_rows_; } SpanBase Span() const override { - return SpanBase(data_, num_rows_, size_ / num_rows_); + return SpanBase(data_, num_rows_, cap_ / num_rows_); } private: @@ -212,7 +215,7 @@ class VariableColumn : public ColumnBase { ~VariableColumn() override = default; size_t - NumRows() const { + NumRows() const override { return indices_.size(); } diff --git 
a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 0e619f0c61..2fc2f586ed 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -24,6 +24,7 @@ #include "Utils.h" #include "Types.h" #include "common/Json.h" +#include "exceptions/EasyAssert.h" #include "mmap/Column.h" #include "common/Consts.h" #include "common/FieldMeta.h" @@ -84,13 +85,13 @@ SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) { AssertInfo( !get_bit(index_ready_bitset_, field_id), "vector index has been exist at " + std::to_string(field_id.get())); - if (row_count_opt_.has_value()) { - AssertInfo(row_count_opt_.value() == row_count, + if (num_rows_.has_value()) { + AssertInfo(num_rows_.value() == row_count, "field (" + std::to_string(field_id.get()) + ") data has different row count (" + std::to_string(row_count) + ") than other column's row count (" + - std::to_string(row_count_opt_.value()) + ")"); + std::to_string(num_rows_.value()) + ")"); } AssertInfo(!vector_indexings_.is_ready(field_id), "vec index is not ready"); vector_indexings_.append_field_indexing( @@ -118,13 +119,13 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) { AssertInfo( !get_bit(index_ready_bitset_, field_id), "scalar index has been exist at " + std::to_string(field_id.get())); - if (row_count_opt_.has_value()) { - AssertInfo(row_count_opt_.value() == row_count, + if (num_rows_.has_value()) { + AssertInfo(num_rows_.value() == row_count, "field (" + std::to_string(field_id.get()) + ") data has different row count (" + std::to_string(row_count) + ") than other column's row count (" + - std::to_string(row_count_opt_.value()) + ")"); + std::to_string(num_rows_.value()) + ")"); } scalar_indexings_[field_id] = @@ -315,6 +316,13 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { column->Append(static_cast(field_data->Data()), field_data->Size()); } + + 
AssertInfo(column->NumRows() == num_rows, fmt::format("data lost while loading column {}: loaded " "num rows {} but expected {}", data.field_id, column->NumRows(), num_rows)); } { @@ -500,14 +508,14 @@ int64_t SegmentSealedImpl::GetMemoryUsageInBytes() const { // TODO: add estimate for index std::shared_lock lck(mutex_); - auto row_count = row_count_opt_.value_or(0); + auto row_count = num_rows_.value_or(0); return schema_->get_total_sizeof() * row_count; } int64_t SegmentSealedImpl::get_row_count() const { std::shared_lock lck(mutex_); - return row_count_opt_.value_or(0); + return num_rows_.value_or(0); } int64_t @@ -569,9 +577,9 @@ SegmentSealedImpl::vector_search(SearchInfo& search_info, AssertInfo( get_bit(field_data_ready_bitset_, field_id), "Field Data is not loaded: " + std::to_string(field_id.get())); - AssertInfo(row_count_opt_.has_value(), "Can't get row count value"); - auto row_count = row_count_opt_.value(); - auto& vec_data = fields_.at(field_id); + AssertInfo(num_rows_.has_value(), "Can't get row count value"); + auto row_count = num_rows_.value(); + auto vec_data = fields_.at(field_id); query::SearchOnSealed(*schema_, vec_data->Data(), search_info, @@ -808,15 +816,18 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, Assert(get_bit(field_data_ready_bitset_, field_id)); + // DO NOT directly access the column by map like: `fields_.at(field_id)->Data()`, + // we have to clone the shared pointer, + // to make sure it won't get released if segment released + auto column = fields_.at(field_id); + if (datatype_is_variable(field_meta.get_data_type())) { switch (field_meta.get_data_type()) { case DataType::VARCHAR: case DataType::STRING: { FixedVector output(count); - bulk_subscript_impl(fields_.at(field_id).get(), - seg_offsets, - count, - output.data()); + bulk_subscript_impl( + column.get(), seg_offsets, count, output.data()); return CreateScalarDataArrayFrom( output.data(), count, field_meta); } @@ -824,10 +835,7 @@ 
SegmentSealedImpl::bulk_subscript(FieldId field_id, case DataType::JSON: { FixedVector output(count); bulk_subscript_impl( - fields_.at(field_id).get(), - seg_offsets, - count, - output.data()); + column.get(), seg_offsets, count, output.data()); return CreateScalarDataArrayFrom( output.data(), count, field_meta); } @@ -839,7 +847,7 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, } } - auto src_vec = fields_.at(field_id)->Data(); + auto src_vec = column->Data(); switch (field_meta.get_data_type()) { case DataType::BOOL: { FixedVector output(count); diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 4b6357ab9d..2fcb4b86a9 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -182,7 +182,7 @@ class SegmentSealedImpl : public SegmentSealed { // if (row_count_opt_.has_value()) { // AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns"); // } else { - row_count_opt_ = row_count; + num_rows_ = row_count; // } } @@ -230,7 +230,7 @@ class SegmentSealedImpl : public SegmentSealed { // segment data // TODO: generate index for scalar - std::optional row_count_opt_; + std::optional num_rows_; // scalar field index std::unordered_map scalar_indexings_;