enhance: growing segment support mmap (#32633)

issue: https://github.com/milvus-io/milvus/issues/32984

Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
pull/33783/head
cqy123456 2024-06-18 01:42:00 -05:00 committed by GitHub
parent ec64499536
commit 32f685ff12
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
50 changed files with 2093 additions and 371 deletions

View File

@ -343,6 +343,9 @@ queryNode:
warmup: disable
mmap:
mmapEnabled: false # Enable mmap for loading data
growingMmapEnabled: false # Enable mmap for growing segment
fixedFileSizeForMmapAlloc: 4 #MB, fixed file size for mmap chunk manager to store chunk data
maxDiskUsagePercentageForMmapAlloc: 20 # max percentage of disk usage in memory mapping
lazyload:
enabled: false # Enable lazyload for loading data
waitTimeout: 30000 # max wait timeout duration in milliseconds before start to do lazyload search and retrieve

View File

@ -275,6 +275,11 @@ class Array {
return offsets_;
}
std::vector<uint64_t>
get_offsets_in_copy() const {
return offsets_;
}
ScalarArray
output_data() const {
ScalarArray data_array;
@ -573,6 +578,11 @@ class ArrayView {
data() const {
return data_;
}
// copy to result
std::vector<uint64_t>
get_offsets_in_copy() const {
return offsets_;
}
bool
is_same_array(const proto::plan::Array& arr2) const {

View File

@ -61,6 +61,9 @@ enum ErrorCode {
MetricTypeNotMatch = 2031,
DimNotMatch = 2032,
ClusterSkip = 2033,
MemAllocateFailed = 2034,
MemAllocateSizeNotMatch = 2035,
MmapError = 2036,
KnowhereError = 2100,
// timeout or cancel related.

View File

@ -68,6 +68,24 @@ template <typename T>
constexpr bool IsSparse = std::is_same_v<T, SparseFloatVector> ||
std::is_same_v<T, knowhere::sparse::SparseRow<float>>;
template <typename T>
constexpr bool IsVariableType =
std::is_same_v<T, std::string> || std::is_same_v<T, std::string_view> ||
std::is_same_v<T, Array> || std::is_same_v<T, ArrayView> ||
std::is_same_v<T, proto::plan::Array> || std::is_same_v<T, Json> ||
IsSparse<T>;
template <typename T>
constexpr bool IsVariableTypeSupportInChunk =
std::is_same_v<T, std::string> || std::is_same_v<T, Array> ||
std::is_same_v<T, Json>;
template <typename T>
using ChunkViewType = std::conditional_t<
std::is_same_v<T, std::string>,
std::string_view,
std::conditional_t<std::is_same_v<T, Array>, ArrayView, T>>;
struct FundamentalTag {};
struct StringTag {};

View File

@ -93,6 +93,14 @@ typedef struct CStorageConfig {
int64_t requestTimeoutMs;
} CStorageConfig;
typedef struct CMmapConfig {
const char* cache_read_ahead_policy;
const char* mmap_path;
uint64_t disk_limit;
uint64_t fix_file_size;
bool growing_enable_mmap;
} CMmapConfig;
typedef struct CTraceConfig {
const char* exporter;
float sampleFraction;

View File

@ -53,7 +53,10 @@ PhyBinaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
break;
}
case DataType::VARCHAR: {
if (segment_->type() == SegmentType::Growing) {
if (segment_->type() == SegmentType::Growing &&
!storage::MmapManager::GetInstance()
.GetMmapConfig()
.growing_enable_mmap) {
result = ExecRangeVisitorImpl<std::string>();
} else {
result = ExecRangeVisitorImpl<std::string_view>();

View File

@ -68,7 +68,10 @@ PhyCompareFilterExpr::GetChunkData<std::string>(FieldId field_id,
};
}
}
if (segment_->type() == SegmentType::Growing) {
if (segment_->type() == SegmentType::Growing &&
!storage::MmapManager::GetInstance()
.GetMmapConfig()
.growing_enable_mmap) {
auto chunk_data =
segment_->chunk_data<std::string>(field_id, chunk_id).data();
return [chunk_data](int i) -> const number { return chunk_data[i]; };

View File

@ -191,7 +191,6 @@ class SegmentExpr : public Expr {
TargetBitmapView res,
ValTypes... values) {
int64_t processed_size = 0;
for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
auto data_pos =
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;

View File

@ -55,7 +55,10 @@ PhyTermFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
break;
}
case DataType::VARCHAR: {
if (segment_->type() == SegmentType::Growing) {
if (segment_->type() == SegmentType::Growing &&
!storage::MmapManager::GetInstance()
.GetMmapConfig()
.growing_enable_mmap) {
result = ExecVisitorImpl<std::string>();
} else {
result = ExecVisitorImpl<std::string_view>();

View File

@ -112,7 +112,10 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
break;
}
case DataType::VARCHAR: {
if (segment_->type() == SegmentType::Growing) {
if (segment_->type() == SegmentType::Growing &&
!storage::MmapManager::GetInstance()
.GetMmapConfig()
.growing_enable_mmap) {
result = ExecRangeVisitorImpl<std::string>();
} else {
result = ExecRangeVisitorImpl<std::string_view>();
@ -294,7 +297,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) {
// filtering by index, get candidates.
auto size_per_chunk = segment_->size_per_chunk();
auto retrieve = [ size_per_chunk, this ](int64_t offset) -> auto {
auto retrieve = [ size_per_chunk, this ](int64_t offset) -> auto{
auto chunk_idx = offset / size_per_chunk;
auto chunk_offset = offset % size_per_chunk;
const auto& chunk =

View File

@ -0,0 +1,195 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "common/Array.h"
#include "storage/MmapManager.h"
namespace milvus {
/**
* @brief FixedLengthChunk
*/
template <typename Type>
struct FixedLengthChunk {
public:
FixedLengthChunk() = delete;
explicit FixedLengthChunk(const uint64_t size,
storage::MmapChunkDescriptor descriptor)
: mmap_descriptor_(descriptor), size_(size) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
data_ = (Type*)(mcm->Allocate(mmap_descriptor_, sizeof(Type) * size));
AssertInfo(data_ != nullptr,
"failed to create a mmapchunk: {}, map_size");
};
void*
data() {
return data_;
};
size_t
size() {
return size_;
};
Type
get(const int i) const {
return data_[i];
}
const Type&
view(const int i) const {
return data_[i];
}
private:
int64_t size_ = 0;
Type* data_ = nullptr;
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
};
/**
* @brief VariableLengthChunk
*/
template <typename Type>
struct VariableLengthChunk {
static_assert(IsVariableTypeSupportInChunk<Type>);
public:
VariableLengthChunk() = delete;
explicit VariableLengthChunk(const uint64_t size,
storage::MmapChunkDescriptor descriptor)
: mmap_descriptor_(descriptor), size_(size) {
data_ = FixedVector<ChunkViewType<Type>>(size);
};
inline void
set(const Type* src, uint32_t begin, uint32_t length) {
throw std::runtime_error(
"set should be a template specialization function");
}
inline Type
get(const int i) const {
throw std::runtime_error(
"get should be a template specialization function");
}
const ChunkViewType<Type>&
view(const int i) const {
return data_[i];
}
const ChunkViewType<Type>&
operator[](const int i) const {
return view(i);
}
void*
data() {
return data_.data();
};
size_t
size() {
return size_;
};
private:
int64_t size_ = 0;
FixedVector<ChunkViewType<Type>> data_;
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
};
template <>
inline void
VariableLengthChunk<std::string>::set(const std::string* src,
uint32_t begin,
uint32_t length) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
milvus::ErrorCode err_code;
AssertInfo(
begin + length <= size_,
"failed to set a chunk with length: {} from beign {}, map_size={}",
length,
begin,
size_);
for (auto i = 0; i < length; i++) {
auto buf_size = src[i].size() + 1;
auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size);
AssertInfo(buf != nullptr,
"failed to allocate memory from mmap_manager, error_code");
std::strcpy(buf, src[i].c_str());
data_[i + begin] = std::string_view(buf, src[i].size());
}
}
template <>
inline std::string
VariableLengthChunk<std::string>::get(const int i) const {
// copy to a string
return std::string(data_[i]);
}
template <>
inline void
VariableLengthChunk<Json>::set(const Json* src,
uint32_t begin,
uint32_t length) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
milvus::ErrorCode err_code;
AssertInfo(
begin + length <= size_,
"failed to set a chunk with length: {} from beign {}, map_size={}",
length,
begin,
size_);
for (auto i = 0; i < length; i++) {
auto buf_size = src[i].size() + simdjson::SIMDJSON_PADDING + 1;
auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size);
AssertInfo(
buf != nullptr,
"failed to allocate memory from mmap_manager, error_code:{}");
std::strcpy(buf, src[i].c_str());
data_[i + begin] = Json(buf, src[i].size());
}
}
template <>
inline Json
VariableLengthChunk<Json>::get(const int i) const {
return std::move(Json(simdjson::padded_string(data_[i].data())));
}
template <>
inline void
VariableLengthChunk<Array>::set(const Array* src,
uint32_t begin,
uint32_t length) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
milvus::ErrorCode err_code;
AssertInfo(
begin + length <= size_,
"failed to set a chunk with length: {} from beign {}, map_size={}",
length,
begin,
size_);
for (auto i = 0; i < length; i++) {
auto array_data =
(char*)mcm->Allocate(mmap_descriptor_, src[i].byte_size());
AssertInfo(array_data != nullptr,
"failed to allocate memory from mmap_manager, error_code");
std::copy(
src[i].data(), src[i].data() + src[i].byte_size(), array_data);
data_[i + begin] = ArrayView(array_data,
src[i].byte_size(),
src[i].get_element_type(),
src[i].get_offsets_in_copy());
}
}
template <>
inline Array
VariableLengthChunk<Array>::get(const int i) const {
auto array_view_i = data_[i];
char* data = static_cast<char*>(const_cast<void*>(array_view_i.data()));
return Array(data,
array_view_i.byte_size(),
array_view_i.get_element_type(),
array_view_i.get_offsets_in_copy());
}
} // namespace milvus

View File

@ -0,0 +1,212 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "mmap/ChunkData.h"
#include "storage/MmapManager.h"
namespace milvus {
template <typename Type>
class ChunkVectorBase {
public:
virtual ~ChunkVectorBase() = default;
virtual void
emplace_to_at_least(int64_t chunk_num, int64_t chunk_size) = 0;
virtual void
copy_to_chunk(int64_t chunk_id,
int64_t offest,
const Type* data,
int64_t length) = 0;
virtual void*
get_chunk_data(int64_t index) = 0;
virtual int64_t
get_chunk_size(int64_t index) = 0;
virtual Type
get_element(int64_t chunk_id, int64_t chunk_offset) = 0;
virtual ChunkViewType<Type>
view_element(int64_t chunk_id, int64_t chunk_offset) = 0;
int64_t
size() const {
return counter_;
}
virtual void
clear() = 0;
virtual SpanBase
get_span(int64_t chunk_id) = 0;
protected:
std::atomic<int64_t> counter_ = 0;
};
template <typename Type>
using ChunkVectorPtr = std::unique_ptr<ChunkVectorBase<Type>>;
template <typename Type,
typename ChunkImpl = FixedVector<Type>,
bool IsMmap = false>
class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
public:
ThreadSafeChunkVector(storage::MmapChunkDescriptor descriptor = nullptr) {
mmap_descriptor_ = descriptor;
}
void
emplace_to_at_least(int64_t chunk_num, int64_t chunk_size) override {
std::unique_lock<std::shared_mutex> lck(this->mutex_);
if (chunk_num <= this->counter_) {
return;
}
while (vec_.size() < chunk_num) {
if constexpr (IsMmap) {
vec_.emplace_back(chunk_size, mmap_descriptor_);
} else {
vec_.emplace_back(chunk_size);
}
++this->counter_;
}
}
void
copy_to_chunk(int64_t chunk_id,
int64_t offset,
const Type* data,
int64_t length) override {
std::unique_lock<std::shared_mutex> lck(mutex_);
AssertInfo(chunk_id < this->counter_,
fmt::format("index out of range, index={}, counter_={}",
chunk_id,
this->counter_));
if constexpr (!IsMmap || !IsVariableType<Type>) {
auto ptr = (Type*)vec_[chunk_id].data();
AssertInfo(
offset + length <= vec_[chunk_id].size(),
fmt::format(
"index out of chunk range, offset={}, length={}, size={}",
offset,
length,
vec_[chunk_id].size()));
std::copy_n(data, length, ptr + offset);
} else {
vec_[chunk_id].set(data, offset, length);
}
}
Type
get_element(int64_t chunk_id, int64_t chunk_offset) override {
std::shared_lock<std::shared_mutex> lck(mutex_);
auto chunk = vec_[chunk_id];
AssertInfo(
chunk_id < this->counter_ && chunk_offset < chunk.size(),
fmt::format("index out of range, index={}, chunk_offset={}, cap={}",
chunk_id,
chunk_offset,
chunk.size()));
if constexpr (IsMmap) {
return chunk.get(chunk_offset);
} else {
return chunk[chunk_offset];
}
}
ChunkViewType<Type>
view_element(int64_t chunk_id, int64_t chunk_offset) override {
std::shared_lock<std::shared_mutex> lck(mutex_);
auto chunk = vec_[chunk_id];
if constexpr (IsMmap) {
return chunk.view(chunk_offset);
} else if constexpr (std::is_same_v<std::string, Type>) {
return std::string_view(chunk[chunk_offset].data(),
chunk[chunk_offset].size());
} else if constexpr (std::is_same_v<Array, Type>) {
auto& src = chunk[chunk_offset];
return ArrayView(const_cast<char*>(src.data()),
src.byte_size(),
src.get_element_type(),
src.get_offsets_in_copy());
} else {
return chunk[chunk_offset];
}
}
void*
get_chunk_data(int64_t index) override {
std::shared_lock<std::shared_mutex> lck(mutex_);
AssertInfo(index < this->counter_,
fmt::format("index out of range, index={}, counter_={}",
index,
this->counter_));
return vec_[index].data();
}
int64_t
get_chunk_size(int64_t index) override {
std::shared_lock<std::shared_mutex> lck(mutex_);
AssertInfo(index < this->counter_,
fmt::format("index out of range, index={}, counter_={}",
index,
this->counter_));
return vec_[index].size();
}
void
clear() override {
std::unique_lock<std::shared_mutex> lck(mutex_);
this->counter_ = 0;
vec_.clear();
}
SpanBase
get_span(int64_t chunk_id) override {
std::shared_lock<std::shared_mutex> lck(mutex_);
if constexpr (IsMmap && std::is_same_v<std::string, Type>) {
return SpanBase(get_chunk_data(chunk_id),
get_chunk_size(chunk_id),
sizeof(ChunkViewType<Type>));
} else {
return SpanBase(get_chunk_data(chunk_id),
get_chunk_size(chunk_id),
sizeof(Type));
}
}
private:
mutable std::shared_mutex mutex_;
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
std::deque<ChunkImpl> vec_;
};
template <typename Type>
ChunkVectorPtr<Type>
SelectChunkVectorPtr(storage::MmapChunkDescriptor& mmap_descriptor) {
if constexpr (!IsVariableType<Type>) {
if (mmap_descriptor != nullptr) {
return std::make_unique<
ThreadSafeChunkVector<Type, FixedLengthChunk<Type>, true>>(
mmap_descriptor);
} else {
return std::make_unique<ThreadSafeChunkVector<Type>>();
}
} else if constexpr (IsVariableTypeSupportInChunk<Type>) {
if (mmap_descriptor != nullptr) {
return std::make_unique<
ThreadSafeChunkVector<Type, VariableLengthChunk<Type>, true>>(
mmap_descriptor);
} else {
return std::make_unique<ThreadSafeChunkVector<Type>>();
}
} else {
// todo: sparse float vector support mmap
return std::make_unique<ThreadSafeChunkVector<Type>>();
}
}
} // namespace milvus

View File

@ -39,6 +39,7 @@
#include "common/Array.h"
#include "knowhere/dataset.h"
#include "storage/prometheus_client.h"
#include "storage/MmapChunkManager.h"
namespace milvus {
@ -53,12 +54,17 @@ constexpr size_t ARRAY_PADDING = 1;
class ColumnBase {
public:
enum MappingType {
MAP_WITH_ANONYMOUS = 0,
MAP_WITH_FILE = 1,
MAP_WITH_MANAGER = 2,
};
// memory mode ctor
ColumnBase(size_t reserve, const FieldMeta& field_meta)
: type_size_(IsSparseFloatVectorDataType(field_meta.get_data_type())
? 1
: field_meta.get_sizeof()),
is_map_anonymous_(true) {
mapping_type_(MappingType::MAP_WITH_ANONYMOUS) {
SetPaddingSize(field_meta.get_data_type());
if (IsVariableDataType(field_meta.get_data_type())) {
@ -83,13 +89,38 @@ class ColumnBase {
UpdateMetricWhenMmap(mapped_size);
}
// use mmap manager ctor, used in growing segment fixed data type
ColumnBase(size_t reserve,
int dim,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptor descriptor)
: mcm_(mcm),
mmap_descriptor_(descriptor),
type_size_(GetDataTypeSize(data_type, dim)),
num_rows_(0),
size_(0),
cap_size_(reserve),
mapping_type_(MAP_WITH_MANAGER) {
AssertInfo((mcm != nullptr) && descriptor != nullptr,
"use wrong mmap chunk manager and mmap chunk descriptor to "
"create column.");
SetPaddingSize(data_type);
size_t mapped_size = cap_size_ + padding_;
data_ = (char*)mcm_->Allocate(mmap_descriptor_, (uint64_t)mapped_size);
AssertInfo(data_ != nullptr,
"fail to create with mmap manager: map_size = {}",
mapped_size);
}
// mmap mode ctor
// User must call Seal to build the view for variable length column.
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta)
: type_size_(IsSparseFloatVectorDataType(field_meta.get_data_type())
? 1
: field_meta.get_sizeof()),
is_map_anonymous_(false),
mapping_type_(MappingType::MAP_WITH_FILE),
num_rows_(size / type_size_) {
SetPaddingSize(field_meta.get_data_type());
@ -119,7 +150,7 @@ class ColumnBase {
IsSparseFloatVectorDataType(data_type) ? 1 : (size / type_size_)),
size_(size),
cap_size_(size),
is_map_anonymous_(false) {
mapping_type_(MappingType::MAP_WITH_FILE) {
SetPaddingSize(data_type);
size_t mapped_size = cap_size_ + padding_;
@ -134,13 +165,15 @@ class ColumnBase {
virtual ~ColumnBase() {
if (data_ != nullptr) {
if (mapping_type_ != MappingType::MAP_WITH_MANAGER) {
size_t mapped_size = cap_size_ + padding_;
if (munmap(data_, mapped_size)) {
AssertInfo(true,
"failed to unmap variable field, err={}",
strerror(errno));
}
UpdateMetricWhenMunmap(mapped_size);
}
UpdateMetricWhenMunmap(cap_size_ + padding_);
}
}
@ -238,13 +271,17 @@ class ColumnBase {
}
protected:
// only for memory mode, not mmap
// only for memory mode and mmap manager mode, not mmap
void
Expand(size_t new_size) {
if (new_size == 0) {
return;
}
AssertInfo(
mapping_type_ == MappingType::MAP_WITH_ANONYMOUS ||
mapping_type_ == MappingType::MAP_WITH_MANAGER,
"expand function only use in anonymous or with mmap manager");
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
size_t new_mapped_size = new_size + padding_;
auto data = static_cast<char*>(mmap(nullptr,
new_mapped_size,
@ -278,7 +315,19 @@ class ColumnBase {
data_ = data;
cap_size_ = new_size;
is_map_anonymous_ = true;
mapping_type_ = MappingType::MAP_WITH_ANONYMOUS;
} else if (mapping_type_ == MappingType::MAP_WITH_MANAGER) {
size_t new_mapped_size = new_size + padding_;
auto data = mcm_->Allocate(mmap_descriptor_, new_mapped_size);
AssertInfo(data != nullptr,
"fail to create with mmap manager: map_size = {}",
new_mapped_size);
std::memcpy(data, data_, size_);
// allocate space only append in one growing segment, so no need to munmap()
data_ = (char*)data;
cap_size_ = new_size;
mapping_type_ = MappingType::MAP_WITH_MANAGER;
}
}
char* data_{nullptr};
@ -291,16 +340,17 @@ class ColumnBase {
// length in bytes
size_t size_{0};
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
private:
void
UpdateMetricWhenMmap(size_t mmaped_size) {
UpdateMetricWhenMmap(is_map_anonymous_, mmaped_size);
UpdateMetricWhenMmap(mapping_type_, mmaped_size);
}
void
UpdateMetricWhenMmap(bool is_map_anonymous, size_t mapped_size) {
if (is_map_anonymous) {
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
milvus::storage::internal_mmap_allocated_space_bytes_anon.Observe(
mapped_size);
milvus::storage::internal_mmap_in_used_space_bytes_anon.Increment(
@ -315,7 +365,7 @@ class ColumnBase {
void
UpdateMetricWhenMunmap(size_t mapped_size) {
if (is_map_anonymous_) {
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
milvus::storage::internal_mmap_in_used_space_bytes_anon.Decrement(
mapped_size);
} else {
@ -325,8 +375,9 @@ class ColumnBase {
}
private:
// is MAP_ANONYMOUS
bool is_map_anonymous_;
// mapping_type_
MappingType mapping_type_;
storage::MmapChunkManagerPtr mcm_ = nullptr;
};
class Column : public ColumnBase {
@ -346,6 +397,14 @@ class Column : public ColumnBase {
: ColumnBase(file, size, dim, data_type) {
}
Column(size_t reserve,
int dim,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptor descriptor)
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
}
Column(Column&& column) noexcept : ColumnBase(std::move(column)) {
}
@ -376,6 +435,14 @@ class SparseFloatColumn : public ColumnBase {
const DataType& data_type)
: ColumnBase(file, size, dim, data_type) {
}
// mmap with mmap manager
SparseFloatColumn(size_t reserve,
int dim,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptor descriptor)
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
}
SparseFloatColumn(SparseFloatColumn&& column) noexcept
: ColumnBase(std::move(column)),
@ -471,6 +538,14 @@ class VariableColumn : public ColumnBase {
VariableColumn(const File& file, size_t size, const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta) {
}
// mmap with mmap manager
VariableColumn(size_t reserve,
int dim,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptor descriptor)
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
}
VariableColumn(VariableColumn&& column) noexcept
: ColumnBase(std::move(column)),
@ -579,6 +654,14 @@ class ArrayColumn : public ColumnBase {
element_type_(field_meta.get_element_type()) {
}
ArrayColumn(size_t reserve,
int dim,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptor descriptor)
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
}
ArrayColumn(ArrayColumn&& column) noexcept
: ColumnBase(std::move(column)),
indices_(std::move(column.indices_)),

View File

@ -2378,7 +2378,10 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
}
case DataType::VARCHAR: {
if (chunk_id < data_barrier) {
if (segment_.type() == SegmentType::Growing) {
if (segment_.type() == SegmentType::Growing &&
!storage::MmapManager::GetInstance()
.GetMmapConfig()
.growing_enable_mmap) {
auto chunk_data =
segment_
.chunk_data<std::string>(field_id, chunk_id)

View File

@ -32,6 +32,7 @@
#include "common/Span.h"
#include "common/Types.h"
#include "common/Utils.h"
#include "mmap/ChunkVector.h"
namespace milvus::segcore {
@ -124,6 +125,9 @@ class VectorBase {
virtual const void*
get_chunk_data(ssize_t chunk_index) const = 0;
virtual int64_t
get_chunk_size(ssize_t chunk_index) const = 0;
virtual ssize_t
num_chunk() const = 0;
@ -140,8 +144,6 @@ class VectorBase {
template <typename Type, bool is_type_entire_row = false>
class ConcurrentVectorImpl : public VectorBase {
public:
// constants
using Chunk = FixedVector<Type>;
ConcurrentVectorImpl(ConcurrentVectorImpl&&) = delete;
ConcurrentVectorImpl(const ConcurrentVectorImpl&) = delete;
@ -164,43 +166,42 @@ class ConcurrentVectorImpl : public VectorBase {
BinaryVector>>>>;
public:
explicit ConcurrentVectorImpl(ssize_t elements_per_row,
int64_t size_per_chunk)
explicit ConcurrentVectorImpl(
ssize_t elements_per_row,
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: VectorBase(size_per_chunk),
elements_per_row_(is_type_entire_row ? 1 : elements_per_row) {
chunks_ptr_ = SelectChunkVectorPtr<Type>(mmap_descriptor);
}
Span<TraitType>
get_span(int64_t chunk_id) const {
auto& chunk = get_chunk(chunk_id);
SpanBase
get_span_base(int64_t chunk_id) const override {
if constexpr (is_type_entire_row) {
return Span<TraitType>(chunk.data(), chunk.size());
return chunks_ptr_->get_span(chunk_id);
} else if constexpr (std::is_same_v<Type, int64_t> || // NOLINT
std::is_same_v<Type, int>) {
// only for testing
PanicInfo(NotImplemented, "unimplemented");
} else {
auto chunk_data = chunks_ptr_->get_chunk_data(chunk_id);
auto chunk_size = chunks_ptr_->get_chunk_size(chunk_id);
static_assert(
std::is_same_v<typename TraitType::embedded_type, Type>);
return Span<TraitType>(
chunk.data(), chunk.size(), elements_per_row_);
static_cast<Type*>(chunk_data), chunk_size, elements_per_row_);
}
}
SpanBase
get_span_base(int64_t chunk_id) const override {
return get_span(chunk_id);
}
void
fill_chunk_data(const std::vector<FieldDataPtr>& datas) override {
AssertInfo(chunks_.size() == 0, "non empty concurrent vector");
AssertInfo(chunks_ptr_->size() == 0, "non empty concurrent vector");
int64_t element_count = 0;
for (auto& field_data : datas) {
element_count += field_data->get_num_rows();
}
chunks_.emplace_to_at_least(1, elements_per_row_ * element_count);
chunks_ptr_->emplace_to_at_least(1, elements_per_row_ * element_count);
int64_t offset = 0;
for (auto& field_data : datas) {
auto num_rows = field_data->get_num_rows();
@ -227,26 +228,21 @@ class ConcurrentVectorImpl : public VectorBase {
if (element_count == 0) {
return;
}
chunks_.emplace_to_at_least(
chunks_ptr_->emplace_to_at_least(
upper_div(element_offset + element_count, size_per_chunk_),
elements_per_row_ * size_per_chunk_);
set_data(
element_offset, static_cast<const Type*>(source), element_count);
}
const Chunk&
get_chunk(ssize_t chunk_index) const {
return chunks_[chunk_index];
}
Chunk&
get_chunk(ssize_t index) {
return chunks_[index];
}
const void*
get_chunk_data(ssize_t chunk_index) const override {
return chunks_[chunk_index].data();
return (const void*)chunks_ptr_->get_chunk_data(chunk_index);
}
int64_t
get_chunk_size(ssize_t chunk_index) const override {
return chunks_ptr_->get_chunk_size(chunk_index);
}
// just for fun, don't use it directly
@ -254,7 +250,9 @@ class ConcurrentVectorImpl : public VectorBase {
get_element(ssize_t element_index) const {
auto chunk_id = element_index / size_per_chunk_;
auto chunk_offset = element_index % size_per_chunk_;
return get_chunk(chunk_id).data() + chunk_offset * elements_per_row_;
auto data =
static_cast<const Type*>(chunks_ptr_->get_chunk_data(chunk_id));
return data + chunk_offset * elements_per_row_;
}
const Type&
@ -266,18 +264,20 @@ class ConcurrentVectorImpl : public VectorBase {
elements_per_row_));
auto chunk_id = element_index / size_per_chunk_;
auto chunk_offset = element_index % size_per_chunk_;
return get_chunk(chunk_id)[chunk_offset];
auto data =
static_cast<const Type*>(chunks_ptr_->get_chunk_data(chunk_id));
return data[chunk_offset];
}
ssize_t
num_chunk() const override {
return chunks_.size();
return chunks_ptr_->size();
}
bool
empty() override {
for (size_t i = 0; i < chunks_.size(); i++) {
if (get_chunk(i).size() > 0) {
for (size_t i = 0; i < chunks_ptr_->size(); i++) {
if (chunks_ptr_->get_chunk_size(i) > 0) {
return false;
}
}
@ -287,7 +287,7 @@ class ConcurrentVectorImpl : public VectorBase {
void
clear() override {
chunks_.clear();
chunks_ptr_->clear();
}
private:
@ -336,33 +336,88 @@ class ConcurrentVectorImpl : public VectorBase {
if (element_count <= 0) {
return;
}
auto chunk_num = chunks_.size();
auto chunk_num = chunks_ptr_->size();
AssertInfo(
chunk_id < chunk_num,
fmt::format("chunk_id out of chunk num, chunk_id={}, chunk_num={}",
chunk_id,
chunk_num));
Chunk& chunk = chunks_[chunk_id];
auto ptr = chunk.data();
std::copy_n(source + source_offset * elements_per_row_,
element_count * elements_per_row_,
ptr + chunk_offset * elements_per_row_);
chunks_ptr_->copy_to_chunk(chunk_id,
chunk_offset * elements_per_row_,
source + source_offset * elements_per_row_,
element_count * elements_per_row_);
}
protected:
const ssize_t elements_per_row_;
private:
ThreadSafeVector<Chunk> chunks_;
ChunkVectorPtr<Type> chunks_ptr_ = nullptr;
};
template <typename Type>
class ConcurrentVector : public ConcurrentVectorImpl<Type, true> {
public:
static_assert(IsScalar<Type> || std::is_same_v<Type, PkType>);
explicit ConcurrentVector(int64_t size_per_chunk)
explicit ConcurrentVector(
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: ConcurrentVectorImpl<Type, true>::ConcurrentVectorImpl(
1, size_per_chunk) {
1, size_per_chunk, mmap_descriptor) {
}
};
template <>
class ConcurrentVector<std::string>
: public ConcurrentVectorImpl<std::string, true> {
public:
explicit ConcurrentVector(
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: ConcurrentVectorImpl<std::string, true>::ConcurrentVectorImpl(
1, size_per_chunk, mmap_descriptor) {
}
std::string_view
view_element(ssize_t element_index) const {
auto chunk_id = element_index / size_per_chunk_;
auto chunk_offset = element_index % size_per_chunk_;
return chunks_ptr_->view_element(chunk_id, chunk_offset);
}
};
template <>
class ConcurrentVector<Json> : public ConcurrentVectorImpl<Json, true> {
public:
explicit ConcurrentVector(
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: ConcurrentVectorImpl<Json, true>::ConcurrentVectorImpl(
1, size_per_chunk, mmap_descriptor) {
}
std::string_view
view_element(ssize_t element_index) const {
auto chunk_id = element_index / size_per_chunk_;
auto chunk_offset = element_index % size_per_chunk_;
return std::string_view(
chunks_ptr_->view_element(chunk_id, chunk_offset).data());
}
};
template <>
class ConcurrentVector<Array> : public ConcurrentVectorImpl<Array, true> {
public:
explicit ConcurrentVector(
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: ConcurrentVectorImpl<Array, true>::ConcurrentVectorImpl(
1, size_per_chunk, mmap_descriptor) {
}
ArrayView
view_element(ssize_t element_index) const {
auto chunk_id = element_index / size_per_chunk_;
auto chunk_offset = element_index % size_per_chunk_;
return chunks_ptr_->view_element(chunk_id, chunk_offset);
}
};
@ -370,9 +425,13 @@ template <>
class ConcurrentVector<SparseFloatVector>
: public ConcurrentVectorImpl<knowhere::sparse::SparseRow<float>, true> {
public:
explicit ConcurrentVector(int64_t size_per_chunk)
explicit ConcurrentVector(
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: ConcurrentVectorImpl<knowhere::sparse::SparseRow<float>,
true>::ConcurrentVectorImpl(1, size_per_chunk),
true>::ConcurrentVectorImpl(1,
size_per_chunk,
mmap_descriptor),
dim_(0) {
}
@ -404,9 +463,11 @@ template <>
class ConcurrentVector<FloatVector>
: public ConcurrentVectorImpl<float, false> {
public:
ConcurrentVector(int64_t dim, int64_t size_per_chunk)
ConcurrentVector(int64_t dim,
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: ConcurrentVectorImpl<float, false>::ConcurrentVectorImpl(
dim, size_per_chunk) {
dim, size_per_chunk, mmap_descriptor) {
}
};
@ -414,8 +475,11 @@ template <>
class ConcurrentVector<BinaryVector>
: public ConcurrentVectorImpl<uint8_t, false> {
public:
explicit ConcurrentVector(int64_t dim, int64_t size_per_chunk)
: ConcurrentVectorImpl(dim / 8, size_per_chunk) {
explicit ConcurrentVector(
int64_t dim,
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: ConcurrentVectorImpl(dim / 8, size_per_chunk, mmap_descriptor) {
AssertInfo(dim % 8 == 0,
fmt::format("dim is not a multiple of 8, dim={}", dim));
}
@ -425,9 +489,11 @@ template <>
class ConcurrentVector<Float16Vector>
: public ConcurrentVectorImpl<float16, false> {
public:
ConcurrentVector(int64_t dim, int64_t size_per_chunk)
ConcurrentVector(int64_t dim,
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: ConcurrentVectorImpl<float16, false>::ConcurrentVectorImpl(
dim, size_per_chunk) {
dim, size_per_chunk, mmap_descriptor) {
}
};
@ -435,9 +501,11 @@ template <>
class ConcurrentVector<BFloat16Vector>
: public ConcurrentVectorImpl<bfloat16, false> {
public:
ConcurrentVector(int64_t dim, int64_t size_per_chunk)
ConcurrentVector(int64_t dim,
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: ConcurrentVectorImpl<bfloat16, false>::ConcurrentVectorImpl(
dim, size_per_chunk) {
dim, size_per_chunk, mmap_descriptor) {
}
};

View File

@ -65,13 +65,13 @@ VectorFieldIndexing::BuildIndexRange(int64_t ack_beg,
auto conf = get_build_params();
data_.grow_to_at_least(ack_end);
for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) {
const auto& chunk = source->get_chunk(chunk_id);
const auto& chunk_data = source->get_chunk_data(chunk_id);
auto indexing = std::make_unique<index::VectorMemIndex<float>>(
knowhere::IndexEnum::INDEX_FAISS_IVFFLAT,
knowhere::metric::L2,
knowhere::Version::GetCurrentVersion().VersionNumber());
auto dataset = knowhere::GenDataSet(
source->get_size_per_chunk(), dim, chunk.data());
auto dataset =
knowhere::GenDataSet(source->get_size_per_chunk(), dim, chunk_data);
indexing->BuildWithDataset(dataset, conf);
data_[chunk_id] = std::move(indexing);
}
@ -297,16 +297,18 @@ ScalarFieldIndexing<T>::BuildIndexRange(int64_t ack_beg,
AssertInfo(ack_end <= num_chunk, "Ack_end is bigger than num_chunk");
data_.grow_to_at_least(ack_end);
for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) {
const auto& chunk = source->get_chunk(chunk_id);
auto chunk_data = source->get_chunk_data(chunk_id);
// build index for chunk
// TODO
if constexpr (std::is_same_v<T, std::string>) {
auto indexing = index::CreateStringIndexSort();
indexing->Build(vec_base->get_size_per_chunk(), chunk.data());
indexing->Build(vec_base->get_size_per_chunk(),
static_cast<const T*>(chunk_data));
data_[chunk_id] = std::move(indexing);
} else {
auto indexing = index::CreateScalarIndexSort<T>();
indexing->Build(vec_base->get_size_per_chunk(), chunk.data());
indexing->Build(vec_base->get_size_per_chunk(),
static_cast<const T*>(chunk_data));
data_[chunk_id] = std::move(indexing);
}
}

View File

@ -30,6 +30,7 @@
#include "segcore/AckResponder.h"
#include "segcore/ConcurrentVector.h"
#include "segcore/Record.h"
#include "storage/MmapManager.h"
namespace milvus::segcore {
@ -292,8 +293,10 @@ class OffsetOrderedArray : public OffsetMap {
template <bool is_sealed = false>
struct InsertRecord {
InsertRecord(const Schema& schema, int64_t size_per_chunk)
: timestamps_(size_per_chunk) {
InsertRecord(const Schema& schema,
const int64_t size_per_chunk,
const storage::MmapChunkDescriptor mmap_descriptor = nullptr)
: timestamps_(size_per_chunk), mmap_descriptor_(mmap_descriptor) {
std::optional<FieldId> pk_field_id = schema.get_primary_field_id();
for (auto& field : schema) {
@ -303,7 +306,7 @@ struct InsertRecord {
pk_field_id.value() == field_id) {
switch (field_meta.get_data_type()) {
case DataType::INT64: {
if (is_sealed) {
if constexpr (is_sealed) {
pk2offset_ =
std::make_unique<OffsetOrderedArray<int64_t>>();
} else {
@ -313,7 +316,7 @@ struct InsertRecord {
break;
}
case DataType::VARCHAR: {
if (is_sealed) {
if constexpr (is_sealed) {
pk2offset_ = std::make_unique<
OffsetOrderedArray<std::string>>();
} else {
@ -532,6 +535,9 @@ struct InsertRecord {
AssertInfo(fields_data_.find(field_id) != fields_data_.end(),
"Cannot find field_data with field_id: " +
std::to_string(field_id.get()));
AssertInfo(
fields_data_.at(field_id) != nullptr,
"fields_data_ at i is null" + std::to_string(field_id.get()));
return fields_data_.at(field_id).get();
}
@ -560,8 +566,9 @@ struct InsertRecord {
void
append_field_data(FieldId field_id, int64_t size_per_chunk) {
static_assert(IsScalar<Type> || IsSparse<Type>);
fields_data_.emplace(
field_id, std::make_unique<ConcurrentVector<Type>>(size_per_chunk));
fields_data_.emplace(field_id,
std::make_unique<ConcurrentVector<Type>>(
size_per_chunk, mmap_descriptor_));
}
// append a column of vector type
@ -571,7 +578,7 @@ struct InsertRecord {
static_assert(std::is_base_of_v<VectorTrait, VectorType>);
fields_data_.emplace(field_id,
std::make_unique<ConcurrentVector<VectorType>>(
dim, size_per_chunk));
dim, size_per_chunk, mmap_descriptor_));
}
void
@ -620,6 +627,7 @@ struct InsertRecord {
private:
std::unordered_map<FieldId, std::unique_ptr<VectorBase>> fields_data_{};
mutable std::shared_mutex shared_mutex_{};
storage::MmapChunkDescriptor mmap_descriptor_;
};
} // namespace milvus::segcore

View File

@ -663,8 +663,12 @@ SegmentGrowingImpl::bulk_subscript_ptr_impl(
auto& src = *vec;
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
if (IsVariableTypeSupportInChunk<S> && mmap_descriptor_ != nullptr) {
dst->at(i) = std::move(T(src.view_element(offset)));
} else {
dst->at(i) = std::move(T(src[offset]));
}
}
}
template <typename T>

View File

@ -209,12 +209,35 @@ class SegmentGrowingImpl : public SegmentGrowing {
IndexMetaPtr indexMeta,
const SegcoreConfig& segcore_config,
int64_t segment_id)
: segcore_config_(segcore_config),
: mmap_descriptor_(storage::MmapManager::GetInstance()
.GetMmapConfig()
.GetEnableGrowingMmap()
? storage::MmapChunkDescriptor(
new storage::MmapChunkDescriptorValue(
{segment_id, SegmentType::Growing}))
: nullptr),
segcore_config_(segcore_config),
schema_(std::move(schema)),
index_meta_(indexMeta),
insert_record_(*schema_, segcore_config.get_chunk_rows()),
insert_record_(
*schema_, segcore_config.get_chunk_rows(), mmap_descriptor_),
indexing_record_(*schema_, index_meta_, segcore_config_),
id_(segment_id) {
if (mmap_descriptor_ != nullptr) {
LOG_INFO("growing segment {} use mmap to hold raw data",
this->get_segment_id());
auto mcm =
storage::MmapManager::GetInstance().GetMmapChunkManager();
mcm->Register(mmap_descriptor_);
}
}
~SegmentGrowingImpl() {
if (mmap_descriptor_ != nullptr) {
auto mcm =
storage::MmapManager::GetInstance().GetMmapChunkManager();
mcm->UnRegister(mmap_descriptor_);
}
}
void
@ -294,6 +317,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
}
private:
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
SegcoreConfig segcore_config_;
SchemaPtr schema_;
IndexMetaPtr index_meta_;

View File

@ -48,7 +48,7 @@
#include "query/SearchOnSealed.h"
#include "storage/Util.h"
#include "storage/ThreadPools.h"
#include "storage/ChunkCacheSingleton.h"
#include "storage/MmapManager.h"
namespace milvus::segcore {
@ -151,9 +151,9 @@ SegmentSealedImpl::WarmupChunkCache(const FieldId field_id) {
id_);
auto field_info = it->second;
auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache();
auto cc = storage::MmapManager::GetInstance().GetChunkCache();
for (const auto& data_path : field_info.insert_files) {
auto column = cc->Read(data_path);
auto column = cc->Read(data_path, mmap_descriptor_);
}
}
@ -819,8 +819,10 @@ SegmentSealedImpl::GetFieldDataPath(FieldId field_id, int64_t offset) const {
}
std::tuple<std::string, std::shared_ptr<ColumnBase>> static ReadFromChunkCache(
const storage::ChunkCachePtr& cc, const std::string& data_path) {
auto column = cc->Read(data_path);
const storage::ChunkCachePtr& cc,
const std::string& data_path,
const storage::MmapChunkDescriptor& descriptor) {
auto column = cc->Read(data_path, descriptor);
cc->Prefetch(data_path);
return {data_path, column};
}
@ -864,7 +866,7 @@ SegmentSealedImpl::get_vector(FieldId field_id,
}
// If index doesn't have raw data, get vector from chunk cache.
auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache();
auto cc = storage::MmapManager::GetInstance().GetChunkCache();
// group by data_path
auto id_to_data_path =
@ -885,7 +887,8 @@ SegmentSealedImpl::get_vector(FieldId field_id,
futures.reserve(path_to_column.size());
for (const auto& iter : path_to_column) {
const auto& data_path = iter.first;
futures.emplace_back(pool.Submit(ReadFromChunkCache, cc, data_path));
futures.emplace_back(
pool.Submit(ReadFromChunkCache, cc, data_path, mmap_descriptor_));
}
for (int i = 0; i < futures.size(); ++i) {
@ -1029,10 +1032,15 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
id_(segment_id),
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) {
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptorValue>(
new storage::MmapChunkDescriptorValue(
{segment_id, SegmentType::Sealed}));
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
mcm->Register(mmap_descriptor_);
}
SegmentSealedImpl::~SegmentSealedImpl() {
auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache();
auto cc = storage::MmapManager::GetInstance().GetChunkCache();
if (cc == nullptr) {
return;
}
@ -1042,6 +1050,10 @@ SegmentSealedImpl::~SegmentSealedImpl() {
cc->Remove(binlog);
}
}
if (mmap_descriptor_ != nullptr) {
auto mm = storage::MmapManager::GetInstance().GetMmapChunkManager();
mm->UnRegister(mmap_descriptor_);
}
}
void
@ -1161,7 +1173,7 @@ SegmentSealedImpl::ClearData() {
variable_fields_avg_size_.clear();
stats_.mem_size = 0;
}
auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache();
auto cc = storage::MmapManager::GetInstance().GetChunkCache();
if (cc == nullptr) {
return;
}
@ -1674,7 +1686,7 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) {
}
void
SegmentSealedImpl::RemoveFieldFile(const FieldId field_id) {
auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache();
auto cc = storage::MmapManager::GetInstance().GetChunkCache();
if (cc == nullptr) {
return;
}

View File

@ -279,6 +279,8 @@ class SegmentSealedImpl : public SegmentSealed {
generate_interim_index(const FieldId field_id);
private:
// mmap descriptor, used in chunk cache
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
// segment loading state
BitsetType field_data_ready_bitset_;
BitsetType index_ready_bitset_;

View File

@ -56,7 +56,8 @@ set(STORAGE_FILES
ThreadPools.cpp
ChunkCache.cpp
TencentCloudCredentialsProvider.cpp
TencentCloudSTSClient.cpp)
TencentCloudSTSClient.cpp
MmapChunkManager.cpp)
if(USE_OPENDAL)
list(APPEND STORAGE_FILES OpenDALChunkManager.cpp)

View File

@ -19,14 +19,12 @@
#include "mmap/Utils.h"
namespace milvus::storage {
std::shared_ptr<ColumnBase>
ChunkCache::Read(const std::string& filepath) {
auto path = CachePath(filepath);
ChunkCache::Read(const std::string& filepath,
const MmapChunkDescriptor& descriptor) {
{
std::shared_lock lck(mutex_);
auto it = columns_.find(path);
auto it = columns_.find(filepath);
if (it != columns_.end()) {
AssertInfo(it->second, "unexpected null column, file={}", filepath);
return it->second;
@ -36,29 +34,26 @@ ChunkCache::Read(const std::string& filepath) {
auto field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath);
std::unique_lock lck(mutex_);
auto it = columns_.find(path);
auto it = columns_.find(filepath);
if (it != columns_.end()) {
return it->second;
}
auto column = Mmap(path, field_data->GetFieldData());
auto column = Mmap(field_data->GetFieldData(), descriptor);
AssertInfo(column, "unexpected null column, file={}", filepath);
columns_.emplace(path, column);
columns_.emplace(filepath, column);
return column;
}
void
ChunkCache::Remove(const std::string& filepath) {
auto path = CachePath(filepath);
std::unique_lock lck(mutex_);
columns_.erase(path);
columns_.erase(filepath);
}
void
ChunkCache::Prefetch(const std::string& filepath) {
auto path = CachePath(filepath);
std::shared_lock lck(mutex_);
auto it = columns_.find(path);
auto it = columns_.find(filepath);
if (it == columns_.end()) {
return;
}
@ -68,35 +63,23 @@ ChunkCache::Prefetch(const std::string& filepath) {
reinterpret_cast<void*>(const_cast<char*>(column->MmappedData())),
column->ByteSize(),
read_ahead_policy_);
AssertInfo(ok == 0,
"failed to madvise to the data file {}, err: {}",
path,
if (ok != 0) {
LOG_WARN(
"failed to madvise to the data file {}, addr {}, size {}, err: {}",
filepath,
column->MmappedData(),
column->ByteSize(),
strerror(errno));
}
}
std::shared_ptr<ColumnBase>
ChunkCache::Mmap(const std::filesystem::path& path,
const FieldDataPtr& field_data) {
auto dir = path.parent_path();
std::filesystem::create_directories(dir);
ChunkCache::Mmap(const FieldDataPtr& field_data,
const MmapChunkDescriptor& descriptor) {
auto dim = field_data->get_dim();
auto data_type = field_data->get_data_type();
auto file = File::Open(path.string(), O_CREAT | O_TRUNC | O_RDWR);
// write the field data to disk
auto data_size = field_data->Size();
// unused
std::vector<std::vector<uint64_t>> element_indices{};
auto written = WriteFieldData(file, data_type, field_data, element_indices);
AssertInfo(written == data_size,
"failed to write data file {}, written "
"{} but total {}, err: {}",
path.c_str(),
written,
data_size,
strerror(errno));
std::shared_ptr<ColumnBase> column{};
@ -108,40 +91,17 @@ ChunkCache::Mmap(const std::filesystem::path& path,
offset += field_data->Size(i);
}
auto sparse_column = std::make_shared<SparseFloatColumn>(
file, data_size, dim, data_type);
data_size, dim, data_type, mcm_, descriptor);
sparse_column->Seal(std::move(indices));
column = std::move(sparse_column);
} else if (IsVariableDataType(data_type)) {
AssertInfo(
false, "TODO: unimplemented for variable data type: {}", data_type);
} else {
column = std::make_shared<Column>(file, data_size, dim, data_type);
column = std::make_shared<Column>(
data_size, dim, data_type, mcm_, descriptor);
}
// unlink
auto ok = unlink(path.c_str());
AssertInfo(ok == 0,
"failed to unlink mmap data file {}, err: {}",
path.c_str(),
strerror(errno));
column->AppendBatch(field_data);
return column;
}
std::string
ChunkCache::CachePath(const std::string& filepath) {
auto path = std::filesystem::path(filepath);
auto prefix = std::filesystem::path(path_prefix_);
// Cache path shall not use absolute filepath direct, it shall always under path_prefix_
if (path.is_absolute()) {
return (prefix /
filepath.substr(path.root_directory().string().length(),
filepath.length()))
.string();
}
return (prefix / filepath).string();
}
} // namespace milvus::storage

View File

@ -15,7 +15,7 @@
// limitations under the License.
#pragma once
#include "storage/MmapChunkManager.h"
#include "mmap/Column.h"
namespace milvus::storage {
@ -24,10 +24,10 @@ extern std::map<std::string, int> ReadAheadPolicy_Map;
class ChunkCache {
public:
explicit ChunkCache(std::string path,
const std::string& read_ahead_policy,
ChunkManagerPtr cm)
: path_prefix_(std::move(path)), cm_(cm) {
explicit ChunkCache(const std::string& read_ahead_policy,
ChunkManagerPtr cm,
MmapChunkManagerPtr mcm)
: cm_(cm), mcm_(mcm) {
auto iter = ReadAheadPolicy_Map.find(read_ahead_policy);
AssertInfo(iter != ReadAheadPolicy_Map.end(),
"unrecognized read ahead policy: {}, "
@ -35,8 +35,7 @@ class ChunkCache {
"willneed, dontneed`",
read_ahead_policy);
read_ahead_policy_ = iter->second;
LOG_INFO("Init ChunkCache with prefix: {}, read_ahead_policy: {}",
path_prefix_,
LOG_INFO("Init ChunkCache with read_ahead_policy: {}",
read_ahead_policy);
}
@ -44,7 +43,7 @@ class ChunkCache {
public:
std::shared_ptr<ColumnBase>
Read(const std::string& filepath);
Read(const std::string& filepath, const MmapChunkDescriptor& descriptor);
void
Remove(const std::string& filepath);
@ -54,7 +53,7 @@ class ChunkCache {
private:
std::shared_ptr<ColumnBase>
Mmap(const std::filesystem::path& path, const FieldDataPtr& field_data);
Mmap(const FieldDataPtr& field_data, const MmapChunkDescriptor& descriptor);
std::string
CachePath(const std::string& filepath);
@ -66,8 +65,8 @@ class ChunkCache {
private:
mutable std::shared_mutex mutex_;
int read_ahead_policy_;
const std::string path_prefix_;
ChunkManagerPtr cm_;
MmapChunkManagerPtr mcm_;
ColumnTable columns_;
};

View File

@ -1,60 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <shared_mutex>
#include "ChunkCache.h"
#include "RemoteChunkManagerSingleton.h"
namespace milvus::storage {
class ChunkCacheSingleton {
private:
ChunkCacheSingleton() {
}
public:
ChunkCacheSingleton(const ChunkCacheSingleton&) = delete;
ChunkCacheSingleton&
operator=(const ChunkCacheSingleton&) = delete;
static ChunkCacheSingleton&
GetInstance() {
static ChunkCacheSingleton instance;
return instance;
}
void
Init(std::string root_path, std::string read_ahead_policy) {
if (cc_ == nullptr) {
auto rcm = RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
cc_ = std::make_shared<ChunkCache>(
std::move(root_path), std::move(read_ahead_policy), rcm);
}
}
ChunkCachePtr
GetChunkCache() {
return cc_;
}
private:
ChunkCachePtr cc_ = nullptr;
};
} // namespace milvus::storage

View File

@ -490,9 +490,8 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
if (data_type == milvus::DataType::VECTOR_SPARSE_FLOAT) {
dim = std::max(
dim,
(uint32_t)(
std::dynamic_pointer_cast<FieldData<SparseFloatVector>>(
field_data)
(uint32_t)(std::dynamic_pointer_cast<
FieldData<SparseFloatVector>>(field_data)
->Dim()));
auto sparse_rows =
static_cast<const knowhere::sparse::SparseRow<float>*>(

View File

@ -0,0 +1,299 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "storage/MmapChunkManager.h"
#include "storage/LocalChunkManagerSingleton.h"
#include <fstream>
#include <sys/mman.h>
#include <unistd.h>
#include "stdio.h"
#include <fcntl.h>
#include "log/Log.h"
#include "storage/prometheus_client.h"
namespace milvus::storage {
namespace {
static constexpr int kMmapDefaultProt = PROT_WRITE | PROT_READ;
static constexpr int kMmapDefaultFlags = MAP_SHARED;
}; // namespace
// todo(cqy): After confirming the append parallelism of multiple fields, adjust the lock granularity.
MmapBlock::MmapBlock(const std::string& file_name,
const uint64_t file_size,
BlockType type)
: file_name_(file_name),
file_size_(file_size),
block_type_(type),
is_valid_(false) {
}
void
MmapBlock::Init() {
std::lock_guard<std::mutex> lock(file_mutex_);
if (is_valid_ == true) {
LOG_WARN("This mmap block has been init.");
return;
}
// create tmp file
int fd = open(file_name_.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
if (fd == -1) {
PanicInfo(ErrorCode::FileCreateFailed, "Failed to open mmap tmp file");
}
// append file size to 'file_size'
if (lseek(fd, file_size_ - 1, SEEK_SET) == -1) {
PanicInfo(ErrorCode::FileReadFailed, "Failed to seek mmap tmp file");
}
if (write(fd, "", 1) == -1) {
PanicInfo(ErrorCode::FileWriteFailed, "Failed to write mmap tmp file");
}
// memory mmaping
addr_ = static_cast<char*>(
mmap(nullptr, file_size_, kMmapDefaultProt, kMmapDefaultFlags, fd, 0));
if (addr_ == MAP_FAILED) {
PanicInfo(ErrorCode::MmapError, "Failed to mmap in mmap_block");
}
offset_.store(0);
close(fd);
milvus::storage::internal_mmap_allocated_space_bytes_file.Observe(
file_size_);
milvus::storage::internal_mmap_in_used_space_bytes_file.Increment(
file_size_);
is_valid_ = true;
allocated_size_.fetch_add(file_size_);
}
void
MmapBlock::Close() {
std::lock_guard<std::mutex> lock(file_mutex_);
if (is_valid_ == false) {
LOG_WARN("This mmap block has been closed.");
return;
}
if (addr_ != nullptr) {
if (munmap(addr_, file_size_) != 0) {
PanicInfo(ErrorCode::MemAllocateSizeNotMatch,
"Failed to munmap in mmap_block");
}
}
if (access(file_name_.c_str(), F_OK) == 0) {
if (remove(file_name_.c_str()) != 0) {
PanicInfo(ErrorCode::MmapError, "Failed to munmap in mmap_block");
}
}
allocated_size_.fetch_sub(file_size_);
milvus::storage::internal_mmap_in_used_space_bytes_file.Decrement(
file_size_);
is_valid_ = false;
}
MmapBlock::~MmapBlock() {
if (is_valid_ == true) {
try {
Close();
} catch (const std::exception& e) {
LOG_ERROR(e.what());
}
}
}
void*
MmapBlock::Get(const uint64_t size) {
AssertInfo(is_valid_, "Fail to get memory from invalid MmapBlock.");
if (file_size_ - offset_.load() < size) {
return nullptr;
} else {
return (void*)(addr_ + offset_.fetch_add(size));
}
}
MmapBlockPtr
MmapBlocksHandler::AllocateFixSizeBlock() {
if (fix_size_blocks_cache_.size() != 0) {
// return a mmap_block in fix_size_blocks_cache_
auto block = std::move(fix_size_blocks_cache_.front());
fix_size_blocks_cache_.pop();
return std::move(block);
} else {
// if space not enough for create a new block, clear cache and check again
if (GetFixFileSize() + Size() > max_disk_limit_) {
PanicInfo(
ErrorCode::MemAllocateSizeNotMatch,
"Failed to create a new mmap_block, not enough disk for "
"create a new mmap block. Allocated size: {}, Max size: {}",
Size(),
max_disk_limit_);
}
auto new_block = std::make_unique<MmapBlock>(
GetMmapFilePath(), GetFixFileSize(), MmapBlock::BlockType::Fixed);
new_block->Init();
return std::move(new_block);
}
}
MmapBlockPtr
MmapBlocksHandler::AllocateLargeBlock(const uint64_t size) {
if (size + Capacity() > max_disk_limit_) {
ClearCache();
}
if (size + Size() > max_disk_limit_) {
PanicInfo(ErrorCode::MemAllocateSizeNotMatch,
"Failed to create a new mmap_block, not enough disk for "
"create a new mmap block. Allocated size: {}, Max size: {}",
Size(),
max_disk_limit_);
}
auto new_block = std::make_unique<MmapBlock>(
GetMmapFilePath(), size, MmapBlock::BlockType::Variable);
new_block->Init();
return std::move(new_block);
}
void
MmapBlocksHandler::Deallocate(MmapBlockPtr&& block) {
if (block->GetType() == MmapBlock::BlockType::Fixed) {
// store the mmap block in cache
block->Reset();
fix_size_blocks_cache_.push(std::move(block));
uint64_t max_cache_size =
uint64_t(cache_threshold * (float)max_disk_limit_);
if (fix_size_blocks_cache_.size() * fix_mmap_file_size_ >
max_cache_size) {
FitCache(max_cache_size);
}
} else {
// release the mmap block
block->Close();
block = nullptr;
}
}
void
MmapBlocksHandler::ClearCache() {
while (!fix_size_blocks_cache_.empty()) {
auto block = std::move(fix_size_blocks_cache_.front());
block->Close();
fix_size_blocks_cache_.pop();
}
}
void
MmapBlocksHandler::FitCache(const uint64_t size) {
while (fix_size_blocks_cache_.size() * fix_mmap_file_size_ > size) {
auto block = std::move(fix_size_blocks_cache_.front());
block->Close();
fix_size_blocks_cache_.pop();
}
}
MmapChunkManager::~MmapChunkManager() {
// munmap all mmap_blocks before remove dir
for (auto it = blocks_table_.begin(); it != blocks_table_.end();) {
it = blocks_table_.erase(it);
}
if (blocks_handler_ != nullptr) {
blocks_handler_ = nullptr;
}
// clean the mmap dir
auto cm =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
if (cm->Exist(mmap_file_prefix_)) {
cm->RemoveDir(mmap_file_prefix_);
}
}
void
MmapChunkManager::Register(const MmapChunkDescriptor key) {
if (HasKey(key)) {
LOG_WARN("key has exist in growing mmap manager");
return;
}
std::unique_lock<std::shared_mutex> lck(mtx_);
blocks_table_.emplace(key, std::vector<MmapBlockPtr>());
return;
}
void
MmapChunkManager::UnRegister(const MmapChunkDescriptor key) {
std::unique_lock<std::shared_mutex> lck(mtx_);
if (blocks_table_.find(key) != blocks_table_.end()) {
auto& blocks = blocks_table_[key];
for (auto i = 0; i < blocks.size(); i++) {
blocks_handler_->Deallocate(std::move(blocks[i]));
}
blocks_table_.erase(key);
}
}
bool
MmapChunkManager::HasKey(const MmapChunkDescriptor key) {
std::shared_lock<std::shared_mutex> lck(mtx_);
return (blocks_table_.find(key) != blocks_table_.end());
}
void*
MmapChunkManager::Allocate(const MmapChunkDescriptor key, const uint64_t size) {
AssertInfo(HasKey(key), "key {} has not been register.", key->segment_id);
std::unique_lock<std::shared_mutex> lck(mtx_);
if (size < blocks_handler_->GetFixFileSize()) {
// find a place to fit in
for (auto block_id = 0; block_id < blocks_table_[key].size();
block_id++) {
auto addr = blocks_table_[key][block_id]->Get(size);
if (addr != nullptr) {
return addr;
}
}
// create a new block
auto new_block = blocks_handler_->AllocateFixSizeBlock();
AssertInfo(new_block != nullptr, "new mmap_block can't be nullptr");
auto addr = new_block->Get(size);
AssertInfo(addr != nullptr, "fail to allocate from mmap block.");
blocks_table_[key].emplace_back(std::move(new_block));
return addr;
} else {
auto new_block = blocks_handler_->AllocateLargeBlock(size);
AssertInfo(new_block != nullptr, "new mmap_block can't be nullptr");
auto addr = new_block->Get(size);
AssertInfo(addr != nullptr, "fail to allocate from mmap block.");
blocks_table_[key].emplace_back(std::move(new_block));
return addr;
}
}
MmapChunkManager::MmapChunkManager(std::string root_path,
const uint64_t disk_limit,
const uint64_t file_size) {
blocks_handler_ =
std::make_unique<MmapBlocksHandler>(disk_limit, file_size, root_path);
mmap_file_prefix_ = root_path;
auto cm =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
AssertInfo(cm != nullptr,
"Fail to get LocalChunkManager, LocalChunkManagerPtr is null");
if (cm->Exist(root_path)) {
cm->RemoveDir(root_path);
}
cm->CreateDir(root_path);
LOG_INFO(
"Init MappChunkManager with: Path {}, MaxDiskSize {} MB, "
"FixedFileSize {} MB.",
root_path,
disk_limit / (1024 * 1024),
file_size / (1024 * 1024));
}
} // namespace milvus::storage

View File

@ -0,0 +1,214 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <cstdint>
#include <cstring>
#include <vector>
#include <queue>
#include <atomic>
#include <unordered_map>
#include <shared_mutex>
#include <memory>
#include <shared_mutex>
#include "common/EasyAssert.h"
#include "log/Log.h"
#include <optional>
#include "common/type_c.h"
#include "storage/LocalChunkManagerSingleton.h"
namespace milvus::storage {
// use segment id and segment type to descripe a segment in mmap chunk manager, segment only in two type (growing or sealed) in mmap chunk manager
struct MmapChunkDescriptorValue {
int64_t segment_id;
SegmentType segment_type;
};
using MmapChunkDescriptor = std::shared_ptr<MmapChunkDescriptorValue>;
struct DescriptorHash {
size_t
operator()(const MmapChunkDescriptor& x) const {
return x->segment_id * 10 + (size_t)x->segment_type;
}
};
/**
* @brief MmapBlock is a basic unit of MmapChunkManager. It handle all memory mmaping in one tmp file.
* static function(TotalBlocksSize) is used to get total files size of chunk mmap.
*/
struct MmapBlock {
public:
enum class BlockType {
Fixed = 0,
Variable = 1,
};
MmapBlock(const std::string& file_name,
const uint64_t file_size,
BlockType type = BlockType::Fixed);
~MmapBlock();
void
Init();
void
Close();
void*
Get(const uint64_t size);
void
Reset() {
offset_.store(0);
}
BlockType
GetType() {
return block_type_;
}
uint64_t
GetCapacity() {
return file_size_;
}
static void
ClearAllocSize() {
allocated_size_.store(0);
}
static uint64_t
TotalBlocksSize() {
return allocated_size_.load();
}
private:
const std::string file_name_;
const uint64_t file_size_;
char* addr_ = nullptr;
std::atomic<uint64_t> offset_ = 0;
const BlockType block_type_;
std::atomic<bool> is_valid_ = false;
static inline std::atomic<uint64_t> allocated_size_ =
0; //keeping the total size used in
mutable std::mutex file_mutex_;
};
using MmapBlockPtr = std::unique_ptr<MmapBlock>;
/**
* @brief MmapBlocksHandler is used to handle the creation and destruction of mmap blocks
* MmapBlocksHandler is not thread safe,
*/
class MmapBlocksHandler {
public:
MmapBlocksHandler(const uint64_t disk_limit,
const uint64_t fix_file_size,
const std::string file_prefix)
: max_disk_limit_(disk_limit),
mmap_file_prefix_(file_prefix),
fix_mmap_file_size_(fix_file_size) {
mmmap_file_counter_.store(0);
MmapBlock::ClearAllocSize();
}
~MmapBlocksHandler() {
ClearCache();
}
uint64_t
GetDiskLimit() {
return max_disk_limit_;
}
uint64_t
GetFixFileSize() {
return fix_mmap_file_size_;
}
uint64_t
Capacity() {
return MmapBlock::TotalBlocksSize();
}
uint64_t
Size() {
return Capacity() - fix_size_blocks_cache_.size() * fix_mmap_file_size_;
}
MmapBlockPtr
AllocateFixSizeBlock();
MmapBlockPtr
AllocateLargeBlock(const uint64_t size);
void
Deallocate(MmapBlockPtr&& block);
private:
std::string
GetFilePrefix() {
return mmap_file_prefix_;
}
std::string
GetMmapFilePath() {
auto file_id = mmmap_file_counter_.fetch_add(1);
return mmap_file_prefix_ + "/" + std::to_string(file_id);
}
void
ClearCache();
void
FitCache(const uint64_t size);
private:
uint64_t max_disk_limit_;
std::string mmap_file_prefix_;
std::atomic<uint64_t> mmmap_file_counter_;
uint64_t fix_mmap_file_size_;
std::queue<MmapBlockPtr> fix_size_blocks_cache_;
const float cache_threshold = 0.25;
};
/**
* @brief MmapChunkManager
* MmapChunkManager manages the memory-mapping space in mmap manager;
* MmapChunkManager uses blocks_table_ to record the relationship of segments and the mapp space it uses.
* The basic space unit of MmapChunkManager is MmapBlock, and is managed by MmapBlocksHandler.
* todo(cqy): blocks_handler_ and blocks_table_ is not thread safe, we need use fine-grained locks for better performance.
*/
class MmapChunkManager {
public:
explicit MmapChunkManager(std::string root_path,
const uint64_t disk_limit,
const uint64_t file_size);
~MmapChunkManager();
void
Register(const MmapChunkDescriptor key);
void
UnRegister(const MmapChunkDescriptor key);
bool
HasKey(const MmapChunkDescriptor key);
void*
Allocate(const MmapChunkDescriptor key, const uint64_t size);
uint64_t
GetDiskAllocSize() {
std::shared_lock<std::shared_mutex> lck(mtx_);
if (blocks_handler_ == nullptr) {
return 0;
} else {
return blocks_handler_->Capacity();
}
}
uint64_t
GetDiskUsage() {
std::shared_lock<std::shared_mutex> lck(mtx_);
if (blocks_handler_ == nullptr) {
return 0;
} else {
return blocks_handler_->Size();
}
}
private:
mutable std::shared_mutex mtx_;
std::unordered_map<MmapChunkDescriptor,
std::vector<MmapBlockPtr>,
DescriptorHash>
blocks_table_;
std::unique_ptr<MmapBlocksHandler> blocks_handler_ = nullptr;
std::string mmap_file_prefix_;
};
using MmapChunkManagerPtr = std::shared_ptr<MmapChunkManager>;
} // namespace milvus::storage

View File

@ -0,0 +1,123 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <shared_mutex>
#include "ChunkCache.h"
#include "RemoteChunkManagerSingleton.h"
namespace milvus::storage {
/**
* @brief MmapManager(singleton)
* MmapManager holds all mmap components;
* all mmap components use mmapchunkmanager to allocate mmap space;
* no thread safe, only one thread init in segcore.
*/
class MmapManager {
private:
MmapManager() = default;
public:
MmapManager(const MmapManager&) = delete;
MmapManager&
operator=(const MmapManager&) = delete;
static MmapManager&
GetInstance() {
static MmapManager instance;
return instance;
}
~MmapManager() {
if (cc_ != nullptr) {
cc_ = nullptr;
}
// delete mmap chunk manager at last
if (mcm_ != nullptr) {
mcm_ = nullptr;
}
}
void
Init(const MmapConfig& config) {
if (init_flag_ == false) {
std::lock_guard<std::mutex> lock(
init_mutex_); // in case many threads call init
mmap_config_ = config;
if (mcm_ == nullptr) {
mcm_ = std::make_shared<MmapChunkManager>(
mmap_config_.mmap_path,
mmap_config_.disk_limit,
mmap_config_.fix_file_size);
}
if (cc_ == nullptr) {
auto rcm = RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
cc_ = std::make_shared<ChunkCache>(
std::move(mmap_config_.cache_read_ahead_policy), rcm, mcm_);
}
LOG_INFO("Init MmapConfig with MmapConfig: {}",
mmap_config_.ToString());
init_flag_ = true;
} else {
LOG_WARN("mmap manager has been inited.");
}
}
ChunkCachePtr
GetChunkCache() {
AssertInfo(init_flag_ == true, "Mmap manager has not been init.");
return cc_;
}
MmapChunkManagerPtr
GetMmapChunkManager() {
AssertInfo(init_flag_ == true, "Mmap manager has not been init.");
return mcm_;
}
MmapConfig&
GetMmapConfig() {
AssertInfo(init_flag_ == true, "Mmap manager has not been init.");
return mmap_config_;
}
size_t
GetAllocSize() {
if (mcm_ != nullptr) {
return mcm_->GetDiskAllocSize();
} else {
return 0;
}
}
size_t
GetDiskUsage() {
if (mcm_ != nullptr) {
return mcm_->GetDiskUsage();
} else {
return 0;
}
}
private:
mutable std::mutex init_mutex_;
MmapConfig mmap_config_;
MmapChunkManagerPtr mcm_ = nullptr;
ChunkCachePtr cc_ = nullptr;
std::atomic<bool> init_flag_ = false;
};
} // namespace milvus::storage

View File

@ -119,6 +119,33 @@ struct StorageConfig {
}
};
struct MmapConfig {
std::string cache_read_ahead_policy;
std::string mmap_path;
uint64_t disk_limit;
uint64_t fix_file_size;
bool growing_enable_mmap;
bool
GetEnableGrowingMmap() const {
return growing_enable_mmap;
}
void
SetEnableGrowingMmap(bool flag) {
this->growing_enable_mmap = flag;
}
std::string
ToString() const {
std::stringstream ss;
ss << "[cache_read_ahead_policy=" << cache_read_ahead_policy
<< ", mmap_path=" << mmap_path
<< ", disk_limit=" << disk_limit / (1024 * 1024) << "MB"
<< ", fix_file_size=" << fix_file_size / (1024 * 1024) << "MB"
<< ", growing_enable_mmap=" << std::boolalpha << growing_enable_mmap
<< "]";
return ss.str();
}
};
} // namespace milvus::storage
template <>

View File

@ -18,7 +18,7 @@
#include "storage/prometheus_client.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/ChunkCacheSingleton.h"
#include "storage/MmapManager.h"
CStatus
GetLocalUsedSize(const char* c_dir, int64_t* size) {
@ -86,10 +86,16 @@ InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config) {
}
CStatus
InitChunkCacheSingleton(const char* c_dir_path, const char* read_ahead_policy) {
InitMmapManager(CMmapConfig c_mmap_config) {
try {
milvus::storage::ChunkCacheSingleton::GetInstance().Init(
c_dir_path, read_ahead_policy);
milvus::storage::MmapConfig mmap_config;
mmap_config.cache_read_ahead_policy =
std::string(c_mmap_config.cache_read_ahead_policy);
mmap_config.mmap_path = std::string(c_mmap_config.mmap_path);
mmap_config.disk_limit = c_mmap_config.disk_limit;
mmap_config.fix_file_size = c_mmap_config.fix_file_size;
mmap_config.growing_enable_mmap = c_mmap_config.growing_enable_mmap;
milvus::storage::MmapManager::GetInstance().Init(mmap_config);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);

View File

@ -31,7 +31,7 @@ CStatus
InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config);
CStatus
InitChunkCacheSingleton(const char* c_dir_path, const char* read_ahead_policy);
InitMmapManager(CMmapConfig c_mmap_config);
void
CleanRemoteChunkManagerSingleton();

View File

@ -69,6 +69,7 @@ set(MILVUS_TEST_FILES
test_regex_query.cpp
test_futures.cpp
test_array_inverted_index.cpp
test_chunk_vector.cpp
)
if ( INDEX_ENGINE STREQUAL "cardinal" )

View File

@ -23,6 +23,7 @@ main(int argc, char** argv) {
TestLocalPath);
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(
get_default_local_storage_config());
milvus::storage::MmapManager::GetInstance().Init(get_default_mmap_config());
return RUN_ALL_TESTS();
}

View File

@ -970,7 +970,7 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) {
TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) {
auto collection = NewCollection(get_default_schema_config());
CSegmentInterface segment;
auto status = NewSegment(collection, Growing, -1, &segment);
auto status = NewSegment(collection, Growing, 111, &segment);
ASSERT_EQ(status.error_code, Success);
auto col = (milvus::segcore::Collection*)collection;

View File

@ -27,16 +27,32 @@
#include "storage/LocalChunkManagerSingleton.h"
#define DEFAULT_READ_AHEAD_POLICY "willneed"
class ChunkCacheTest : public testing::Test {
public:
void
SetUp() override {
mcm = milvus::storage::MmapManager::GetInstance().GetMmapChunkManager();
mcm->Register(descriptor);
}
void
TearDown() override {
mcm->UnRegister(descriptor);
}
const char* local_storage_path = "/tmp/test_chunk_cache/local";
const char* file_name = "chunk_cache_test/insert_log/2/101/1000000";
milvus::storage::MmapChunkManagerPtr mcm;
milvus::segcore::SegcoreConfig config;
milvus::storage::MmapChunkDescriptor descriptor =
std::shared_ptr<milvus::storage::MmapChunkDescriptorValue>(
new milvus::storage::MmapChunkDescriptorValue(
{111, SegmentType::Sealed}));
};
TEST(ChunkCacheTest, Read) {
TEST_F(ChunkCacheTest, Read) {
auto N = 10000;
auto dim = 128;
auto metric_type = knowhere::metric::L2;
auto mmap_dir = "/tmp/test_chunk_cache/mmap";
auto local_storage_path = "/tmp/test_chunk_cache/local";
auto file_name = std::string("chunk_cache_test/insert_log/1/101/1000000");
milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(
local_storage_path);
@ -69,9 +85,10 @@ TEST(ChunkCacheTest, Read) {
field_data_meta,
field_meta);
auto cc = std::make_shared<milvus::storage::ChunkCache>(
mmap_dir, DEFAULT_READ_AHEAD_POLICY, lcm);
const auto& column = cc->Read(file_name);
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
const auto& column = cc->Read(file_name, descriptor);
std::cout << "column->ByteSize() :" << column->ByteSize() << " "
<< dim * N * 4 << std::endl;
Assert(column->ByteSize() == dim * N * 4);
auto actual = (float*)column->Data();
@ -82,23 +99,13 @@ TEST(ChunkCacheTest, Read) {
cc->Remove(file_name);
lcm->Remove(file_name);
std::filesystem::remove_all(mmap_dir);
auto exist = lcm->Exist(file_name);
Assert(!exist);
exist = std::filesystem::exists(mmap_dir);
Assert(!exist);
}
TEST(ChunkCacheTest, TestMultithreads) {
TEST_F(ChunkCacheTest, TestMultithreads) {
auto N = 1000;
auto dim = 128;
auto metric_type = knowhere::metric::L2;
auto mmap_dir = "/tmp/test_chunk_cache/mmap";
auto local_storage_path = "/tmp/test_chunk_cache/local";
auto file_name = std::string("chunk_cache_test/insert_log/2/101/1000000");
milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(
local_storage_path);
@ -131,13 +138,13 @@ TEST(ChunkCacheTest, TestMultithreads) {
field_data_meta,
field_meta);
auto cc = std::make_shared<milvus::storage::ChunkCache>(
mmap_dir, DEFAULT_READ_AHEAD_POLICY, lcm);
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
constexpr int threads = 16;
std::vector<int64_t> total_counts(threads);
auto executor = [&](int thread_id) {
const auto& column = cc->Read(file_name);
std::cout << "thread id" << thread_id << " read data" << std::endl;
const auto& column = cc->Read(file_name, descriptor);
Assert(column->ByteSize() == dim * N * 4);
auto actual = (float*)column->Data();
@ -156,10 +163,4 @@ TEST(ChunkCacheTest, TestMultithreads) {
cc->Remove(file_name);
lcm->Remove(file_name);
std::filesystem::remove_all(mmap_dir);
auto exist = lcm->Exist(file_name);
Assert(!exist);
exist = std::filesystem::exists(mmap_dir);
Assert(!exist);
}

View File

@ -0,0 +1,438 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include "common/Types.h"
#include "knowhere/comp/index_param.h"
#include "segcore/SegmentGrowing.h"
#include "segcore/SegmentGrowingImpl.h"
#include "pb/schema.pb.h"
#include "test_utils/DataGen.h"
#include "query/Plan.h"
#include "query/generated/ExecExprVisitor.h"
using namespace milvus::segcore;
using namespace milvus;
namespace pb = milvus::proto;
class ChunkVectorTest : public testing::Test {
public:
void
SetUp() override {
auto& mmap_config =
milvus::storage::MmapManager::GetInstance().GetMmapConfig();
mmap_config.SetEnableGrowingMmap(true);
}
void
TearDown() override {
auto& mmap_config =
milvus::storage::MmapManager::GetInstance().GetMmapConfig();
mmap_config.SetEnableGrowingMmap(false);
}
knowhere::MetricType metric_type = "IP";
milvus::segcore::SegcoreConfig config;
};
TEST_F(ChunkVectorTest, FillDataWithMmap) {
auto schema = std::make_shared<Schema>();
auto bool_field = schema->AddDebugField("bool", DataType::BOOL);
auto int8_field = schema->AddDebugField("int8", DataType::INT8);
auto int16_field = schema->AddDebugField("int16", DataType::INT16);
auto int32_field = schema->AddDebugField("int32", DataType::INT32);
auto int64_field = schema->AddDebugField("int64", DataType::INT64);
auto float_field = schema->AddDebugField("float", DataType::FLOAT);
auto double_field = schema->AddDebugField("double", DataType::DOUBLE);
auto varchar_field = schema->AddDebugField("varchar", DataType::VARCHAR);
auto json_field = schema->AddDebugField("json", DataType::JSON);
auto int_array_field =
schema->AddDebugField("int_array", DataType::ARRAY, DataType::INT8);
auto long_array_field =
schema->AddDebugField("long_array", DataType::ARRAY, DataType::INT64);
auto bool_array_field =
schema->AddDebugField("bool_array", DataType::ARRAY, DataType::BOOL);
auto string_array_field = schema->AddDebugField(
"string_array", DataType::ARRAY, DataType::VARCHAR);
auto double_array_field = schema->AddDebugField(
"double_array", DataType::ARRAY, DataType::DOUBLE);
auto float_array_field =
schema->AddDebugField("float_array", DataType::ARRAY, DataType::FLOAT);
auto fp32_vec = schema->AddDebugField(
"fp32_vec", DataType::VECTOR_FLOAT, 128, metric_type);
auto fp16_vec = schema->AddDebugField(
"fp16_vec", DataType::VECTOR_FLOAT16, 128, metric_type);
auto bf16_vec = schema->AddDebugField(
"bf16_vec", DataType::VECTOR_BFLOAT16, 128, metric_type);
auto sparse_vec = schema->AddDebugField(
"sparse_vec", DataType::VECTOR_SPARSE_FLOAT, 128, metric_type);
schema->set_primary_field_id(int64_field);
std::map<std::string, std::string> index_params = {
{"index_type", "HNSW"}, {"metric_type", metric_type}, {"nlist", "128"}};
std::map<std::string, std::string> type_params = {{"dim", "128"}};
FieldIndexMeta fieldIndexMeta(
fp32_vec, std::move(index_params), std::move(type_params));
std::map<FieldId, FieldIndexMeta> filedMap = {{fp32_vec, fieldIndexMeta}};
IndexMetaPtr metaPtr =
std::make_shared<CollectionIndexMeta>(100000, std::move(filedMap));
auto segment_growing = CreateGrowingSegment(schema, metaPtr, 1, config);
auto segment = dynamic_cast<SegmentGrowingImpl*>(segment_growing.get());
int64_t per_batch = 1000;
int64_t n_batch = 3;
int64_t dim = 128;
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema, per_batch);
auto offset = segment->PreInsert(per_batch);
segment->Insert(offset,
per_batch,
dataset.row_ids_.data(),
dataset.timestamps_.data(),
dataset.raw_);
auto num_inserted = (i + 1) * per_batch;
auto ids_ds = GenRandomIds(num_inserted);
auto bool_result =
segment->bulk_subscript(bool_field, ids_ds->GetIds(), num_inserted);
auto int8_result =
segment->bulk_subscript(int8_field, ids_ds->GetIds(), num_inserted);
auto int16_result = segment->bulk_subscript(
int16_field, ids_ds->GetIds(), num_inserted);
auto int32_result = segment->bulk_subscript(
int32_field, ids_ds->GetIds(), num_inserted);
auto int64_result = segment->bulk_subscript(
int64_field, ids_ds->GetIds(), num_inserted);
auto float_result = segment->bulk_subscript(
float_field, ids_ds->GetIds(), num_inserted);
auto double_result = segment->bulk_subscript(
double_field, ids_ds->GetIds(), num_inserted);
auto varchar_result = segment->bulk_subscript(
varchar_field, ids_ds->GetIds(), num_inserted);
auto json_result =
segment->bulk_subscript(json_field, ids_ds->GetIds(), num_inserted);
auto int_array_result = segment->bulk_subscript(
int_array_field, ids_ds->GetIds(), num_inserted);
auto long_array_result = segment->bulk_subscript(
long_array_field, ids_ds->GetIds(), num_inserted);
auto bool_array_result = segment->bulk_subscript(
bool_array_field, ids_ds->GetIds(), num_inserted);
auto string_array_result = segment->bulk_subscript(
string_array_field, ids_ds->GetIds(), num_inserted);
auto double_array_result = segment->bulk_subscript(
double_array_field, ids_ds->GetIds(), num_inserted);
auto float_array_result = segment->bulk_subscript(
float_array_field, ids_ds->GetIds(), num_inserted);
auto fp32_vec_result =
segment->bulk_subscript(fp32_vec, ids_ds->GetIds(), num_inserted);
auto fp16_vec_result =
segment->bulk_subscript(fp16_vec, ids_ds->GetIds(), num_inserted);
auto bf16_vec_result =
segment->bulk_subscript(bf16_vec, ids_ds->GetIds(), num_inserted);
auto sparse_vec_result =
segment->bulk_subscript(sparse_vec, ids_ds->GetIds(), num_inserted);
EXPECT_EQ(bool_result->scalars().bool_data().data_size(), num_inserted);
EXPECT_EQ(int8_result->scalars().int_data().data_size(), num_inserted);
EXPECT_EQ(int16_result->scalars().int_data().data_size(), num_inserted);
EXPECT_EQ(int32_result->scalars().int_data().data_size(), num_inserted);
EXPECT_EQ(int64_result->scalars().long_data().data_size(),
num_inserted);
EXPECT_EQ(float_result->scalars().float_data().data_size(),
num_inserted);
EXPECT_EQ(double_result->scalars().double_data().data_size(),
num_inserted);
EXPECT_EQ(varchar_result->scalars().string_data().data_size(),
num_inserted);
EXPECT_EQ(json_result->scalars().json_data().data_size(), num_inserted);
EXPECT_EQ(fp32_vec_result->vectors().float_vector().data_size(),
num_inserted * dim);
EXPECT_EQ(fp16_vec_result->vectors().float16_vector().size(),
num_inserted * dim * 2);
EXPECT_EQ(bf16_vec_result->vectors().bfloat16_vector().size(),
num_inserted * dim * 2);
EXPECT_EQ(
sparse_vec_result->vectors().sparse_float_vector().contents_size(),
num_inserted);
EXPECT_EQ(int_array_result->scalars().array_data().data_size(),
num_inserted);
EXPECT_EQ(long_array_result->scalars().array_data().data_size(),
num_inserted);
EXPECT_EQ(bool_array_result->scalars().array_data().data_size(),
num_inserted);
EXPECT_EQ(string_array_result->scalars().array_data().data_size(),
num_inserted);
EXPECT_EQ(double_array_result->scalars().array_data().data_size(),
num_inserted);
EXPECT_EQ(float_array_result->scalars().array_data().data_size(),
num_inserted);
}
}
TEST_F(ChunkVectorTest, QueryWithMmap) {
auto schema = std::make_shared<Schema>();
schema->AddDebugField(
"fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
schema->AddDebugField("age", DataType::FLOAT);
auto i64_fid = schema->AddDebugField("counter", DataType::INT64);
schema->set_primary_field_id(i64_fid);
const char* raw_plan = R"(vector_anns: <
field_id: 100
predicates: <
term_expr: <
column_info: <
field_id: 102
data_type: Int64
>
values: <
int64_val: 1
>
values: <
int64_val: 2
>
>
>
query_info: <
topk: 5
round_decimal: 3
metric_type: "L2"
search_params: "{\"nprobe\": 10}"
>
placeholder_tag: "$0"
>)";
int64_t N = 4000;
auto dataset = DataGen(schema, N);
auto segment = CreateGrowingSegment(schema, empty_index_meta, 11, config);
segment->PreInsert(N);
segment->Insert(0,
N,
dataset.row_ids_.data(),
dataset.timestamps_.data(),
dataset.raw_);
auto plan_str = translate_text_plan_to_binary_plan(raw_plan);
auto plan = milvus::query::CreateSearchPlanByExpr(
*schema, plan_str.data(), plan_str.size());
auto num_queries = 3;
auto ph_group_raw =
milvus::segcore::CreatePlaceholderGroup(num_queries, 16, 1024);
auto ph_group = milvus::query::ParsePlaceholderGroup(
plan.get(), ph_group_raw.SerializeAsString());
Timestamp timestamp = 1000000;
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
int topk = 5;
auto json = SearchResultToJson(*sr);
ASSERT_EQ(sr->total_nq_, num_queries);
ASSERT_EQ(sr->unity_topK_, topk);
}
// TEST_F(ChunkVectorTest, ArrayExprWithMmap) {
// auto schema = std::make_shared<Schema>();
// auto i64_fid = schema->AddDebugField("id", DataType::INT64);
// auto long_array_fid =
// schema->AddDebugField("long_array", DataType::ARRAY, DataType::INT64);
// auto bool_array_fid =
// schema->AddDebugField("bool_array", DataType::ARRAY, DataType::BOOL);
// auto float_array_fid =
// schema->AddDebugField("float_array", DataType::ARRAY, DataType::FLOAT);
// auto string_array_fid = schema->AddDebugField(
// "string_array", DataType::ARRAY, DataType::VARCHAR);
// schema->set_primary_field_id(i64_fid);
// auto seg = CreateGrowingSegment(schema, empty_index_meta, 22, config);
// int N = 1000;
// std::map<std::string, std::vector<ScalarArray>> array_cols;
// int num_iters = 1;
// for (int iter = 0; iter < num_iters; ++iter) {
// auto raw_data = DataGen(schema, N, iter);
// auto new_long_array_col = raw_data.get_col<ScalarArray>(long_array_fid);
// auto new_bool_array_col = raw_data.get_col<ScalarArray>(bool_array_fid);
// auto new_float_array_col =
// raw_data.get_col<ScalarArray>(float_array_fid);
// auto new_string_array_col =
// raw_data.get_col<ScalarArray>(string_array_fid);
// array_cols["long"].insert(array_cols["long"].end(),
// new_long_array_col.begin(),
// new_long_array_col.end());
// array_cols["bool"].insert(array_cols["bool"].end(),
// new_bool_array_col.begin(),
// new_bool_array_col.end());
// array_cols["float"].insert(array_cols["float"].end(),
// new_float_array_col.begin(),
// new_float_array_col.end());
// array_cols["string"].insert(array_cols["string"].end(),
// new_string_array_col.begin(),
// new_string_array_col.end());
// seg->PreInsert(N);
// seg->Insert(iter * N,
// N,
// raw_data.row_ids_.data(),
// raw_data.timestamps_.data(),
// raw_data.raw_);
// }
// auto seg_promote = dynamic_cast<SegmentGrowingImpl*>(seg.get());
// query::ExecPlanNodeVisitor visitor(*seg_promote, MAX_TIMESTAMP);
// std::vector<std::tuple<std::string,
// std::string,
// std::function<bool(milvus::Array & array)>>>
// testcases = {
// {R"(term_expr: <
// column_info: <
// field_id: 101
// data_type: Array
// nested_path:"0"
// element_type:Int64
// >
// values:<int64_val:1 > values:<int64_val:2 > values:<int64_val:3 >
// >)",
// "long",
// [](milvus::Array& array) {
// auto val = array.get_data<int64_t>(0);
// return val == 1 || val == 2 || val == 3;
// }},
// {R"(term_expr: <
// column_info: <
// field_id: 101
// data_type: Array
// nested_path:"0"
// element_type:Int64
// >
// >)",
// "long",
// [](milvus::Array& array) { return false; }},
// {R"(term_expr: <
// column_info: <
// field_id: 102
// data_type: Array
// nested_path:"0"
// element_type:Bool
// >
// values:<bool_val:false > values:<bool_val:false >
// >)",
// "bool",
// [](milvus::Array& array) {
// auto val = array.get_data<bool>(0);
// return !val;
// }},
// {R"(term_expr: <
// column_info: <
// field_id: 102
// data_type: Array
// nested_path:"0"
// element_type:Bool
// >
// >)",
// "bool",
// [](milvus::Array& array) { return false; }},
// {R"(term_expr: <
// column_info: <
// field_id: 103
// data_type: Array
// nested_path:"0"
// element_type:Float
// >
// values:<float_val:1.23 > values:<float_val:124.31 >
// >)",
// "float",
// [](milvus::Array& array) {
// auto val = array.get_data<double>(0);
// return val == 1.23 || val == 124.31;
// }},
// {R"(term_expr: <
// column_info: <
// field_id: 103
// data_type: Array
// nested_path:"0"
// element_type:Float
// >
// >)",
// "float",
// [](milvus::Array& array) { return false; }},
// {R"(term_expr: <
// column_info: <
// field_id: 104
// data_type: Array
// nested_path:"0"
// element_type:VarChar
// >
// values:<string_val:"abc" > values:<string_val:"idhgf1s" >
// >)",
// "string",
// [](milvus::Array& array) {
// auto val = array.get_data<std::string_view>(0);
// return val == "abc" || val == "idhgf1s";
// }},
// {R"(term_expr: <
// column_info: <
// field_id: 104
// data_type: Array
// nested_path:"0"
// element_type:VarChar
// >
// >)",
// "string",
// [](milvus::Array& array) { return false; }},
// {R"(term_expr: <
// column_info: <
// field_id: 104
// data_type: Array
// nested_path:"1024"
// element_type:VarChar
// >
// values:<string_val:"abc" > values:<string_val:"idhgf1s" >
// >)",
// "string",
// [](milvus::Array& array) {
// if (array.length() <= 1024) {
// return false;
// }
// auto val = array.get_data<std::string_view>(1024);
// return val == "abc" || val == "idhgf1s";
// }},
// };
// std::string raw_plan_tmp = R"(vector_anns: <
// field_id: 100
// predicates: <
// @@@@
// >
// query_info: <
// topk: 10
// round_decimal: 3
// metric_type: "L2"
// search_params: "{\"nprobe\": 10}"
// >
// placeholder_tag: "$0"
// >)";
// for (auto [clause, array_type, ref_func] : testcases) {
// auto loc = raw_plan_tmp.find("@@@@");
// auto raw_plan = raw_plan_tmp;
// raw_plan.replace(loc, 4, clause);
// auto plan_str = translate_text_plan_to_binary_plan(raw_plan.c_str());
// auto plan =
// milvus::query::CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size());
// BitsetType final;
// visitor.ExecuteExprNode(plan->plan_node_->filter_plannode_.value(),
// seg_promote,
// N * num_iters,
// final);
// EXPECT_EQ(final.size(), N * num_iters);
// for (int i = 0; i < N * num_iters; ++i) {
// auto ans = final[i];
// auto array = milvus::Array(array_cols[array_type][i]);
// ASSERT_EQ(ans, ref_func(array));
// }
// }
// }

View File

@ -227,7 +227,7 @@ TEST_P(GrowingTest, FillData) {
float_array_field, ids_ds->GetIds(), num_inserted);
auto vec_result =
segment->bulk_subscript(vec, ids_ds->GetIds(), num_inserted);
// checking result data
EXPECT_EQ(bool_result->scalars().bool_data().data_size(), num_inserted);
EXPECT_EQ(int8_result->scalars().int_data().data_size(), num_inserted);
EXPECT_EQ(int16_result->scalars().int_data().data_size(), num_inserted);

View File

@ -17,7 +17,7 @@
#include "index/IndexFactory.h"
#include "knowhere/version.h"
#include "segcore/SegmentSealedImpl.h"
#include "storage/ChunkCacheSingleton.h"
#include "storage/MmapManager.h"
#include "storage/MinioChunkManager.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/Util.h"
@ -1378,7 +1378,6 @@ TEST(Sealed, GetVectorFromChunkCache) {
auto metric_type = knowhere::metric::L2;
auto index_type = knowhere::IndexEnum::INDEX_FAISS_IVFPQ;
auto mmap_dir = "/tmp/mmap";
auto file_name = std::string(
"sealed_test_get_vector_from_chunk_cache/insert_log/1/101/1000000");
@ -1386,8 +1385,6 @@ TEST(Sealed, GetVectorFromChunkCache) {
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc);
auto mcm = std::make_unique<milvus::storage::MinioChunkManager>(sc);
// mcm->CreateBucket(sc.bucket_name);
milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir,
"willneed");
auto schema = std::make_shared<Schema>();
auto fakevec_id = schema->AddDebugField(
@ -1444,11 +1441,9 @@ TEST(Sealed, GetVectorFromChunkCache) {
std::vector<int64_t>{N},
false,
std::vector<std::string>{file_name}};
segment_sealed->AddFieldDataInfoForSealed(LoadFieldDataInfo{
std::map<int64_t, FieldBinlogInfo>{
{fakevec_id.get(), field_binlog_info}},
mmap_dir,
});
segment_sealed->AddFieldDataInfoForSealed(
LoadFieldDataInfo{std::map<int64_t, FieldBinlogInfo>{
{fakevec_id.get(), field_binlog_info}}});
auto segment = dynamic_cast<SegmentSealedImpl*>(segment_sealed.get());
auto has = segment->HasRawData(vec_info.field_id);
@ -1471,11 +1466,8 @@ TEST(Sealed, GetVectorFromChunkCache) {
}
rcm->Remove(file_name);
std::filesystem::remove_all(mmap_dir);
auto exist = rcm->Exist(file_name);
Assert(!exist);
exist = std::filesystem::exists(mmap_dir);
Assert(!exist);
}
TEST(Sealed, GetSparseVectorFromChunkCache) {
@ -1490,15 +1482,12 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
// we have a type of sparse index that doesn't include raw data.
auto index_type = knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX;
auto mmap_dir = "/tmp/mmap";
auto file_name = std::string(
"sealed_test_get_vector_from_chunk_cache/insert_log/1/101/1000000");
auto sc = milvus::storage::StorageConfig{};
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc);
auto mcm = std::make_unique<milvus::storage::MinioChunkManager>(sc);
milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir,
"willneed");
auto schema = std::make_shared<Schema>();
auto fakevec_id = schema->AddDebugField(
@ -1556,11 +1545,9 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
std::vector<int64_t>{N},
false,
std::vector<std::string>{file_name}};
segment_sealed->AddFieldDataInfoForSealed(LoadFieldDataInfo{
std::map<int64_t, FieldBinlogInfo>{
{fakevec_id.get(), field_binlog_info}},
mmap_dir,
});
segment_sealed->AddFieldDataInfoForSealed(
LoadFieldDataInfo{std::map<int64_t, FieldBinlogInfo>{
{fakevec_id.get(), field_binlog_info}}});
auto segment = dynamic_cast<SegmentSealedImpl*>(segment_sealed.get());
@ -1585,11 +1572,8 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
}
rcm->Remove(file_name);
std::filesystem::remove_all(mmap_dir);
auto exist = rcm->Exist(file_name);
Assert(!exist);
exist = std::filesystem::exists(mmap_dir);
Assert(!exist);
}
TEST(Sealed, WarmupChunkCache) {
@ -1609,9 +1593,6 @@ TEST(Sealed, WarmupChunkCache) {
auto sc = milvus::storage::StorageConfig{};
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc);
auto mcm = std::make_unique<milvus::storage::MinioChunkManager>(sc);
// mcm->CreateBucket(sc.bucket_name);
milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir,
"willneed");
auto schema = std::make_shared<Schema>();
auto fakevec_id = schema->AddDebugField(
@ -1668,11 +1649,9 @@ TEST(Sealed, WarmupChunkCache) {
std::vector<int64_t>{N},
false,
std::vector<std::string>{file_name}};
segment_sealed->AddFieldDataInfoForSealed(LoadFieldDataInfo{
std::map<int64_t, FieldBinlogInfo>{
{fakevec_id.get(), field_binlog_info}},
mmap_dir,
});
segment_sealed->AddFieldDataInfoForSealed(
LoadFieldDataInfo{std::map<int64_t, FieldBinlogInfo>{
{fakevec_id.get(), field_binlog_info}}});
auto segment = dynamic_cast<SegmentSealedImpl*>(segment_sealed.get());
auto has = segment->HasRawData(vec_info.field_id);

View File

@ -20,6 +20,7 @@
using namespace milvus;
namespace {
static constexpr int64_t seg_id = 101;
auto
generate_data(int N) {
std::vector<char> raw_data;

View File

@ -156,13 +156,3 @@ TEST_F(StorageTest, GetStorageMetrics) {
0, strncmp(currentLine, familyName.c_str(), familyName.length()));
}
}
TEST_F(StorageTest, CachePath) {
auto rcm =
RemoteChunkManagerSingleton::GetInstance().GetRemoteChunkManager();
auto cc_ = ChunkCache("tmp/mmap/chunk_cache", "willneed", rcm);
auto relative_result = cc_.CachePath("abc");
EXPECT_EQ("tmp/mmap/chunk_cache/abc", relative_result);
auto absolute_result = cc_.CachePath("/var/lib/milvus/abc");
EXPECT_EQ("tmp/mmap/chunk_cache/var/lib/milvus/abc", absolute_result);
}

View File

@ -62,7 +62,7 @@ TEST(Util, GetDeleteBitmap) {
auto i64_fid = schema->AddDebugField("age", DataType::INT64);
schema->set_primary_field_id(i64_fid);
auto N = 10;
uint64_t seg_id = 101;
InsertRecord insert_record(*schema, N);
DeletedRecord delete_record;

View File

@ -32,6 +32,7 @@ using milvus::segcore::GeneratedData;
using milvus::storage::ChunkManagerPtr;
using milvus::storage::FieldDataMeta;
using milvus::storage::InsertData;
using milvus::storage::MmapConfig;
using milvus::storage::StorageConfig;
namespace {
@ -45,6 +46,18 @@ get_default_local_storage_config() {
return storage_config;
}
inline MmapConfig
get_default_mmap_config() {
MmapConfig mmap_config = {
.cache_read_ahead_policy = "willneed",
.mmap_path = "/tmp/test_mmap_manager/",
.disk_limit =
uint64_t(2) * uint64_t(1024) * uint64_t(1024) * uint64_t(1024),
.fix_file_size = uint64_t(4) * uint64_t(1024) * uint64_t(1024),
.growing_enable_mmap = false};
return mmap_config;
}
inline LoadFieldDataInfo
PrepareInsertBinlog(int64_t collection_id,
int64_t partition_id,

View File

@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -53,6 +54,7 @@ func (s *ManagerSuite) SetupSuite() {
},
},
}
initcore.InitMmapManager(paramtable.Get())
s.channelName = "by-dev-rootcoord-dml_0_100_v0"
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"path"
"path/filepath"
"strconv"
"testing"
"time"
@ -41,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/bloomfilter"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@ -48,6 +50,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type DelegatorDataSuite struct {
@ -74,6 +77,9 @@ func (s *DelegatorDataSuite) SetupSuite() {
paramtable.Init()
paramtable.SetNodeID(1)
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.Key, "1")
localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole)
initcore.InitLocalChunkManager(localDataRootPath)
initcore.InitMmapManager(paramtable.Get())
s.collectionID = 1000
s.replicaID = 65535

View File

@ -2,6 +2,7 @@ package segments
import (
"context"
"path/filepath"
"testing"
"github.com/samber/lo"
@ -11,7 +12,9 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type ManagerSuite struct {
@ -37,6 +40,9 @@ func (s *ManagerSuite) SetupSuite() {
s.channels = []string{"by-dev-rootcoord-dml_0_100v0", "by-dev-rootcoord-dml_1_200v0", "by-dev-rootcoord-dml_2_300v0", "by-dev-rootcoord-dml_3_400v0"}
s.types = []SegmentType{SegmentTypeSealed, SegmentTypeGrowing, SegmentTypeSealed, SegmentTypeSealed}
s.levels = []datapb.SegmentLevel{datapb.SegmentLevel_Legacy, datapb.SegmentLevel_Legacy, datapb.SegmentLevel_L1, datapb.SegmentLevel_L0}
localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole)
initcore.InitLocalChunkManager(localDataRootPath)
initcore.InitMmapManager(paramtable.Get())
}
func (s *ManagerSuite) SetupTest() {

View File

@ -3,6 +3,7 @@ package segments
import (
"context"
"fmt"
"path/filepath"
"testing"
"github.com/stretchr/testify/suite"
@ -13,6 +14,7 @@ import (
storage "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type SegmentSuite struct {
@ -43,6 +45,9 @@ func (suite *SegmentSuite) SetupTest() {
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
initcore.InitRemoteChunkManager(paramtable.Get())
localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole)
initcore.InitLocalChunkManager(localDataRootPath)
initcore.InitMmapManager(paramtable.Get())
suite.collectionID = 100
suite.partitionID = 10

View File

@ -249,21 +249,10 @@ func (node *QueryNode) InitSegcore() error {
return err
}
mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
if len(mmapDirPath) == 0 {
paramtable.Get().Save(
paramtable.Get().QueryNodeCfg.MmapDirPath.Key,
path.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), "mmap"),
)
mmapDirPath = paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
}
chunkCachePath := path.Join(mmapDirPath, "chunk_cache")
policy := paramtable.Get().QueryNodeCfg.ReadAheadPolicy.GetValue()
err = initcore.InitChunkCache(chunkCachePath, policy)
err = initcore.InitMmapManager(paramtable.Get())
if err != nil {
return err
}
log.Info("InitChunkCache done", zap.String("dir", chunkCachePath), zap.String("policy", policy))
initcore.InitTraceConfig(paramtable.Get())
return nil
@ -406,6 +395,7 @@ func (node *QueryNode) Start() error {
paramtable.SetCreateTime(time.Now())
paramtable.SetUpdateTime(time.Now())
mmapEnabled := paramtable.Get().QueryNodeCfg.MmapEnabled.GetAsBool()
growingmmapEnable := paramtable.Get().QueryNodeCfg.GrowingMmapEnabled.GetAsBool()
node.UpdateStateCode(commonpb.StateCode_Healthy)
registry.GetInMemoryResolver().RegisterQueryNode(node.GetNodeID(), node)
@ -413,6 +403,7 @@ func (node *QueryNode) Start() error {
zap.Int64("queryNodeID", node.GetNodeID()),
zap.String("Address", node.address),
zap.Bool("mmapEnabled", mmapEnabled),
zap.Bool("growingmmapEnable", growingmmapEnable),
)
})

View File

@ -29,6 +29,7 @@ import "C"
import (
"fmt"
"path"
"time"
"unsafe"
@ -160,13 +161,31 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error {
return HandleCStatus(&status, "InitRemoteChunkManagerSingleton failed")
}
func InitChunkCache(mmapDirPath string, readAheadPolicy string) error {
cMmapDirPath := C.CString(mmapDirPath)
defer C.free(unsafe.Pointer(cMmapDirPath))
cReadAheadPolicy := C.CString(readAheadPolicy)
defer C.free(unsafe.Pointer(cReadAheadPolicy))
status := C.InitChunkCacheSingleton(cMmapDirPath, cReadAheadPolicy)
return HandleCStatus(&status, "InitChunkCacheSingleton failed")
func InitMmapManager(params *paramtable.ComponentParam) error {
mmapDirPath := params.QueryNodeCfg.MmapDirPath.GetValue()
if len(mmapDirPath) == 0 {
paramtable.Get().Save(
paramtable.Get().QueryNodeCfg.MmapDirPath.Key,
path.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), "mmap"),
)
mmapDirPath = paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
}
cMmapChunkManagerDir := C.CString(path.Join(mmapDirPath, "/mmap_chunk_manager/"))
cCacheReadAheadPolicy := C.CString(params.QueryNodeCfg.ReadAheadPolicy.GetValue())
defer C.free(unsafe.Pointer(cMmapChunkManagerDir))
defer C.free(unsafe.Pointer(cCacheReadAheadPolicy))
diskCapacity := params.QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
diskLimit := uint64(float64(params.QueryNodeCfg.MaxMmapDiskPercentageForMmapManager.GetAsUint64()*diskCapacity) * 0.01)
mmapFileSize := params.QueryNodeCfg.FixedFileSizeForMmapManager.GetAsUint64() * 1024 * 1024
mmapConfig := C.CMmapConfig{
cache_read_ahead_policy: cCacheReadAheadPolicy,
mmap_path: cMmapChunkManagerDir,
disk_limit: C.uint64_t(diskLimit),
fix_file_size: C.uint64_t(mmapFileSize),
growing_enable_mmap: C.bool(params.QueryNodeCfg.GrowingMmapEnabled.GetAsBool()),
}
status := C.InitMmapManager(mmapConfig)
return HandleCStatus(&status, "InitMmapManager failed")
}
func CleanRemoteChunkManager() {

View File

@ -2122,6 +2122,9 @@ type queryNodeConfig struct {
CacheMemoryLimit ParamItem `refreshable:"false"`
MmapDirPath ParamItem `refreshable:"false"`
MmapEnabled ParamItem `refreshable:"false"`
GrowingMmapEnabled ParamItem `refreshable:"false"`
FixedFileSizeForMmapManager ParamItem `refreshable:"false"`
MaxMmapDiskPercentageForMmapManager ParamItem `refreshable:"false"`
LazyLoadEnabled ParamItem `refreshable:"false"`
LazyLoadWaitTimeout ParamItem `refreshable:"true"`
@ -2376,6 +2379,38 @@ func (p *queryNodeConfig) init(base *BaseTable) {
}
p.MmapEnabled.Init(base.mgr)
p.GrowingMmapEnabled = ParamItem{
Key: "queryNode.mmap.growingMmapEnabled",
Version: "2.4.4",
DefaultValue: "false",
FallbackKeys: []string{"queryNode.growingMmapEnabled"},
Doc: "Enable mmap for using in growing raw data",
Export: true,
Formatter: func(v string) string {
mmapEnabled := p.MmapEnabled.GetAsBool()
return strconv.FormatBool(mmapEnabled && getAsBool(v))
},
}
p.GrowingMmapEnabled.Init(base.mgr)
p.FixedFileSizeForMmapManager = ParamItem{
Key: "queryNode.mmap.fixedFileSizeForMmapAlloc",
Version: "2.4.0",
DefaultValue: "64",
Doc: "tmp file size for mmap chunk manager",
Export: true,
}
p.FixedFileSizeForMmapManager.Init(base.mgr)
p.MaxMmapDiskPercentageForMmapManager = ParamItem{
Key: "querynode.mmap.maxDiskUsagePercentageForMmapAlloc",
Version: "2.4.0",
DefaultValue: "20",
Doc: "disk percentage used in mmap chunk manager",
Export: true,
}
p.MaxMmapDiskPercentageForMmapManager.Init(base.mgr)
p.LazyLoadEnabled = ParamItem{
Key: "queryNode.lazyload.enabled",
Version: "2.4.2",