mirror of https://github.com/milvus-io/milvus.git
Refine field storage of sealed segment (#23464)
Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/23484/head
parent
4a32b842e8
commit
5aa0ddf77b
|
@ -13,7 +13,9 @@
|
|||
|
||||
#include <sys/mman.h>
|
||||
|
||||
#include <cstddef>
|
||||
#include <string_view>
|
||||
#include <type_traits>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
@ -32,13 +34,78 @@ struct Entry {
|
|||
uint32_t length;
|
||||
};
|
||||
|
||||
// Used for string/varchar field only,
|
||||
// TODO(yah01): make this generic
|
||||
class VariableField {
|
||||
class ColumnBase {
|
||||
public:
|
||||
explicit VariableField(int64_t segment_id,
|
||||
const FieldMeta& field_meta,
|
||||
const LoadFieldDataInfo& info) {
|
||||
ColumnBase() = default;
|
||||
// Releases the mapping held by this column, if there is one.
// Nothing is unmapped when data_ is null or holds MAP_FAILED.
virtual ~ColumnBase() {
    const bool has_mapping = data_ != nullptr && data_ != MAP_FAILED;
    if (!has_mapping) {
        return;
    }
    if (munmap(data_, size_) != 0) {
        // NOTE(review): the condition handed to AssertInfo is the literal
        // `true`; if AssertInfo only fires on a false condition (confirm
        // against its definition) this failure is never reported.
        AssertInfo(true,
                   fmt::format("failed to unmap variable field, err={}",
                               strerror(errno)));
    }
}
|
||||
|
||||
// Move constructor: takes over the source's buffer pointer and byte size,
// leaving the source with a null pointer and a size of zero.
ColumnBase(ColumnBase&& column) noexcept
    : data_(std::exchange(column.data_, nullptr)),
      size_(std::exchange(column.size_, 0)) {
}
|
||||
|
||||
const char*
|
||||
data() const {
|
||||
return data_;
|
||||
}
|
||||
|
||||
[[nodiscard]] size_t
|
||||
size() const {
|
||||
return size_;
|
||||
}
|
||||
|
||||
virtual SpanBase
|
||||
span() const = 0;
|
||||
|
||||
protected:
|
||||
char* data_{nullptr};
|
||||
uint64_t size_{0};
|
||||
};
|
||||
|
||||
class FixedColumn : public ColumnBase {
|
||||
public:
|
||||
FixedColumn(int64_t segment_id,
|
||||
const FieldMeta& field_meta,
|
||||
const LoadFieldDataInfo& info) {
|
||||
data_ = static_cast<char*>(CreateMap(segment_id, field_meta, info));
|
||||
size_ = field_meta.get_sizeof() * info.row_count;
|
||||
row_count_ = info.row_count;
|
||||
}
|
||||
|
||||
FixedColumn(FixedColumn&& column) noexcept
|
||||
: ColumnBase(std::move(column)), row_count_(column.row_count_) {
|
||||
column.row_count_ = 0;
|
||||
}
|
||||
|
||||
~FixedColumn() override = default;
|
||||
|
||||
SpanBase
|
||||
span() const override {
|
||||
return SpanBase(data_, row_count_, size_ / row_count_);
|
||||
}
|
||||
|
||||
private:
|
||||
int64_t row_count_{};
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class VariableColumn : public ColumnBase {
|
||||
public:
|
||||
using ViewType =
|
||||
std::conditional_t<std::is_same_v<T, std::string>, std::string_view, T>;
|
||||
|
||||
VariableColumn(int64_t segment_id,
|
||||
const FieldMeta& field_meta,
|
||||
const LoadFieldDataInfo& info) {
|
||||
auto begin = info.field_data->scalars().string_data().data().begin();
|
||||
auto end = info.field_data->scalars().string_data().data().end();
|
||||
|
||||
|
@ -53,44 +120,28 @@ class VariableField {
|
|||
construct_views();
|
||||
}
|
||||
|
||||
VariableField(VariableField&& field) noexcept
|
||||
: indices_(std::move(field.indices_)),
|
||||
size_(field.size_),
|
||||
data_(field.data_),
|
||||
views_(std::move(field.views_)) {
|
||||
VariableColumn(VariableColumn&& field) noexcept
|
||||
: indices_(std::move(field.indices_)), views_(std::move(field.views_)) {
|
||||
data_ = field.data();
|
||||
size_ = field.size();
|
||||
field.data_ = nullptr;
|
||||
}
|
||||
|
||||
~VariableField() {
|
||||
if (data_ != MAP_FAILED && data_ != nullptr) {
|
||||
if (munmap(data_, size_)) {
|
||||
AssertInfo(true,
|
||||
fmt::format("failed to unmap variable field, err={}",
|
||||
strerror(errno)));
|
||||
}
|
||||
}
|
||||
~VariableColumn() override = default;
|
||||
|
||||
SpanBase
|
||||
span() const override {
|
||||
return SpanBase(views_.data(), views_.size(), sizeof(ViewType));
|
||||
}
|
||||
|
||||
char*
|
||||
data() {
|
||||
return data_;
|
||||
}
|
||||
|
||||
[[nodiscard]] const std::vector<std::string_view>&
|
||||
[[nodiscard]] const std::vector<ViewType>&
|
||||
views() const {
|
||||
return views_;
|
||||
}
|
||||
|
||||
[[nodiscard]] size_t
|
||||
size() const {
|
||||
return size_;
|
||||
}
|
||||
|
||||
Span<char>
|
||||
ViewType
|
||||
operator[](const int i) const {
|
||||
uint64_t next = (i + 1 == indices_.size()) ? size_ : indices_[i + 1];
|
||||
uint64_t offset = indices_[i];
|
||||
return Span<char>(data_ + offset, uint32_t(next - offset));
|
||||
return views_[i];
|
||||
}
|
||||
|
||||
protected:
|
||||
|
@ -106,10 +157,8 @@ class VariableField {
|
|||
|
||||
private:
|
||||
std::vector<uint64_t> indices_{};
|
||||
uint64_t size_{0};
|
||||
char* data_{nullptr};
|
||||
|
||||
// Compatible with current Span type
|
||||
std::vector<std::string_view> views_{};
|
||||
std::vector<ViewType> views_{};
|
||||
};
|
||||
} // namespace milvus::segcore
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
#include <cassert>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <type_traits>
|
||||
|
||||
#include "Types.h"
|
||||
|
@ -68,6 +69,10 @@ class Span<
|
|||
: data_(data), row_count_(row_count) {
|
||||
}
|
||||
|
||||
explicit Span(std::string_view data) {
|
||||
Span(data.data(), data.size());
|
||||
}
|
||||
|
||||
operator SpanBase() const {
|
||||
return SpanBase(data_, row_count_, sizeof(T));
|
||||
}
|
||||
|
|
|
@ -15,10 +15,15 @@
|
|||
#include <fmt/core.h>
|
||||
|
||||
#include <filesystem>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
|
||||
#include "Utils.h"
|
||||
#include "common/Column.h"
|
||||
#include "common/Consts.h"
|
||||
#include "common/FieldMeta.h"
|
||||
#include "common/Types.h"
|
||||
#include "query/ScalarIndex.h"
|
||||
#include "query/SearchBruteForce.h"
|
||||
#include "query/SearchOnSealed.h"
|
||||
|
@ -225,19 +230,27 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
|
|||
AssertInfo(!get_bit(index_ready_bitset_, field_id),
|
||||
"field data can't be loaded when indexing exists");
|
||||
|
||||
void* field_data = nullptr;
|
||||
size_t size = 0;
|
||||
if (datatype_is_variable(data_type)) {
|
||||
VariableField field(get_segment_id(), field_meta, info);
|
||||
size = field.size();
|
||||
field_data = reinterpret_cast<void*>(field.data());
|
||||
std::unique_ptr<ColumnBase> column{};
|
||||
switch (data_type) {
|
||||
case milvus::DataType::STRING:
|
||||
case milvus::DataType::VARCHAR: {
|
||||
column = std::make_unique<VariableColumn<std::string>>(
|
||||
get_segment_id(), field_meta, info);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
}
|
||||
}
|
||||
size = column->size();
|
||||
std::unique_lock lck(mutex_);
|
||||
variable_fields_.emplace(field_id, std::move(field));
|
||||
variable_fields_.emplace(field_id, std::move(column));
|
||||
} else {
|
||||
field_data = CreateMap(get_segment_id(), field_meta, info);
|
||||
size = field_meta.get_sizeof() * info.row_count;
|
||||
auto column = FixedColumn(get_segment_id(), field_meta, info);
|
||||
size = column.size();
|
||||
std::unique_lock lck(mutex_);
|
||||
fixed_fields_[field_id] = field_data;
|
||||
fixed_fields_.emplace(field_id, std::move(column));
|
||||
}
|
||||
|
||||
// set pks to offset
|
||||
|
@ -333,15 +346,13 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
|
|||
auto& field_meta = schema_->operator[](field_id);
|
||||
auto element_sizeof = field_meta.get_sizeof();
|
||||
if (auto it = fixed_fields_.find(field_id); it != fixed_fields_.end()) {
|
||||
auto field_data = it->second;
|
||||
return SpanBase(field_data, get_row_count(), element_sizeof);
|
||||
auto& field_data = it->second;
|
||||
return field_data.span();
|
||||
}
|
||||
if (auto it = variable_fields_.find(field_id);
|
||||
it != variable_fields_.end()) {
|
||||
auto& field = it->second;
|
||||
return SpanBase(field.views().data(),
|
||||
field.views().size(),
|
||||
sizeof(std::string_view));
|
||||
return field->span();
|
||||
}
|
||||
auto field_data = insert_record_.get_field_data_base(field_id);
|
||||
AssertInfo(field_data->num_chunk() == 1,
|
||||
|
@ -432,9 +443,9 @@ SegmentSealedImpl::vector_search(SearchInfo& search_info,
|
|||
"Field Data is not loaded: " + std::to_string(field_id.get()));
|
||||
AssertInfo(row_count_opt_.has_value(), "Can't get row count value");
|
||||
auto row_count = row_count_opt_.value();
|
||||
auto vec_data = fixed_fields_.at(field_id);
|
||||
auto& vec_data = fixed_fields_.at(field_id);
|
||||
query::SearchOnSealed(*schema_,
|
||||
vec_data,
|
||||
vec_data.data(),
|
||||
search_info,
|
||||
query_data,
|
||||
query_count,
|
||||
|
@ -571,16 +582,16 @@ SegmentSealedImpl::bulk_subscript_impl(const void* src_raw,
|
|||
|
||||
template <typename T>
|
||||
void
|
||||
SegmentSealedImpl::bulk_subscript_impl(const VariableField& field,
|
||||
SegmentSealedImpl::bulk_subscript_impl(const ColumnBase* column,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* dst_raw) {
|
||||
auto field = reinterpret_cast<const VariableColumn<T>*>(column);
|
||||
auto dst = reinterpret_cast<T*>(dst_raw);
|
||||
for (int64_t i = 0; i < count; ++i) {
|
||||
auto offset = seg_offsets[i];
|
||||
if (offset != INVALID_SEG_OFFSET) {
|
||||
auto entry = field[offset];
|
||||
dst[i] = std::move(T(entry.data(), entry.row_count()));
|
||||
dst[i] = std::move(T((*field)[offset]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -647,10 +658,11 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
|
|||
case DataType::VARCHAR:
|
||||
case DataType::STRING: {
|
||||
FixedVector<std::string> output(count);
|
||||
bulk_subscript_impl<std::string>(variable_fields_.at(field_id),
|
||||
seg_offsets,
|
||||
count,
|
||||
output.data());
|
||||
bulk_subscript_impl<std::string>(
|
||||
variable_fields_.at(field_id).get(),
|
||||
seg_offsets,
|
||||
count,
|
||||
output.data());
|
||||
return CreateScalarDataArrayFrom(
|
||||
output.data(), count, field_meta);
|
||||
}
|
||||
|
@ -662,7 +674,7 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
|
|||
}
|
||||
}
|
||||
|
||||
auto src_vec = fixed_fields_.at(field_id);
|
||||
auto src_vec = fixed_fields_.at(field_id).data();
|
||||
switch (field_meta.get_data_type()) {
|
||||
case DataType::BOOL: {
|
||||
FixedVector<bool> output(count);
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
#include "SealedIndexingRecord.h"
|
||||
#include "SegmentSealed.h"
|
||||
#include "TimestampIndex.h"
|
||||
#include "VariableField.h"
|
||||
#include "common/Column.h"
|
||||
#include "index/ScalarIndex.h"
|
||||
#include "sys/mman.h"
|
||||
|
||||
|
@ -37,18 +37,7 @@ namespace milvus::segcore {
|
|||
class SegmentSealedImpl : public SegmentSealed {
|
||||
public:
|
||||
explicit SegmentSealedImpl(SchemaPtr schema, int64_t segment_id);
|
||||
~SegmentSealedImpl() {
|
||||
for (auto& [field_id, data] : fixed_fields_) {
|
||||
auto field_meta = schema_->operator[](field_id);
|
||||
auto data_type = field_meta.get_data_type();
|
||||
if (munmap(data, field_meta.get_sizeof() * get_row_count())) {
|
||||
AssertInfo(true,
|
||||
"failed to unmap field " +
|
||||
std::to_string(field_id.get()) +
|
||||
" err=" + strerror(errno));
|
||||
}
|
||||
}
|
||||
}
|
||||
~SegmentSealedImpl() override = default;
|
||||
void
|
||||
LoadIndex(const LoadIndexInfo& info) override;
|
||||
void
|
||||
|
@ -151,7 +140,7 @@ class SegmentSealedImpl : public SegmentSealed {
|
|||
|
||||
template <typename T>
|
||||
static void
|
||||
bulk_subscript_impl(const VariableField& field,
|
||||
bulk_subscript_impl(const ColumnBase* field,
|
||||
const int64_t* seg_offsets,
|
||||
int64_t count,
|
||||
void* dst_raw);
|
||||
|
@ -240,8 +229,8 @@ class SegmentSealedImpl : public SegmentSealed {
|
|||
|
||||
SchemaPtr schema_;
|
||||
int64_t id_;
|
||||
std::unordered_map<FieldId, void*> fixed_fields_;
|
||||
std::unordered_map<FieldId, VariableField> variable_fields_;
|
||||
std::unordered_map<FieldId, FixedColumn> fixed_fields_;
|
||||
std::unordered_map<FieldId, std::unique_ptr<ColumnBase>> variable_fields_;
|
||||
};
|
||||
|
||||
inline SegmentSealedPtr
|
||||
|
|
Loading…
Reference in New Issue