Refactor the column type (#25147)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/25173/head
yah01 2023-06-27 19:50:45 +08:00 committed by GitHub
parent 8d193a3e56
commit cb4b88d5cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 310 additions and 69 deletions

View File

@ -33,7 +33,7 @@ struct FieldBinlogInfo {
struct LoadFieldDataInfo {
std::map<int64_t, FieldBinlogInfo> field_infos;
// Set null to disable mmap,
// Set empty to disable mmap,
// mmap file path will be {mmap_dir_path}/{segment_id}/{field_id}
std::string mmap_dir_path = "";
};

View File

@ -16,29 +16,76 @@
#pragma once
#include <sys/mman.h>
#include <algorithm>
#include <cstddef>
#include <ostream>
#include <string_view>
#include <type_traits>
#include <vector>
#include <string>
#include <utility>
#include <cstring>
#include <filesystem>
#include "common/FieldMeta.h"
#include "common/Span.h"
#include "exceptions/EasyAssert.h"
#include "fmt/format.h"
#include "mmap/Utils.h"
namespace milvus {
// Pointer-plus-length pair.
// NOTE(review): nothing in the visible part of this header uses Entry;
// presumably it describes one variable-length value - confirm against callers.
struct Entry {
// start of the referenced bytes (ownership is not established here)
char* data;
// presumably the number of bytes at `data` - TODO confirm
uint32_t length;
};
// Base flags passed to every mmap() call in this header. MAP_POPULATE is
// added only when the platform's <sys/mman.h> defines it.
#if defined(MAP_POPULATE)
static int mmap_flags = MAP_PRIVATE | MAP_POPULATE;
#else
static int mmap_flags = MAP_PRIVATE;
#endif
class ColumnBase {
public:
ColumnBase() = default;
// memory mode ctor
// Reserves an anonymous read/write mapping large enough for `num_rows`
// fixed-width values (plus padding). Variable-length types map nothing here;
// their buffer is created by Append() through Expand().
ColumnBase(size_t num_rows, const FieldMeta& field_meta) {
    const auto data_type = field_meta.get_data_type();

    // simdjson requires a padding following the json data
    if (data_type == DataType::JSON) {
        padding_ = simdjson::SIMDJSON_PADDING;
    } else {
        padding_ = 0;
    }

    if (datatype_is_variable(data_type)) {
        return;
    }

    size_ = field_meta.get_sizeof() * num_rows + padding_;

    // use anon mapping so we are able to free these memory with munmap only
    void* mapped = mmap(
        nullptr, size_, PROT_READ | PROT_WRITE, mmap_flags | MAP_ANON, -1, 0);
    data_ = static_cast<char*>(mapped);
    AssertInfo(
        data_ != MAP_FAILED,
        fmt::format("failed to create anon map, err: {}", strerror(errno)));
}
// mmap mode ctor
ColumnBase(int fd, size_t size, const FieldMeta& field_meta) {
padding_ = field_meta.get_data_type() == DataType::JSON
? simdjson::SIMDJSON_PADDING
: 0;
len_ = size;
size_ = size + padding_;
data_ = static_cast<char*>(
mmap(nullptr, size_, PROT_READ, mmap_flags, fd, 0));
#ifndef MAP_POPULATE
// Manually access the mapping to populate it
const size_t page_size = getpagesize();
char* begin = (char*)data_;
char* end = begin + len_;
for (char* page = begin; page < end; page += page_size) {
char value = page[0];
}
#endif
}
virtual ~ColumnBase() {
if (data_ != nullptr && data_ != MAP_FAILED) {
if (data_ != nullptr) {
if (munmap(data_, size_)) {
AssertInfo(true,
fmt::format("failed to unmap variable field, err={}",
@ -54,47 +101,93 @@ class ColumnBase {
}
const char*
data() const {
Data() const {
return data_;
}
[[nodiscard]] size_t
size() const {
size_t
Size() const {
return size_;
}
virtual SpanBase
span() const = 0;
Span() const = 0;
// build only
// Copies `size` bytes from `data` to the end of the buffer. When the bytes
// plus padding_ would not fit, the buffer is first re-mapped at twice the
// required length (plus padding_).
void
Append(const char* data, size_t size) {
    const size_t new_len = len_ + size;
    const bool needs_growth = new_len + padding_ > size_;
    if (needs_growth) {
        Expand(new_len * 2 + padding_);
    }

    char* dest = data_ + len_;
    std::copy_n(data, size, dest);
    len_ = new_len;
}
protected:
// only for memory mode, not mmap
void
Expand(size_t size) {
auto data = static_cast<char*>(mmap(nullptr,
size,
PROT_READ | PROT_WRITE,
mmap_flags | MAP_ANON,
-1,
0));
AssertInfo(data != MAP_FAILED,
fmt::format("failed to create map: {}", strerror(errno)));
if (data_ != nullptr) {
std::memcpy(data, data_, len_);
if (munmap(data_, size_)) {
AssertInfo(
false,
fmt::format("failed to unmap while expanding, err={}",
strerror(errno)));
}
}
data_ = data;
size_ = size;
}
char* data_{nullptr};
uint64_t size_{0};
size_t size_{0};
size_t padding_{0};
// build only
size_t len_{0};
};
class Column : public ColumnBase {
public:
Column(int64_t segment_id,
const FieldMeta& field_meta,
const FieldDataInfo& info) {
data_ = static_cast<char*>(CreateMap(segment_id, field_meta, info));
size_ = field_meta.get_sizeof() * info.row_count;
row_count_ = info.row_count;
// memory mode ctor
Column(size_t num_rows, const FieldMeta& field_meta)
: ColumnBase(num_rows, field_meta), num_rows_(num_rows) {
}
// mmap mode ctor
// Forwards the descriptor and byte size to ColumnBase and derives the row
// count by dividing `size` by the per-row size from field_meta.get_sizeof().
Column(int fd, size_t size, const FieldMeta& field_meta)
: ColumnBase(fd, size, field_meta),
num_rows_(size / field_meta.get_sizeof()) {
}
Column(Column&& column) noexcept
: ColumnBase(std::move(column)), row_count_(column.row_count_) {
column.row_count_ = 0;
: ColumnBase(std::move(column)), num_rows_(column.num_rows_) {
column.num_rows_ = 0;
}
~Column() override = default;
SpanBase
span() const override {
return SpanBase(data_, row_count_, size_ / row_count_);
Span() const override {
return SpanBase(data_, num_rows_, size_ / num_rows_);
}
private:
int64_t row_count_{};
size_t num_rows_{};
};
template <typename T>
@ -103,37 +196,31 @@ class VariableColumn : public ColumnBase {
using ViewType =
std::conditional_t<std::is_same_v<T, std::string>, std::string_view, T>;
VariableColumn(int64_t segment_id,
const FieldMeta& field_meta,
const FieldDataInfo& info) {
indices_.reserve(info.row_count);
for (auto data : info.datas) {
for (ssize_t idx = 0; idx < data->get_num_rows(); ++idx) {
indices_.emplace_back(size_);
size_ += data->Size(idx);
}
}
data_ = static_cast<char*>(CreateMap(segment_id, field_meta, info));
construct_views();
// memory mode ctor
VariableColumn(size_t num_rows, const FieldMeta& field_meta)
: ColumnBase(num_rows, field_meta) {
}
VariableColumn(VariableColumn&& field) noexcept
: indices_(std::move(field.indices_)), views_(std::move(field.views_)) {
data_ = field.data();
size_ = field.size();
field.data_ = nullptr;
// mmap mode ctor
VariableColumn(int fd, size_t size, const FieldMeta& field_meta)
: ColumnBase(fd, size, field_meta) {
}
VariableColumn(VariableColumn&& column) noexcept
: ColumnBase(std::move(column)),
indices_(std::move(column.indices_)),
views_(std::move(column.views_)) {
}
~VariableColumn() override = default;
SpanBase
span() const override {
Span() const override {
return SpanBase(views_.data(), views_.size(), sizeof(ViewType));
}
[[nodiscard]] const std::vector<ViewType>&
views() const {
Views() const {
return views_;
}
@ -143,21 +230,35 @@ class VariableColumn : public ColumnBase {
}
std::string_view
raw_at(const int i) const {
size_t len = (i == indices_.size() - 1) ? size_ - indices_.back()
RawAt(const int i) const {
size_t len = (i == indices_.size() - 1) ? len_ - indices_.back()
: indices_[i + 1] - indices_[i];
return std::string_view(data_ + indices_[i], len);
}
// Appends one value: records the current end of the buffer as the start
// offset of the new row, then copies the bytes through ColumnBase::Append.
void
Append(const char* data, size_t size) {
    const size_t row_start = len_;
    indices_.emplace_back(row_start);
    ColumnBase::Append(data, size);
}
// Finishes building the column and materializes the per-row views.
//   indices: optional row start offsets into the data buffer; when non-empty
//            they replace the offsets recorded by Append().
void
Seal(std::vector<uint64_t> indices = {}) {
    if (!indices.empty()) {
        indices_ = std::move(indices);
    }

    // ConstructViews() evaluates indices_.size() - 1 and indices_.back(),
    // neither of which is valid for an empty column, so leave views_ empty.
    if (indices_.empty()) {
        return;
    }
    ConstructViews();
}
protected:
void
construct_views() {
ConstructViews() {
views_.reserve(indices_.size());
for (size_t i = 0; i < indices_.size() - 1; i++) {
views_.emplace_back(data_ + indices_[i],
indices_[i + 1] - indices_[i]);
}
views_.emplace_back(data_ + indices_.back(), size_ - indices_.back());
views_.emplace_back(data_ + indices_.back(), len_ - indices_.back());
}
private:

View File

@ -14,13 +14,16 @@
#include <fcntl.h>
#include <fmt/core.h>
#include <cstdint>
#include <filesystem>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
#include "Utils.h"
#include "Types.h"
#include "common/Json.h"
#include "mmap/Column.h"
#include "common/Consts.h"
#include "common/FieldMeta.h"
@ -178,9 +181,15 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
int64_t num_rows = storage::GetTotalNumRowsForFieldDatas(field_datas);
AssertInfo(num_rows == info.row_count,
"inconsistent field data row count with meta");
auto field_data_info = FieldDataInfo{
field_id.get(), num_rows, field_datas, load_info.mmap_dir_path};
LoadFieldData(field_id, field_data_info);
if (load_info.mmap_dir_path.empty() ||
SystemProperty::Instance().IsSystem(field_id)) {
LoadFieldData(field_id, field_data_info);
} else {
MapFieldData(field_id, field_data_info);
}
}
}
@ -251,13 +260,37 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id,
switch (data_type) {
case milvus::DataType::STRING:
case milvus::DataType::VARCHAR: {
column = std::make_unique<VariableColumn<std::string>>(
get_segment_id(), field_meta, data_info);
auto var_column =
std::make_unique<VariableColumn<std::string>>(
num_rows, field_meta);
for (auto& data : data_info.datas) {
for (auto i = 0; i < data->get_num_rows(); i++) {
auto str = static_cast<const std::string*>(
data->RawValue(i));
var_column->Append(str->data(), str->size());
}
}
var_column->Seal();
column = std::move(var_column);
break;
}
case milvus::DataType::JSON: {
column = std::make_unique<VariableColumn<Json>>(
get_segment_id(), field_meta, data_info);
auto var_column =
std::make_unique<VariableColumn<milvus::Json>>(
num_rows, field_meta);
for (auto& data : data_info.datas) {
for (auto i = 0; i < data->get_num_rows(); i++) {
auto padded_string =
static_cast<const milvus::Json*>(
data->RawValue(i))
->data();
var_column->Append(padded_string.data(),
padded_string.size());
}
}
var_column->Seal();
column = std::move(var_column);
break;
}
default: {
}
@ -266,8 +299,11 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id,
std::unique_lock lck(mutex_);
variable_fields_.emplace(field_id, std::move(column));
} else {
auto column = Column(get_segment_id(), field_meta, data_info);
auto column = Column(num_rows, field_meta);
for (auto& data : data_info.datas) {
column.Append(static_cast<const char*>(data->Data()),
data->Size());
}
std::unique_lock lck(mutex_);
fixed_fields_.emplace(field_id, std::move(column));
}
@ -287,6 +323,106 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id,
update_row_count(num_rows);
}
// Loads one field of this sealed segment through a file-backed mapping.
//
// The field data is written to {mmap_dir_path}/{segment_id}/{field_id}, the
// file is handed to a Column / VariableColumn (mmap mode ctor), and the path
// is then unlinked and the descriptor closed. If the field is the primary
// key, its values are also inserted into insert_record_. Finally the field is
// flagged in field_data_ready_bitset_.
//
//   field_id:  field to load; also used as the file name
//   data_info: supplies mmap_dir_path, row_count and the FieldData batches
void
SegmentSealedImpl::MapFieldData(const FieldId field_id,
                                const FieldDataInfo& data_info) {
    auto filepath = std::filesystem::path(data_info.mmap_dir_path) /
                    std::to_string(get_segment_id()) /
                    std::to_string(field_id.get());
    auto dir = filepath.parent_path();
    std::filesystem::create_directories(dir);

    int fd =
        open(filepath.c_str(), O_CREAT | O_TRUNC | O_RDWR, S_IRUSR | S_IWUSR);
    AssertInfo(fd != -1,
               fmt::format("failed to create mmap file {}", filepath.c_str()));

    auto& field_meta = (*schema_)[field_id];
    auto data_type = field_meta.get_data_type();
    const bool is_variable = datatype_is_variable(data_type);

    // write the field data to disk
    ssize_t total_written{0};
    auto data_size = GetDataSize(data_info.datas);
    // start offset of every row; only consumed by variable-length columns
    std::vector<uint64_t> indices{};
    if (is_variable) {
        indices.reserve(data_info.row_count);
    }
    for (auto& data : data_info.datas) {
        auto written = WriteFieldData(fd, data_type, data);
        if (written != data->Size()) {
            break;
        }

        if (is_variable) {
            // VariableColumn indexes its offsets per row (see Append/RawAt),
            // so record one offset per row rather than one per batch.
            uint64_t row_offset = total_written;
            for (ssize_t row = 0; row < data->get_num_rows(); ++row) {
                indices.emplace_back(row_offset);
                row_offset += data->Size(row);
            }
        }
        total_written += written;
    }
    // NOTE(review): for variable-length types this condition holds even after
    // a short write, because total_written never becomes -1 - confirm intent.
    AssertInfo(
        total_written == data_size ||
            (total_written != -1 &&
             datatype_is_variable(field_meta.get_data_type())),
        fmt::format(
            "failed to write data file {}, written {} but total {}, err: {}",
            filepath.c_str(),
            total_written,
            data_size,
            strerror(errno)));
    int ok = fsync(fd);
    AssertInfo(ok == 0,
               fmt::format("failed to fsync mmap data file {}, err: {}",
                           filepath.c_str(),
                           strerror(errno)));

    if (is_variable) {
        std::unique_ptr<ColumnBase> column{};
        switch (data_type) {
            case milvus::DataType::STRING:
            case milvus::DataType::VARCHAR: {
                auto var_column = std::make_unique<VariableColumn<std::string>>(
                    fd, total_written, field_meta);
                var_column->Seal(std::move(indices));
                column = std::move(var_column);
                break;
            }
            case milvus::DataType::JSON: {
                auto var_column =
                    std::make_unique<VariableColumn<milvus::Json>>(
                        fd, total_written, field_meta);
                var_column->Seal(std::move(indices));
                column = std::move(var_column);
                break;
            }
            default: {
                // any other variable type leaves `column` null
            }
        }

        std::unique_lock lck(mutex_);
        variable_fields_.emplace(field_id, std::move(column));
    } else {
        auto column = Column(fd, total_written, field_meta);

        std::unique_lock lck(mutex_);
        fixed_fields_.emplace(field_id, std::move(column));
    }

    // the mapping created above stays valid after the path is removed and
    // the descriptor is closed
    ok = unlink(filepath.c_str());
    AssertInfo(ok == 0,
               fmt::format("failed to unlink mmap data file {}, err: {}",
                           filepath.c_str(),
                           strerror(errno)));
    ok = close(fd);
    AssertInfo(ok == 0,
               fmt::format("failed to close data file {}, err: {}",
                           filepath.c_str(),
                           strerror(errno)));

    // set pks to offset
    if (schema_->get_primary_field_id() == field_id) {
        AssertInfo(field_id.get() != -1, "Primary key is -1");
        AssertInfo(insert_record_.empty_pks(), "already exists");
        insert_record_.insert_pks(data_info.datas);
        insert_record_.seal_pks();
    }

    std::unique_lock lck(mutex_);
    set_bit(field_data_ready_bitset_, field_id, true);
}
void
SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
AssertInfo(info.row_count > 0, "The row count of deleted record is 0");
@ -340,12 +476,12 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
auto element_sizeof = field_meta.get_sizeof();
if (auto it = fixed_fields_.find(field_id); it != fixed_fields_.end()) {
auto& field_data = it->second;
return field_data.span();
return field_data.Span();
}
if (auto it = variable_fields_.find(field_id);
it != variable_fields_.end()) {
auto& field = it->second;
return field->span();
return field->Span();
}
auto field_data = insert_record_.get_field_data_base(field_id);
AssertInfo(field_data->num_chunk() == 1,
@ -439,7 +575,7 @@ SegmentSealedImpl::vector_search(SearchInfo& search_info,
auto row_count = row_count_opt_.value();
auto& vec_data = fixed_fields_.at(field_id);
query::SearchOnSealed(*schema_,
vec_data.data(),
vec_data.Data(),
search_info,
query_data,
query_count,
@ -614,7 +750,7 @@ SegmentSealedImpl::bulk_subscript_impl(const ColumnBase* column,
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
if (offset != INVALID_SEG_OFFSET) {
dst[i] = std::move(T(field->raw_at(offset)));
dst[i] = std::move(T(field->RawAt(offset)));
}
}
}
@ -706,7 +842,7 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
}
}
auto src_vec = fixed_fields_.at(field_id).data();
auto src_vec = fixed_fields_.at(field_id).Data();
switch (field_meta.get_data_type()) {
case DataType::BOOL: {
FixedVector<bool> output(count);

View File

@ -55,9 +55,13 @@ class SegmentSealedImpl : public SegmentSealed {
HasIndex(FieldId field_id) const override;
bool
HasFieldData(FieldId field_id) const override;
void
LoadFieldData(FieldId field_id, const FieldDataInfo& data_info) override;
void
MapFieldData(const FieldId field_id, const FieldDataInfo& data);
int64_t
get_segment_id() const override {
return id_;

View File

@ -552,12 +552,12 @@ LoadFieldDatasFromRemote(std::vector<std::string>& remote_files) {
std::sort(remote_files.begin(),
remote_files.end(),
[](const std::string& a, const std::string& b) {
return std::stol(a.substr(a.find_last_of("/") + 1)) <
std::stol(b.substr(b.find_last_of("/") + 1));
return std::stol(a.substr(a.find_last_of('/') + 1)) <
std::stol(b.substr(b.find_last_of('/') + 1));
});
auto parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
std::vector<std::string> batch_files;
std::vector<storage::FieldDataPtr> field_datas;