Check data consistency after loading (#26312)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
yah01 2023-08-14 09:01:32 +08:00 committed by GitHub
parent a5c2ad2c46
commit 127c23d999
3 changed files with 62 additions and 51 deletions

Changed file 1 of 3

@@ -35,7 +35,7 @@ static int mmap_flags = MAP_SHARED;
class ColumnBase {
public:
// memory mode ctor
ColumnBase(size_t num_rows, const FieldMeta& field_meta) {
ColumnBase(size_t capacity, const FieldMeta& field_meta) {
// simdjson requires a padding following the json data
padding_ = field_meta.get_data_type() == DataType::JSON
? simdjson::SIMDJSON_PADDING
@@ -45,12 +45,12 @@ class ColumnBase {
return;
}
size_ = field_meta.get_sizeof() * num_rows + padding_;
cap_ = field_meta.get_sizeof() * capacity;
auto data_type = field_meta.get_data_type();
// use anon mapping so we are able to free this memory with munmap only
data_ = static_cast<char*>(mmap(nullptr,
size_,
cap_ + padding_,
PROT_READ | PROT_WRITE,
mmap_flags | MAP_ANON,
-1,
@@ -67,23 +67,18 @@ class ColumnBase {
: 0;
len_ = size;
size_ = size + padding_;
data_ = static_cast<char*>(
mmap(nullptr, size_, PROT_READ, mmap_flags, file.Descriptor(), 0));
#ifndef MAP_POPULATE
// Manually access the mapping to populate it
const size_t page_size = getpagesize();
char* begin = (char*)data_;
char* end = begin + len_;
for (char* page = begin; page < end; page += page_size) {
char value = page[0];
}
#endif
cap_ = size;
data_ = static_cast<char*>(mmap(nullptr,
cap_ + padding_,
PROT_READ,
mmap_flags,
file.Descriptor(),
0));
}
virtual ~ColumnBase() {
if (data_ != nullptr) {
if (munmap(data_, size_)) {
if (munmap(data_, cap_)) {
AssertInfo(true,
fmt::format("failed to unmap variable field, err={}",
strerror(errno)));
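
The mmap-mode constructor above now sizes the mapping as cap_ + padding_ and drops the manual page-touch fallback that was guarded by #ifndef MAP_POPULATE. Below is a minimal standalone sketch of that file-backed mapping pattern; MapFileReadOnly and the /etc/hostname path are illustrative names of mine, not part of the patch, and the only detail carried over from the patch is that a few padding bytes are reserved after the data (the simdjson padding mentioned in the constructor comment).

    #include <fcntl.h>
    #include <sys/mman.h>
    #include <sys/stat.h>
    #include <unistd.h>
    #include <cstdio>

    // Hypothetical helper (not in the patch): map a whole file read-only and
    // reserve `padding` extra bytes after the data, mirroring the cap_ + padding_
    // length that the constructor above passes to mmap.
    static char* MapFileReadOnly(const char* path, size_t padding, size_t* out_len) {
        int fd = open(path, O_RDONLY);
        if (fd < 0) {
            return nullptr;
        }
        struct stat st {};
        if (fstat(fd, &st) != 0) {
            close(fd);
            return nullptr;
        }
        size_t len = static_cast<size_t>(st.st_size);
        void* addr = mmap(nullptr, len + padding, PROT_READ, MAP_SHARED, fd, 0);
        close(fd);  // the mapping stays valid after the descriptor is closed
        if (addr == MAP_FAILED) {
            return nullptr;
        }
        *out_len = len;
        return static_cast<char*>(addr);
    }

    int main() {
        size_t len = 0;
        char* data = MapFileReadOnly("/etc/hostname", 64, &len);
        if (data != nullptr) {
            std::printf("mapped %zu bytes (plus 64 bytes of padding)\n", len);
            munmap(data, len + 64);
        }
        return 0;
    }
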
@@ -92,9 +87,9 @@ class ColumnBase {
}
ColumnBase(ColumnBase&& column) noexcept
: data_(column.data_), size_(column.size_) {
: data_(column.data_), cap_(column.cap_), padding_(column.padding_) {
column.data_ = nullptr;
column.size_ = 0;
column.cap_ = 0;
}
const char*
@@ -102,19 +97,27 @@ class ColumnBase {
return data_;
}
virtual size_t
NumRows() const = 0;
size_t
Capacity() const {
return cap_;
}
virtual SpanBase
Span() const = 0;
// build only
void
Append(const char* data, size_t size) {
size_t required_size = len_ + size;
if (required_size + padding_ > size_) {
Append(const char* data, size_t num_rows) {
size_t required_size = len_ + num_rows;
if (required_size > cap_) {
Expand(required_size * 2 + padding_);
}
std::copy_n(data, size, data_ + len_);
len_ += size;
std::copy_n(data, num_rows, data_ + len_);
len_ += num_rows;
}
protected:
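
The new Append path above separates the bytes already written (len_) from the bytes reserved (cap_), and grows the anonymous mapping when an append would overflow. A minimal sketch of that grow-on-append scheme follows; AppendBuffer and its members are hypothetical stand-ins for ColumnBase, and the doubling policy is simplified (the patch grows to required_size * 2 + padding_).

    #include <sys/mman.h>
    #include <cstdio>
    #include <cstdlib>
    #include <cstring>

    // Sketch only: keep `len` bytes of data inside a `cap`-byte anonymous
    // mapping, and remap to a larger region when an append would overflow.
    struct AppendBuffer {
        char* data = nullptr;
        size_t len = 0;   // bytes written so far (len_ in the patch)
        size_t cap = 0;   // bytes reserved       (cap_ in the patch)

        void Expand(size_t new_cap) {
            char* fresh = static_cast<char*>(mmap(nullptr, new_cap,
                                                  PROT_READ | PROT_WRITE,
                                                  MAP_SHARED | MAP_ANON, -1, 0));
            if (fresh == MAP_FAILED) {
                std::abort();
            }
            if (data != nullptr) {
                std::memcpy(fresh, data, len);   // carry over what was written
                munmap(data, cap);               // release the old mapping
            }
            data = fresh;
            cap = new_cap;
        }

        void Append(const char* src, size_t size) {
            if (len + size > cap) {
                Expand((len + size) * 2);        // grow geometrically, as the patch does
            }
            std::memcpy(data + len, src, size);
            len += size;
        }

        ~AppendBuffer() {
            if (data != nullptr) {
                munmap(data, cap);
            }
        }
    };

    int main() {
        AppendBuffer buf;
        for (int i = 0; i < 1000; ++i) {
            buf.Append("chunk", 5);
        }
        std::printf("len=%zu cap=%zu\n", buf.len, buf.cap);
        return 0;
    }
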
@@ -133,7 +136,7 @@ class ColumnBase {
if (data_ != nullptr) {
std::memcpy(data, data_, len_);
if (munmap(data_, size_)) {
if (munmap(data_, cap_)) {
AssertInfo(
false,
fmt::format("failed to unmap while expanding, err={}",
@ -142,11 +145,11 @@ class ColumnBase {
}
data_ = data;
size_ = size;
cap_ = size;
}
char* data_{nullptr};
size_t size_{0};
size_t cap_{0};
size_t padding_{0};
// build only
@@ -174,13 +177,13 @@ class Column : public ColumnBase {
~Column() override = default;
size_t
NumRows() const {
NumRows() const override {
return num_rows_;
}
SpanBase
Span() const override {
return SpanBase(data_, num_rows_, size_ / num_rows_);
return SpanBase(data_, num_rows_, cap_ / num_rows_);
}
private:
@@ -212,7 +215,7 @@ class VariableColumn : public ColumnBase {
~VariableColumn() override = default;
size_t
NumRows() const {
NumRows() const override {
return indices_.size();
}

Changed file 2 of 3

@@ -24,6 +24,7 @@
#include "Utils.h"
#include "Types.h"
#include "common/Json.h"
#include "exceptions/EasyAssert.h"
#include "mmap/Column.h"
#include "common/Consts.h"
#include "common/FieldMeta.h"
@@ -84,13 +85,13 @@ SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
AssertInfo(
!get_bit(index_ready_bitset_, field_id),
"vector index has been exist at " + std::to_string(field_id.get()));
if (row_count_opt_.has_value()) {
AssertInfo(row_count_opt_.value() == row_count,
if (num_rows_.has_value()) {
AssertInfo(num_rows_.value() == row_count,
"field (" + std::to_string(field_id.get()) +
") data has different row count (" +
std::to_string(row_count) +
") than other column's row count (" +
std::to_string(row_count_opt_.value()) + ")");
std::to_string(num_rows_.value()) + ")");
}
AssertInfo(!vector_indexings_.is_ready(field_id), "vec index is not ready");
vector_indexings_.append_field_indexing(
@@ -118,13 +119,13 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
AssertInfo(
!get_bit(index_ready_bitset_, field_id),
"scalar index has been exist at " + std::to_string(field_id.get()));
if (row_count_opt_.has_value()) {
AssertInfo(row_count_opt_.value() == row_count,
if (num_rows_.has_value()) {
AssertInfo(num_rows_.value() == row_count,
"field (" + std::to_string(field_id.get()) +
") data has different row count (" +
std::to_string(row_count) +
") than other column's row count (" +
std::to_string(row_count_opt_.value()) + ")");
std::to_string(num_rows_.value()) + ")");
}
scalar_indexings_[field_id] =
@@ -315,6 +316,13 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
column->Append(static_cast<const char*>(field_data->Data()),
field_data->Size());
}
AssertInfo(column->NumRows() == num_rows,
fmt::format("data lost while loading column {}: loaded "
"num rows {} but expected {}",
data.field_id,
column->NumRows(),
num_rows));
}
{
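
This is the check the commit title refers to: after every FieldData chunk has been appended, LoadFieldData now asserts that the column holds exactly num_rows rows, so a dropped or truncated chunk fails loudly instead of surfacing later as silent data loss. A small self-contained sketch of the same pattern, with toy Chunk and column types of my own rather than the milvus ones:

    #include <cstdio>
    #include <cstdlib>
    #include <vector>

    // Sketch of the consistency check added in LoadFieldData.
    struct Chunk {
        std::vector<float> values;   // one value per row in this chunk
    };

    int main() {
        const size_t expected_rows = 10;
        std::vector<Chunk> chunks = {{{1, 2, 3}}, {{4, 5, 6, 7}}, {{8, 9, 10}}};

        std::vector<float> column;
        for (const auto& c : chunks) {
            column.insert(column.end(), c.values.begin(), c.values.end());
        }

        // The patch asserts this right after the append loop: if any chunk was
        // dropped or truncated, the loaded row count no longer matches.
        if (column.size() != expected_rows) {
            std::fprintf(stderr,
                         "data lost while loading: loaded %zu rows but expected %zu\n",
                         column.size(), expected_rows);
            std::abort();
        }
        std::printf("loaded %zu rows as expected\n", column.size());
        return 0;
    }
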
@@ -500,14 +508,14 @@ int64_t
SegmentSealedImpl::GetMemoryUsageInBytes() const {
// TODO: add estimate for index
std::shared_lock lck(mutex_);
auto row_count = row_count_opt_.value_or(0);
auto row_count = num_rows_.value_or(0);
return schema_->get_total_sizeof() * row_count;
}
int64_t
SegmentSealedImpl::get_row_count() const {
std::shared_lock lck(mutex_);
return row_count_opt_.value_or(0);
return num_rows_.value_or(0);
}
int64_t
@@ -569,9 +577,9 @@ SegmentSealedImpl::vector_search(SearchInfo& search_info,
AssertInfo(
get_bit(field_data_ready_bitset_, field_id),
"Field Data is not loaded: " + std::to_string(field_id.get()));
AssertInfo(row_count_opt_.has_value(), "Can't get row count value");
auto row_count = row_count_opt_.value();
auto& vec_data = fields_.at(field_id);
AssertInfo(num_rows_.has_value(), "Can't get row count value");
auto row_count = num_rows_.value();
auto vec_data = fields_.at(field_id);
query::SearchOnSealed(*schema_,
vec_data->Data(),
search_info,
@@ -808,15 +816,18 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
Assert(get_bit(field_data_ready_bitset_, field_id));
// DO NOT directly access the column via the map like `fields_.at(field_id)->Data()`;
// we have to clone the shared pointer
// to make sure it won't get released if the segment is released
auto column = fields_.at(field_id);
if (datatype_is_variable(field_meta.get_data_type())) {
switch (field_meta.get_data_type()) {
case DataType::VARCHAR:
case DataType::STRING: {
FixedVector<std::string> output(count);
bulk_subscript_impl<std::string>(fields_.at(field_id).get(),
seg_offsets,
count,
output.data());
bulk_subscript_impl<std::string>(
column.get(), seg_offsets, count, output.data());
return CreateScalarDataArrayFrom(
output.data(), count, field_meta);
}
@@ -824,10 +835,7 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
case DataType::JSON: {
FixedVector<std::string> output(count);
bulk_subscript_impl<Json, std::string>(
fields_.at(field_id).get(),
seg_offsets,
count,
output.data());
column.get(), seg_offsets, count, output.data());
return CreateScalarDataArrayFrom(
output.data(), count, field_meta);
}
@@ -839,7 +847,7 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
}
}
auto src_vec = fields_.at(field_id)->Data();
auto src_vec = column->Data();
switch (field_meta.get_data_type()) {
case DataType::BOOL: {
FixedVector<bool> output(count);
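
The bulk_subscript changes above (and the similar one in vector_search) replace direct fields_.at(field_id) accesses with a local shared_ptr copy, which keeps the column alive even if the segment releases its field map while the read is in flight. A toy illustration of that rule, using made-up types rather than the real Column and field map:

    #include <cstdio>
    #include <memory>
    #include <string>
    #include <unordered_map>

    // Sketch of the shared_ptr-copy rule spelled out by the new comment above.
    struct Column {
        std::string data;
        const char* Data() const { return data.c_str(); }
    };

    int main() {
        std::unordered_map<int, std::shared_ptr<Column>> fields;
        auto col = std::make_shared<Column>();
        col->data = "payload";
        fields[1] = col;

        // Copy the shared_ptr first; the column now stays alive for the whole
        // read even if the map entry is erased in the meantime.
        auto column = fields.at(1);
        fields.erase(1);   // simulates the segment releasing its field data

        std::printf("%s\n", column->Data());   // still valid: we hold a reference
        return 0;
    }
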

Changed file 3 of 3

@@ -182,7 +182,7 @@ class SegmentSealedImpl : public SegmentSealed {
// if (row_count_opt_.has_value()) {
// AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns");
// } else {
row_count_opt_ = row_count;
num_rows_ = row_count;
// }
}
@@ -230,7 +230,7 @@ class SegmentSealedImpl : public SegmentSealed {
// segment data
// TODO: generate index for scalar
std::optional<int64_t> row_count_opt_;
std::optional<int64_t> num_rows_;
// scalar field index
std::unordered_map<FieldId, index::IndexBasePtr> scalar_indexings_;
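
The header change simply renames row_count_opt_ to num_rows_, the optional that LoadVecIndex and LoadScalarIndex check against each newly loaded field. A minimal sketch of that bookkeeping, with a hypothetical OnFieldLoaded helper standing in for the real load paths:

    #include <cstdio>
    #include <cstdlib>
    #include <optional>

    // Sketch of the num_rows_ bookkeeping (renamed from row_count_opt_ in this
    // patch): the first load seeds the value, later loads must report the same
    // row count or the segment is inconsistent.
    static std::optional<int64_t> num_rows;

    static void OnFieldLoaded(int64_t row_count) {
        if (num_rows.has_value()) {
            if (num_rows.value() != row_count) {
                std::fprintf(stderr,
                             "field has %lld rows but other columns have %lld\n",
                             (long long)row_count, (long long)num_rows.value());
                std::abort();
            }
        } else {
            num_rows = row_count;
        }
    }

    int main() {
        OnFieldLoaded(100);   // first field seeds the expected row count
        OnFieldLoaded(100);   // consistent, so loading continues
        std::printf("segment row count: %lld\n", (long long)num_rows.value());
        return 0;
    }
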