mirror of https://github.com/milvus-io/milvus.git
fix: fix broken Sparse Float Vector raw data mmap (#36183)
issue: https://github.com/milvus-io/milvus/issues/36182 * improved `Column.h` to make the code much more readable and maintainable, and added detailed comments. * fixed an issue where `ArrayColumn::NumRows()` always returns 0 when the mmap backing storage is a file. * removed unused `ColumnBase` constructors and unnecessary members so we don't get confused. * Updated `test_chunk_cache.cpp` to make the tests parameterized: to test both mmap enabled and disabled. Added sparse field in the test to add coverage. * re-enabled test `Sealed::GetSparseVectorFromChunkCache`. * But 2 other disabled tests `Sealed::WarmupChunkCache` and `Sealed::GetVectorFromChunkCache` remain disabled, there seems to be errors. @bigsheeper PTAL. --------- Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>pull/36522/head
parent
58baeee8f1
commit
8495bc6bbc
|
@ -128,10 +128,10 @@ GetDataTypeSize(DataType data_type, int dim = 1) {
|
|||
case DataType::VECTOR_BFLOAT16: {
|
||||
return sizeof(bfloat16) * dim;
|
||||
}
|
||||
// Not supporting VECTOR_SPARSE_FLOAT here intentionally. We can't
|
||||
// easily estimately the size of a sparse float vector. Caller of this
|
||||
// method must handle this case themselves and must not pass
|
||||
// VECTOR_SPARSE_FLOAT data_type.
|
||||
// Not supporting variable length types(such as VECTOR_SPARSE_FLOAT and
|
||||
// VARCHAR) here intentionally. We can't easily estimate the size of
|
||||
// them. Caller of this method must handle this case themselves and must
|
||||
// not pass variable length types to this method.
|
||||
default: {
|
||||
PanicInfo(
|
||||
DataTypeInvalid,
|
||||
|
|
|
@ -45,43 +45,104 @@
|
|||
|
||||
namespace milvus {
|
||||
|
||||
/*
|
||||
* If string field's value all empty, need a string padding to avoid
|
||||
* mmap failing because size_ is zero which causing invalid arguement
|
||||
* array has the same problem
|
||||
* TODO: remove it when support NULL value
|
||||
*/
|
||||
constexpr size_t STRING_PADDING = 1;
|
||||
constexpr size_t ARRAY_PADDING = 1;
|
||||
|
||||
constexpr size_t DEFAULT_PK_VRCOL_BLOCK_SIZE = 1;
|
||||
constexpr size_t DEFAULT_MEM_VRCOL_BLOCK_SIZE = 32;
|
||||
constexpr size_t DEFAULT_MMAP_VRCOL_BLOCK_SIZE = 256;
|
||||
|
||||
/**
|
||||
* ColumnBase and its subclasses are designed to store and retrieve the raw data
|
||||
* of a field.
|
||||
*
|
||||
* It has 3 types of constructors corresponding to 3 MappingTypes:
|
||||
*
|
||||
* 1. MAP_WITH_ANONYMOUS: ColumnBase(size_t reserve_size, const FieldMeta& field_meta)
|
||||
*
|
||||
* This is used when we store the entire data in memory. Upon return, a piece
|
||||
* of unwritten memory is allocated and the caller can fill the memory with data by
|
||||
* calling AppendBatch/Append.
|
||||
*
|
||||
* 2. MAP_WITH_FILE: ColumnBase(const File& file, size_t size, const FieldMeta& field_meta)
|
||||
*
|
||||
* This is used when the raw data has already been written into a file, and we
|
||||
* simply mmap the file to memory and interpret the memory as a column. In this
|
||||
* mode, since the data is already in the file/mmapped memory, calling AppendBatch
|
||||
* and Append is not allowed.
|
||||
*
|
||||
* 3. MAP_WITH_MANAGER: ColumnBase(size_t reserve,
|
||||
* const DataType& data_type,
|
||||
* storage::MmapChunkManagerPtr mcm,
|
||||
* storage::MmapChunkDescriptorPtr descriptor,
|
||||
* bool nullable)
|
||||
*
|
||||
* This is used when we want to mmap but don't want to download all the data at once.
|
||||
* Instead, we download the data in chunks, cache and mmap each chunk as a single
|
||||
* ColumnBase. Upon return, a piece of unwritten mmaped memory is allocated by the chunk
|
||||
* manager, and the caller should fill the memory with data by calling AppendBatch
|
||||
* and Append.
|
||||
*
|
||||
* - Types with fixed length can use the Column subclass directly.
|
||||
* - Types with variable lengths:
|
||||
* - SparseFloatColumn:
|
||||
* - To store sparse float vectors.
|
||||
* - All 3 modes are supported.
|
||||
* - VariableColumn:
|
||||
* - To store string like types such as VARCHAR and JSON.
|
||||
* - MAP_WITH_MANAGER is not yet supported(as of 2024.09.11).
|
||||
* - ArrayColumn:
|
||||
* - To store ARRAY types.
|
||||
* - MAP_WITH_MANAGER is not yet supported(as of 2024.09.11).
|
||||
*
|
||||
*/
|
||||
class ColumnBase {
|
||||
/**
|
||||
* - data_ points at a piece of memory of size data_cap_size_ + padding_.
|
||||
* Based on mapping_type_, such memory can be:
|
||||
* - an anonymous memory region, allocated by mmap(MAP_ANON)
|
||||
* - a file-backed memory region, mapped by mmap(MAP_FILE)
|
||||
* - a memory region managed by MmapChunkManager, allocated by
|
||||
* MmapChunkManager::Allocate()
|
||||
*
|
||||
* Memory Layout of `data_`:
|
||||
*
|
||||
* |<-- data_cap_size_ -->|<-- padding_ -->|
|
||||
* |<-- data_size_ -->|<-- free space -->|
|
||||
*
|
||||
* AppendBatch/Append should first check if there's enough space for new data.
|
||||
* If not, call ExpandData() to expand the space.
|
||||
*
|
||||
* - only the first data_cap_size_ bytes can be used to store actual data.
|
||||
* - padding at the end is to ensure when all values are empty, we don't try
|
||||
* to allocate/mmap 0 bytes memory, which will cause mmap() to fail.
|
||||
* - data_size_ is the number of bytes currently used to store actual data.
|
||||
* - num_rows_ is the number of rows currently stored.
|
||||
* - valid_data_ is a FixedVector<bool> indicating whether each element is
|
||||
* not null. it is only used when nullable is true.
|
||||
* - nullable_ is true if null(0 byte) is a valid value for the column.
|
||||
*
|
||||
*/
|
||||
public:
|
||||
enum MappingType {
|
||||
enum class MappingType {
|
||||
MAP_WITH_ANONYMOUS = 0,
|
||||
MAP_WITH_FILE = 1,
|
||||
MAP_WITH_MANAGER = 2,
|
||||
};
|
||||
// memory mode ctor
|
||||
ColumnBase(size_t reserve, const FieldMeta& field_meta)
|
||||
// MAP_WITH_ANONYMOUS ctor
|
||||
ColumnBase(size_t reserve_rows, const FieldMeta& field_meta)
|
||||
: mapping_type_(MappingType::MAP_WITH_ANONYMOUS) {
|
||||
auto data_type = field_meta.get_data_type();
|
||||
SetPaddingSize(data_type);
|
||||
|
||||
if (field_meta.is_nullable()) {
|
||||
nullable_ = true;
|
||||
valid_data_.reserve(reserve_rows);
|
||||
}
|
||||
// We don't pre-allocate memory for variable length data type, data_
|
||||
// will be allocated by ExpandData() when AppendBatch/Append is called.
|
||||
if (IsVariableDataType(data_type)) {
|
||||
if (field_meta.is_nullable()) {
|
||||
nullable_ = true;
|
||||
valid_data_.reserve(reserve);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
type_size_ = field_meta.get_sizeof();
|
||||
|
||||
data_cap_size_ = field_meta.get_sizeof() * reserve;
|
||||
data_cap_size_ = field_meta.get_sizeof() * reserve_rows;
|
||||
|
||||
// use anon mapping so we are able to free these memory with munmap only
|
||||
size_t mapped_size = data_cap_size_ + padding_;
|
||||
|
@ -95,28 +156,22 @@ class ColumnBase {
|
|||
"failed to create anon map: {}, map_size={}",
|
||||
strerror(errno),
|
||||
mapped_size);
|
||||
|
||||
if (field_meta.is_nullable()) {
|
||||
nullable_ = true;
|
||||
valid_data_.reserve(reserve);
|
||||
}
|
||||
UpdateMetricWhenMmap(mapped_size);
|
||||
}
|
||||
|
||||
// use mmap manager ctor, used in growing segment fixed data type
|
||||
// MAP_WITH_MANAGER ctor
|
||||
// reserve is number of bytes to allocate(without padding)
|
||||
ColumnBase(size_t reserve,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor,
|
||||
bool nullable)
|
||||
: mcm_(mcm),
|
||||
mmap_descriptor_(descriptor),
|
||||
type_size_(GetDataTypeSize(data_type, dim)),
|
||||
num_rows_(0),
|
||||
data_size_(0),
|
||||
data_cap_size_(reserve),
|
||||
mapping_type_(MAP_WITH_MANAGER),
|
||||
mapping_type_(MappingType::MAP_WITH_MANAGER),
|
||||
nullable_(nullable) {
|
||||
AssertInfo((mcm != nullptr) && descriptor != nullptr,
|
||||
"use wrong mmap chunk manager and mmap chunk descriptor to "
|
||||
|
@ -133,24 +188,30 @@ class ColumnBase {
|
|||
}
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
// User must call Seal to build the view for variable length column.
|
||||
// !!! The incoming file must be write padings at the end of the file.
|
||||
// MAP_WITH_FILE ctor
|
||||
// size is number of bytes of the file, with padding
|
||||
// !!! The incoming file must have padding written at the end of the file.
|
||||
// Subclasses of variable length data type, if they used this constructor,
|
||||
// must set num_rows_ by themselves.
|
||||
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta)
|
||||
: mapping_type_(MappingType::MAP_WITH_FILE) {
|
||||
: nullable_(field_meta.is_nullable()),
|
||||
mapping_type_(MappingType::MAP_WITH_FILE) {
|
||||
auto data_type = field_meta.get_data_type();
|
||||
SetPaddingSize(data_type);
|
||||
|
||||
if (!IsVariableDataType(data_type)) {
|
||||
type_size_ = field_meta.get_sizeof();
|
||||
num_rows_ = size / type_size_;
|
||||
auto type_size = field_meta.get_sizeof();
|
||||
num_rows_ = size / type_size;
|
||||
}
|
||||
AssertInfo(size >= padding_,
|
||||
"file size {} is less than padding size {}",
|
||||
size,
|
||||
padding_);
|
||||
|
||||
data_size_ = size;
|
||||
data_cap_size_ = size - padding_;
|
||||
// in MAP_WITH_FILE, no extra space written in file, so data_size_ is
|
||||
// the same as data_cap_size_.
|
||||
data_size_ = size - padding_;
|
||||
data_cap_size_ = data_size_;
|
||||
// use exactly same size of file, padding shall be written in file already
|
||||
// see also https://github.com/milvus-io/milvus/issues/34442
|
||||
data_ = static_cast<char*>(
|
||||
|
@ -161,47 +222,7 @@ class ColumnBase {
|
|||
madvise(data_, size, MADV_WILLNEED);
|
||||
|
||||
// valid_data store in memory
|
||||
if (field_meta.is_nullable()) {
|
||||
nullable_ = true;
|
||||
valid_data_.reserve(num_rows_);
|
||||
}
|
||||
|
||||
UpdateMetricWhenMmap(size);
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
// User must call Seal to build the view for variable length column.
|
||||
// !!! The incoming file must be write padings at the end of the file.
|
||||
ColumnBase(const File& file,
|
||||
size_t size,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
bool nullable)
|
||||
: data_size_(size),
|
||||
nullable_(nullable),
|
||||
mapping_type_(MappingType::MAP_WITH_FILE) {
|
||||
SetPaddingSize(data_type);
|
||||
|
||||
// use exact same size of file, padding shall be written in file already
|
||||
// see also https://github.com/milvus-io/milvus/issues/34442
|
||||
if (!IsVariableDataType(data_type)) {
|
||||
type_size_ = GetDataTypeSize(data_type, dim);
|
||||
num_rows_ = size / type_size_;
|
||||
}
|
||||
AssertInfo(size >= padding_,
|
||||
"file size {} is less than padding size {}",
|
||||
size,
|
||||
padding_);
|
||||
|
||||
data_cap_size_ = size - padding_;
|
||||
|
||||
data_ = static_cast<char*>(
|
||||
mmap(nullptr, size, PROT_READ, MAP_SHARED, file.Descriptor(), 0));
|
||||
AssertInfo(data_ != MAP_FAILED,
|
||||
"failed to create file-backed map, err: {}",
|
||||
strerror(errno));
|
||||
|
||||
if (nullable) {
|
||||
if (nullable_) {
|
||||
valid_data_.reserve(num_rows_);
|
||||
}
|
||||
|
||||
|
@ -210,36 +231,22 @@ class ColumnBase {
|
|||
|
||||
virtual ~ColumnBase() {
|
||||
if (data_ != nullptr) {
|
||||
size_t mapped_size = data_cap_size_ + padding_;
|
||||
if (mapping_type_ != MappingType::MAP_WITH_MANAGER) {
|
||||
size_t mapped_size = data_cap_size_ + padding_;
|
||||
if (munmap(data_, mapped_size)) {
|
||||
AssertInfo(true,
|
||||
"failed to unmap variable field, err={}",
|
||||
strerror(errno));
|
||||
}
|
||||
}
|
||||
UpdateMetricWhenMunmap(data_cap_size_ + padding_);
|
||||
UpdateMetricWhenMunmap(mapped_size);
|
||||
}
|
||||
if (nullable_) {
|
||||
valid_data_.clear();
|
||||
}
|
||||
}
|
||||
|
||||
ColumnBase(ColumnBase&& column) noexcept
|
||||
: data_(column.data_),
|
||||
nullable_(column.nullable_),
|
||||
valid_data_(std::move(column.valid_data_)),
|
||||
padding_(column.padding_),
|
||||
type_size_(column.type_size_),
|
||||
num_rows_(column.num_rows_),
|
||||
data_size_(column.data_size_) {
|
||||
column.data_ = nullptr;
|
||||
column.data_cap_size_ = 0;
|
||||
column.padding_ = 0;
|
||||
column.num_rows_ = 0;
|
||||
column.data_size_ = 0;
|
||||
column.nullable_ = false;
|
||||
}
|
||||
ColumnBase(ColumnBase&&) = delete;
|
||||
|
||||
// Data() points at an addr that contains the elements
|
||||
virtual const char*
|
||||
|
@ -266,27 +273,21 @@ class ColumnBase {
|
|||
return nullable_;
|
||||
}
|
||||
|
||||
size_t
|
||||
DataSize() const {
|
||||
return data_size_;
|
||||
}
|
||||
|
||||
size_t
|
||||
NumRows() const {
|
||||
return num_rows_;
|
||||
};
|
||||
|
||||
virtual size_t
|
||||
ByteSize() const {
|
||||
// folly::fbvector<bool> implemented with bit compression.
|
||||
return data_cap_size_ + padding_ + (valid_data_.size() + 7) / 8;
|
||||
// returns the number of bytes used to store actual data
|
||||
size_t
|
||||
DataByteSize() const {
|
||||
return data_size_;
|
||||
}
|
||||
|
||||
// The capacity of the column,
|
||||
// DO NOT call this for variable length column(including SparseFloatColumn).
|
||||
virtual size_t
|
||||
Capacity() const {
|
||||
return data_cap_size_ / type_size_;
|
||||
// returns the ballpark number of bytes used by this object
|
||||
size_t
|
||||
MemoryUsageBytes() const {
|
||||
return data_cap_size_ + padding_ + (valid_data_.size() + 7) / 8;
|
||||
}
|
||||
|
||||
virtual SpanBase
|
||||
|
@ -309,7 +310,7 @@ class ColumnBase {
|
|||
AppendBatch(const FieldDataPtr data) {
|
||||
size_t required_size = data_size_ + data->DataSize();
|
||||
if (required_size > data_cap_size_) {
|
||||
ExpandData(required_size * 2 + padding_);
|
||||
ExpandData(required_size * 2);
|
||||
}
|
||||
|
||||
std::copy_n(static_cast<const char*>(data->Data()),
|
||||
|
@ -359,18 +360,13 @@ class ColumnBase {
|
|||
num_rows_++;
|
||||
}
|
||||
|
||||
void
|
||||
SetPaddingSize(const DataType& type) {
|
||||
padding_ = PaddingSize(type);
|
||||
}
|
||||
|
||||
void
|
||||
SetValidData(FixedVector<bool>&& valid_data) {
|
||||
valid_data_ = std::move(valid_data);
|
||||
}
|
||||
|
||||
protected:
|
||||
// only for memory mode and mmap manager mode, not mmap
|
||||
// new_size should not include padding, padding will be added in ExpandData()
|
||||
void
|
||||
ExpandData(size_t new_size) {
|
||||
if (new_size == 0) {
|
||||
|
@ -380,15 +376,15 @@ class ColumnBase {
|
|||
mapping_type_ == MappingType::MAP_WITH_ANONYMOUS ||
|
||||
mapping_type_ == MappingType::MAP_WITH_MANAGER,
|
||||
"expand function only use in anonymous or with mmap manager");
|
||||
size_t new_mapped_size = new_size + padding_;
|
||||
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
|
||||
size_t new_mapped_size = new_size + padding_;
|
||||
auto data = static_cast<char*>(mmap(nullptr,
|
||||
new_mapped_size,
|
||||
PROT_READ | PROT_WRITE,
|
||||
MAP_PRIVATE | MAP_ANON,
|
||||
-1,
|
||||
0));
|
||||
UpdateMetricWhenMmap(true, new_mapped_size);
|
||||
UpdateMetricWhenMmap(new_mapped_size);
|
||||
|
||||
AssertInfo(data != MAP_FAILED,
|
||||
"failed to expand map: {}, new_map_size={}",
|
||||
|
@ -403,6 +399,11 @@ class ColumnBase {
|
|||
munmap(data, mapped_size);
|
||||
UpdateMetricWhenMunmap(mapped_size);
|
||||
|
||||
// TODO: error handling is problematic:
|
||||
// if munmap fails, exception will be thrown and caught by
|
||||
// the cgo call, but the program continue to run. and the
|
||||
// successfully newly mmaped data will not be assigned to data_
|
||||
// and got leaked.
|
||||
AssertInfo(
|
||||
false,
|
||||
"failed to unmap while expanding: {}, old_map_size={}",
|
||||
|
@ -414,9 +415,7 @@ class ColumnBase {
|
|||
|
||||
data_ = data;
|
||||
data_cap_size_ = new_size;
|
||||
mapping_type_ = MappingType::MAP_WITH_ANONYMOUS;
|
||||
} else if (mapping_type_ == MappingType::MAP_WITH_MANAGER) {
|
||||
size_t new_mapped_size = new_size + padding_;
|
||||
auto data = mcm_->Allocate(mmap_descriptor_, new_mapped_size);
|
||||
AssertInfo(data != nullptr,
|
||||
"fail to create with mmap manager: map_size = {}",
|
||||
|
@ -425,7 +424,6 @@ class ColumnBase {
|
|||
// allocate space only append in one growing segment, so no need to munmap()
|
||||
data_ = (char*)data;
|
||||
data_cap_size_ = new_size;
|
||||
mapping_type_ = MappingType::MAP_WITH_MANAGER;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -435,36 +433,36 @@ class ColumnBase {
|
|||
// for the reason that, FixedVector<bool> use bit granularity for storage and access
|
||||
// so FixedVector is also used to store valid_data on the sealed segment.
|
||||
FixedVector<bool> valid_data_;
|
||||
// capacity in bytes
|
||||
size_t data_cap_size_{0};
|
||||
size_t padding_{0};
|
||||
// type_size_ is not used for sparse float vector column.
|
||||
size_t type_size_{1};
|
||||
size_t num_rows_{0};
|
||||
|
||||
// length in bytes
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
|
||||
size_t data_size_{0};
|
||||
const MappingType mapping_type_;
|
||||
|
||||
private:
|
||||
void
|
||||
UpdateMetricWhenMmap(size_t mmaped_size) {
|
||||
UpdateMetricWhenMmap(mapping_type_, mmaped_size);
|
||||
SetPaddingSize(const DataType& type) {
|
||||
padding_ = PaddingSize(type);
|
||||
}
|
||||
|
||||
void
|
||||
UpdateMetricWhenMmap(bool is_map_anonymous, size_t mapped_size) {
|
||||
UpdateMetricWhenMmap(size_t mapped_size) {
|
||||
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
|
||||
milvus::monitor::internal_mmap_allocated_space_bytes_anon.Observe(
|
||||
mapped_size);
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_anon.Increment(
|
||||
mapped_size);
|
||||
} else {
|
||||
} else if (mapping_type_ == MappingType::MAP_WITH_FILE) {
|
||||
milvus::monitor::internal_mmap_allocated_space_bytes_file.Observe(
|
||||
mapped_size);
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_file.Increment(
|
||||
mapped_size);
|
||||
}
|
||||
// else: does not update metric for MAP_WITH_MANAGER, MmapChunkManagerPtr
|
||||
// will update metric itself.
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -472,49 +470,36 @@ class ColumnBase {
|
|||
if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) {
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_anon.Decrement(
|
||||
mapped_size);
|
||||
} else {
|
||||
} else if (mapping_type_ == MappingType::MAP_WITH_FILE) {
|
||||
milvus::monitor::internal_mmap_in_used_space_bytes_file.Decrement(
|
||||
mapped_size);
|
||||
}
|
||||
// else: does not update metric for MAP_WITH_MANAGER, MmapChunkManagerPtr
|
||||
// will update metric itself.
|
||||
}
|
||||
|
||||
private:
|
||||
// mapping_type_
|
||||
MappingType mapping_type_;
|
||||
storage::MmapChunkManagerPtr mcm_ = nullptr;
|
||||
};
|
||||
|
||||
class Column : public ColumnBase {
|
||||
public:
|
||||
// memory mode ctor
|
||||
// MAP_WITH_ANONYMOUS ctor
|
||||
Column(size_t cap, const FieldMeta& field_meta)
|
||||
: ColumnBase(cap, field_meta) {
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
// MAP_WITH_FILE ctor
|
||||
Column(const File& file, size_t size, const FieldMeta& field_meta)
|
||||
: ColumnBase(file, size, field_meta) {
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
Column(const File& file,
|
||||
size_t size,
|
||||
int dim,
|
||||
DataType data_type,
|
||||
bool nullable)
|
||||
: ColumnBase(file, size, dim, data_type, nullable) {
|
||||
}
|
||||
|
||||
// MAP_WITH_MANAGER ctor
|
||||
Column(size_t reserve,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor,
|
||||
bool nullable)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor, nullable) {
|
||||
}
|
||||
|
||||
Column(Column&& column) noexcept : ColumnBase(std::move(column)) {
|
||||
: ColumnBase(reserve, data_type, mcm, descriptor, nullable) {
|
||||
}
|
||||
|
||||
~Column() override = default;
|
||||
|
@ -526,82 +511,104 @@ class Column : public ColumnBase {
|
|||
}
|
||||
};
|
||||
|
||||
// when mmap is used, size_, data_ and num_rows_ of ColumnBase are used.
|
||||
class SparseFloatColumn : public ColumnBase {
|
||||
public:
|
||||
// memory mode ctor
|
||||
SparseFloatColumn(const FieldMeta& field_meta) : ColumnBase(0, field_meta) {
|
||||
}
|
||||
// mmap mode ctor
|
||||
SparseFloatColumn(const File& file,
|
||||
size_t size,
|
||||
const FieldMeta& field_meta)
|
||||
: ColumnBase(file, size, field_meta) {
|
||||
}
|
||||
// mmap mode ctor
|
||||
SparseFloatColumn(const File& file,
|
||||
size_t size,
|
||||
int dim,
|
||||
const DataType& data_type)
|
||||
: ColumnBase(file, size, dim, data_type, false) {
|
||||
}
|
||||
// mmap with mmap manager
|
||||
SparseFloatColumn(size_t reserve,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor, false) {
|
||||
// MAP_WITH_ANONYMOUS ctor
|
||||
SparseFloatColumn(const FieldMeta& field_meta)
|
||||
: ColumnBase(/*reserve_rows= */ 0, field_meta) {
|
||||
}
|
||||
|
||||
SparseFloatColumn(SparseFloatColumn&& column) noexcept
|
||||
: ColumnBase(std::move(column)),
|
||||
dim_(column.dim_),
|
||||
vec_(std::move(column.vec_)) {
|
||||
// MAP_WITH_FILE ctor
|
||||
SparseFloatColumn(const File& file,
|
||||
size_t size,
|
||||
const FieldMeta& field_meta,
|
||||
std::vector<uint64_t>&& indices = {})
|
||||
: ColumnBase(file, size, field_meta) {
|
||||
AssertInfo(!indices.empty(),
|
||||
"SparseFloatColumn indices should not be empty.");
|
||||
num_rows_ = indices.size();
|
||||
// so that indices[num_rows_] - indices[num_rows_ - 1] is the byte size of
|
||||
// the last row.
|
||||
indices.push_back(data_size_);
|
||||
dim_ = 0;
|
||||
for (size_t i = 0; i < num_rows_; i++) {
|
||||
auto vec_size = indices[i + 1] - indices[i];
|
||||
AssertInfo(
|
||||
vec_size % knowhere::sparse::SparseRow<float>::element_size() ==
|
||||
0,
|
||||
"Incorrect sparse vector byte size: {}",
|
||||
vec_size);
|
||||
vec_.emplace_back(
|
||||
vec_size / knowhere::sparse::SparseRow<float>::element_size(),
|
||||
(uint8_t*)(data_) + indices[i],
|
||||
false);
|
||||
dim_ = std::max(dim_, vec_.back().dim());
|
||||
}
|
||||
}
|
||||
|
||||
// MAP_WITH_MANAGER ctor
|
||||
SparseFloatColumn(storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: ColumnBase(/*reserve= */ 0,
|
||||
DataType::VECTOR_SPARSE_FLOAT,
|
||||
mcm,
|
||||
descriptor,
|
||||
false) {
|
||||
}
|
||||
|
||||
~SparseFloatColumn() override = default;
|
||||
|
||||
// returned pointer points at a list of knowhere::sparse::SparseRow<float>
|
||||
const char*
|
||||
Data() const override {
|
||||
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
|
||||
}
|
||||
|
||||
// This is used to advice mmap prefetch, we don't currently support mmap for
|
||||
// sparse float vector thus not implemented for now.
|
||||
size_t
|
||||
ByteSize() const override {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"ByteSize not supported for sparse float column");
|
||||
}
|
||||
|
||||
size_t
|
||||
Capacity() const override {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"Capacity not supported for sparse float column");
|
||||
}
|
||||
|
||||
SpanBase
|
||||
Span() const override {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"Span not supported for sparse float column");
|
||||
"SparseFloatColumn::Span() not supported");
|
||||
}
|
||||
|
||||
void
|
||||
AppendBatch(const FieldDataPtr data) override {
|
||||
AssertInfo(
|
||||
mapping_type_ != MappingType::MAP_WITH_FILE,
|
||||
"SparseFloatColumn::AppendBatch not supported for MAP_WITH_FILE");
|
||||
|
||||
size_t required_size = data_size_ + data->DataSize();
|
||||
if (required_size > data_cap_size_) {
|
||||
ExpandData(required_size * 2);
|
||||
}
|
||||
dim_ = std::max(
|
||||
dim_,
|
||||
std::static_pointer_cast<FieldDataSparseVectorImpl>(data)->Dim());
|
||||
|
||||
auto ptr = static_cast<const knowhere::sparse::SparseRow<float>*>(
|
||||
data->Data());
|
||||
vec_.insert(vec_.end(), ptr, ptr + data->Length());
|
||||
|
||||
for (size_t i = 0; i < data->Length(); ++i) {
|
||||
dim_ = std::max(dim_, ptr[i].dim());
|
||||
auto row_bytes = ptr[i].data_byte_size();
|
||||
std::memcpy(data_ + data_size_, ptr[i].data(), row_bytes);
|
||||
vec_.emplace_back(
|
||||
ptr[i].size(), (uint8_t*)(data_) + data_size_, false);
|
||||
data_size_ += row_bytes;
|
||||
}
|
||||
num_rows_ += data->Length();
|
||||
}
|
||||
|
||||
void
|
||||
Append(const char* data, size_t size) override {
|
||||
PanicInfo(ErrorCode::Unsupported,
|
||||
"Append not supported for sparse float column");
|
||||
PanicInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"SparseFloatColumn::Append not supported, use AppendBatch instead");
|
||||
}
|
||||
|
||||
void
|
||||
Append(const char* data, const bool valid_data, size_t size) override {
|
||||
PanicInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"SparseFloatColumn::Append not supported, use AppendBatch instead");
|
||||
}
|
||||
|
||||
int64_t
|
||||
|
@ -609,34 +616,6 @@ class SparseFloatColumn : public ColumnBase {
|
|||
return dim_;
|
||||
}
|
||||
|
||||
void
|
||||
Seal(std::vector<uint64_t> indices) {
|
||||
AssertInfo(!indices.empty(),
|
||||
"indices should not be empty, Seal() of "
|
||||
"SparseFloatColumn must be called only "
|
||||
"at mmap mode");
|
||||
AssertInfo(data_,
|
||||
"data_ should not be nullptr, Seal() of "
|
||||
"SparseFloatColumn must be called only "
|
||||
"at mmap mode");
|
||||
num_rows_ = indices.size();
|
||||
// so that indices[num_rows_] - indices[num_rows_ - 1] is the size of
|
||||
// the last row.
|
||||
indices.push_back(data_size_);
|
||||
for (size_t i = 0; i < num_rows_; i++) {
|
||||
auto vec_size = indices[i + 1] - indices[i];
|
||||
AssertInfo(
|
||||
vec_size % knowhere::sparse::SparseRow<float>::element_size() ==
|
||||
0,
|
||||
"Incorrect sparse vector size: {}",
|
||||
vec_size);
|
||||
vec_.emplace_back(
|
||||
vec_size / knowhere::sparse::SparseRow<float>::element_size(),
|
||||
(uint8_t*)(data_) + indices[i],
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
int64_t dim_ = 0;
|
||||
std::vector<knowhere::sparse::SparseRow<float>> vec_;
|
||||
|
@ -648,33 +627,20 @@ class VariableColumn : public ColumnBase {
|
|||
using ViewType =
|
||||
std::conditional_t<std::is_same_v<T, std::string>, std::string_view, T>;
|
||||
|
||||
// memory mode ctor
|
||||
VariableColumn(size_t cap, const FieldMeta& field_meta, size_t block_size)
|
||||
: ColumnBase(cap, field_meta), block_size_(block_size) {
|
||||
// MAP_WITH_ANONYMOUS ctor
|
||||
VariableColumn(size_t reserve_rows,
|
||||
const FieldMeta& field_meta,
|
||||
size_t block_size)
|
||||
: ColumnBase(reserve_rows, field_meta), block_size_(block_size) {
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
// MAP_WITH_FILE ctor
|
||||
VariableColumn(const File& file,
|
||||
size_t size,
|
||||
const FieldMeta& field_meta,
|
||||
size_t block_size)
|
||||
: ColumnBase(file, size, field_meta), block_size_(block_size) {
|
||||
}
|
||||
// mmap with mmap manager
|
||||
VariableColumn(size_t reserve,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor,
|
||||
bool nullable,
|
||||
size_t block_size)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor, nullable),
|
||||
block_size_(block_size) {
|
||||
}
|
||||
|
||||
VariableColumn(VariableColumn&& column) noexcept
|
||||
: ColumnBase(std::move(column)), indices_(std::move(column.indices_)) {
|
||||
}
|
||||
|
||||
~VariableColumn() override = default;
|
||||
|
||||
|
@ -832,34 +798,18 @@ class VariableColumn : public ColumnBase {
|
|||
|
||||
class ArrayColumn : public ColumnBase {
|
||||
public:
|
||||
// memory mode ctor
|
||||
ArrayColumn(size_t num_rows, const FieldMeta& field_meta)
|
||||
: ColumnBase(num_rows, field_meta),
|
||||
// MAP_WITH_ANONYMOUS ctor
|
||||
ArrayColumn(size_t reserve_rows, const FieldMeta& field_meta)
|
||||
: ColumnBase(reserve_rows, field_meta),
|
||||
element_type_(field_meta.get_element_type()) {
|
||||
}
|
||||
|
||||
// mmap mode ctor
|
||||
// MAP_WITH_FILE ctor
|
||||
ArrayColumn(const File& file, size_t size, const FieldMeta& field_meta)
|
||||
: ColumnBase(file, size, field_meta),
|
||||
element_type_(field_meta.get_element_type()) {
|
||||
}
|
||||
|
||||
ArrayColumn(size_t reserve,
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptorPtr descriptor,
|
||||
bool nullable)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor, nullable) {
|
||||
}
|
||||
|
||||
ArrayColumn(ArrayColumn&& column) noexcept
|
||||
: ColumnBase(std::move(column)),
|
||||
indices_(std::move(column.indices_)),
|
||||
views_(std::move(column.views_)),
|
||||
element_type_(column.element_type_) {
|
||||
}
|
||||
|
||||
~ArrayColumn() override = default;
|
||||
|
||||
SpanBase
|
||||
|
@ -905,6 +855,7 @@ class ArrayColumn : public ColumnBase {
|
|||
indices_ = std::move(indices);
|
||||
element_indices_ = std::move(element_indices);
|
||||
}
|
||||
num_rows_ = indices_.size();
|
||||
ConstructViews();
|
||||
}
|
||||
|
||||
|
|
|
@ -41,12 +41,13 @@ namespace milvus {
|
|||
|
||||
/*
|
||||
* If string field's value all empty, need a string padding to avoid
|
||||
* mmap failing because size_ is zero which causing invalid arguement
|
||||
* mmap failing because size_ is zero which causing invalid argument
|
||||
* array has the same problem
|
||||
* TODO: remove it when support NULL value
|
||||
*/
|
||||
constexpr size_t FILE_STRING_PADDING = 1;
|
||||
constexpr size_t FILE_ARRAY_PADDING = 1;
|
||||
constexpr size_t SPARSE_FLOAT_PADDING = 4;
|
||||
|
||||
inline size_t
|
||||
PaddingSize(const DataType& type) {
|
||||
|
@ -60,6 +61,8 @@ PaddingSize(const DataType& type) {
|
|||
break;
|
||||
case DataType::ARRAY:
|
||||
return FILE_ARRAY_PADDING;
|
||||
case DataType::VECTOR_SPARSE_FLOAT:
|
||||
return SPARSE_FLOAT_PADDING;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -359,8 +359,8 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
|
|||
var_column->Append(std::move(field_data));
|
||||
}
|
||||
var_column->Seal();
|
||||
field_data_size = var_column->ByteSize();
|
||||
stats_.mem_size += var_column->ByteSize();
|
||||
field_data_size = var_column->DataByteSize();
|
||||
stats_.mem_size += var_column->MemoryUsageBytes();
|
||||
LoadStringSkipIndex(field_id, 0, *var_column);
|
||||
column = std::move(var_column);
|
||||
break;
|
||||
|
@ -374,8 +374,8 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
|
|||
var_column->Append(std::move(field_data));
|
||||
}
|
||||
var_column->Seal();
|
||||
stats_.mem_size += var_column->ByteSize();
|
||||
field_data_size = var_column->ByteSize();
|
||||
stats_.mem_size += var_column->MemoryUsageBytes();
|
||||
field_data_size = var_column->DataByteSize();
|
||||
column = std::move(var_column);
|
||||
break;
|
||||
}
|
||||
|
@ -552,8 +552,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
|
|||
}
|
||||
case milvus::DataType::VECTOR_SPARSE_FLOAT: {
|
||||
auto sparse_column = std::make_shared<SparseFloatColumn>(
|
||||
file, total_written, field_meta);
|
||||
sparse_column->Seal(std::move(indices));
|
||||
file, total_written, field_meta, std::move(indices));
|
||||
column = std::move(sparse_column);
|
||||
break;
|
||||
}
|
||||
|
@ -1173,10 +1172,10 @@ SegmentSealedImpl::get_vector(FieldId field_id,
|
|||
"column not found");
|
||||
const auto& column = path_to_column.at(data_path);
|
||||
AssertInfo(
|
||||
offset_in_binlog * row_bytes < column->ByteSize(),
|
||||
offset_in_binlog < column->NumRows(),
|
||||
"column idx out of range, idx: {}, size: {}, data_path: {}",
|
||||
offset_in_binlog * row_bytes,
|
||||
column->ByteSize(),
|
||||
offset_in_binlog,
|
||||
column->NumRows(),
|
||||
data_path);
|
||||
auto vector = &column->Data()[offset_in_binlog * row_bytes];
|
||||
std::memcpy(buf.data() + i * row_bytes, vector, row_bytes);
|
||||
|
|
|
@ -14,9 +14,10 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "ChunkCache.h"
|
||||
#include <future>
|
||||
#include <memory>
|
||||
|
||||
#include "ChunkCache.h"
|
||||
#include "common/Types.h"
|
||||
|
||||
namespace milvus::storage {
|
||||
|
@ -63,12 +64,12 @@ ChunkCache::Read(const std::string& filepath,
|
|||
std::string err_msg = "";
|
||||
try {
|
||||
field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath);
|
||||
column = Mmap(
|
||||
column = ConvertToColumn(
|
||||
field_data->GetFieldData(), descriptor, field_meta, mmap_enabled);
|
||||
if (mmap_enabled && mmap_rss_not_need) {
|
||||
auto ok = madvise(reinterpret_cast<void*>(
|
||||
const_cast<char*>(column->MmappedData())),
|
||||
column->ByteSize(),
|
||||
column->DataByteSize(),
|
||||
ReadAheadPolicy_Map["dontneed"]);
|
||||
if (ok != 0) {
|
||||
LOG_WARN(
|
||||
|
@ -77,7 +78,7 @@ ChunkCache::Read(const std::string& filepath,
|
|||
"{}",
|
||||
filepath,
|
||||
static_cast<const void*>(column->MmappedData()),
|
||||
column->ByteSize(),
|
||||
column->DataByteSize(),
|
||||
strerror(errno));
|
||||
}
|
||||
}
|
||||
|
@ -124,53 +125,39 @@ ChunkCache::Prefetch(const std::string& filepath) {
|
|||
auto column = it->second.second.get();
|
||||
auto ok = madvise(
|
||||
reinterpret_cast<void*>(const_cast<char*>(column->MmappedData())),
|
||||
column->ByteSize(),
|
||||
column->DataByteSize(),
|
||||
read_ahead_policy_);
|
||||
if (ok != 0) {
|
||||
LOG_WARN(
|
||||
"failed to madvise to the data file {}, addr {}, size {}, err: {}",
|
||||
filepath,
|
||||
static_cast<const void*>(column->MmappedData()),
|
||||
column->ByteSize(),
|
||||
column->DataByteSize(),
|
||||
strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<ColumnBase>
|
||||
ChunkCache::Mmap(const FieldDataPtr& field_data,
|
||||
const MmapChunkDescriptorPtr& descriptor,
|
||||
const FieldMeta& field_meta,
|
||||
bool mmap_enabled) {
|
||||
auto dim = field_data->get_dim();
|
||||
ChunkCache::ConvertToColumn(const FieldDataPtr& field_data,
|
||||
const MmapChunkDescriptorPtr& descriptor,
|
||||
const FieldMeta& field_meta,
|
||||
bool mmap_enabled) {
|
||||
auto data_type = field_data->get_data_type();
|
||||
|
||||
auto data_size = field_data->Size();
|
||||
|
||||
std::shared_ptr<ColumnBase> column{};
|
||||
|
||||
if (IsSparseFloatVectorDataType(data_type)) {
|
||||
std::vector<uint64_t> indices{};
|
||||
uint64_t offset = 0;
|
||||
for (auto i = 0; i < field_data->get_num_rows(); ++i) {
|
||||
indices.push_back(offset);
|
||||
offset += field_data->DataSize(i);
|
||||
}
|
||||
std::shared_ptr<SparseFloatColumn> sparse_column;
|
||||
if (mmap_enabled) {
|
||||
sparse_column = std::make_shared<SparseFloatColumn>(
|
||||
data_size, dim, data_type, mcm_, descriptor);
|
||||
column = std::make_shared<SparseFloatColumn>(mcm_, descriptor);
|
||||
} else {
|
||||
sparse_column = std::make_shared<SparseFloatColumn>(field_meta);
|
||||
column = std::make_shared<SparseFloatColumn>(field_meta);
|
||||
}
|
||||
sparse_column->Seal(std::move(indices));
|
||||
column = std::move(sparse_column);
|
||||
} else if (IsVariableDataType(data_type)) {
|
||||
AssertInfo(
|
||||
false, "TODO: unimplemented for variable data type: {}", data_type);
|
||||
} else {
|
||||
if (mmap_enabled) {
|
||||
column = std::make_shared<Column>(data_size,
|
||||
dim,
|
||||
column = std::make_shared<Column>(field_data->Size(),
|
||||
data_type,
|
||||
mcm_,
|
||||
descriptor,
|
||||
|
|
|
@ -59,10 +59,10 @@ class ChunkCache {
|
|||
|
||||
private:
|
||||
std::shared_ptr<ColumnBase>
|
||||
Mmap(const FieldDataPtr& field_data,
|
||||
const MmapChunkDescriptorPtr& descriptor,
|
||||
const FieldMeta& field_meta,
|
||||
bool mmap_enabled);
|
||||
ConvertToColumn(const FieldDataPtr& field_data,
|
||||
const MmapChunkDescriptorPtr& descriptor,
|
||||
const FieldMeta& field_meta,
|
||||
bool mmap_enabled);
|
||||
|
||||
private:
|
||||
using ColumnTable = std::unordered_map<
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
|
||||
#define DEFAULT_READ_AHEAD_POLICY "willneed"
|
||||
class ChunkCacheTest : public testing::Test {
|
||||
class ChunkCacheTest : public testing::TestWithParam</*mmap enabled*/ bool> {
|
||||
public:
|
||||
void
|
||||
SetUp() override {
|
||||
|
@ -38,7 +38,8 @@ class ChunkCacheTest : public testing::Test {
|
|||
TearDown() override {
|
||||
mcm->UnRegister(descriptor);
|
||||
}
|
||||
const char* file_name = "chunk_cache_test/insert_log/2/101/1000000";
|
||||
const char* dense_file_name = "chunk_cache_test/insert_log/2/101/1000000";
|
||||
const char* sparse_file_name = "chunk_cache_test/insert_log/2/102/1000000";
|
||||
milvus::storage::MmapChunkManagerPtr mcm;
|
||||
milvus::segcore::SegcoreConfig config;
|
||||
milvus::storage::MmapChunkDescriptorPtr descriptor =
|
||||
|
@ -47,151 +48,218 @@ class ChunkCacheTest : public testing::Test {
|
|||
{101, SegmentType::Sealed}));
|
||||
};
|
||||
|
||||
TEST_F(ChunkCacheTest, Read) {
|
||||
INSTANTIATE_TEST_SUITE_P(ChunkCacheTestSuite,
|
||||
ChunkCacheTest,
|
||||
testing::Values(true, false));
|
||||
|
||||
TEST_P(ChunkCacheTest, Read) {
|
||||
auto N = 10000;
|
||||
auto dim = 128;
|
||||
auto metric_type = knowhere::metric::L2;
|
||||
auto dense_metric_type = knowhere::metric::L2;
|
||||
auto sparse_metric_type = knowhere::metric::IP;
|
||||
|
||||
auto schema = std::make_shared<milvus::Schema>();
|
||||
auto fake_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type);
|
||||
auto fake_dense_vec_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type);
|
||||
auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64);
|
||||
auto fake_sparse_vec_id =
|
||||
schema->AddDebugField("fakevec_sparse",
|
||||
milvus::DataType::VECTOR_SPARSE_FLOAT,
|
||||
dim,
|
||||
sparse_metric_type);
|
||||
schema->set_primary_field_id(i64_fid);
|
||||
|
||||
auto dataset = milvus::segcore::DataGen(schema, N);
|
||||
|
||||
auto field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_id.get()};
|
||||
auto field_meta = milvus::FieldMeta(milvus::FieldName("facevec"),
|
||||
fake_id,
|
||||
milvus::DataType::VECTOR_FLOAT,
|
||||
dim,
|
||||
metric_type,
|
||||
false);
|
||||
auto dense_field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()};
|
||||
auto sparse_field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()};
|
||||
auto dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"),
|
||||
fake_dense_vec_id,
|
||||
milvus::DataType::VECTOR_FLOAT,
|
||||
dim,
|
||||
dense_metric_type,
|
||||
false);
|
||||
auto sparse_field_meta =
|
||||
milvus::FieldMeta(milvus::FieldName("fakevec_sparse"),
|
||||
fake_sparse_vec_id,
|
||||
milvus::DataType::VECTOR_SPARSE_FLOAT,
|
||||
dim,
|
||||
sparse_metric_type,
|
||||
false);
|
||||
|
||||
auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
|
||||
.GetChunkManager();
|
||||
auto data = dataset.get_col<float>(fake_id);
|
||||
auto data_slices = std::vector<void*>{data.data()};
|
||||
auto dense_data = dataset.get_col<float>(fake_dense_vec_id);
|
||||
auto sparse_data =
|
||||
dataset.get_col<knowhere::sparse::SparseRow<float>>(fake_sparse_vec_id);
|
||||
|
||||
auto data_slices = std::vector<void*>{dense_data.data()};
|
||||
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
|
||||
auto slice_names = std::vector<std::string>{file_name};
|
||||
auto slice_names = std::vector<std::string>{dense_file_name};
|
||||
PutFieldData(lcm.get(),
|
||||
data_slices,
|
||||
slice_sizes,
|
||||
slice_names,
|
||||
field_data_meta,
|
||||
field_meta);
|
||||
dense_field_data_meta,
|
||||
dense_field_meta);
|
||||
|
||||
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
|
||||
const auto& column = cc->Read(file_name, descriptor, field_meta, true);
|
||||
Assert(column->ByteSize() == dim * N * 4);
|
||||
|
||||
auto actual = (float*)column->Data();
|
||||
for (auto i = 0; i < N; i++) {
|
||||
AssertInfo(data[i] == actual[i],
|
||||
fmt::format("expect {}, actual {}", data[i], actual[i]));
|
||||
}
|
||||
|
||||
cc->Remove(file_name);
|
||||
lcm->Remove(file_name);
|
||||
}
|
||||
|
||||
TEST_F(ChunkCacheTest, ReadByMemoryMode) {
|
||||
auto N = 10000;
|
||||
auto dim = 128;
|
||||
auto metric_type = knowhere::metric::L2;
|
||||
|
||||
auto schema = std::make_shared<milvus::Schema>();
|
||||
auto fake_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type);
|
||||
auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64);
|
||||
schema->set_primary_field_id(i64_fid);
|
||||
|
||||
auto dataset = milvus::segcore::DataGen(schema, N);
|
||||
|
||||
auto field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_id.get()};
|
||||
auto field_meta = milvus::FieldMeta(milvus::FieldName("facevec"),
|
||||
fake_id,
|
||||
milvus::DataType::VECTOR_FLOAT,
|
||||
dim,
|
||||
metric_type,
|
||||
false);
|
||||
|
||||
auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
|
||||
.GetChunkManager();
|
||||
auto data = dataset.get_col<float>(fake_id);
|
||||
auto data_slices = std::vector<void*>{data.data()};
|
||||
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
|
||||
auto slice_names = std::vector<std::string>{file_name};
|
||||
data_slices = std::vector<void*>{sparse_data.data()};
|
||||
slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
|
||||
slice_names = std::vector<std::string>{sparse_file_name};
|
||||
PutFieldData(lcm.get(),
|
||||
data_slices,
|
||||
slice_sizes,
|
||||
slice_names,
|
||||
field_data_meta,
|
||||
field_meta);
|
||||
sparse_field_data_meta,
|
||||
sparse_field_meta);
|
||||
|
||||
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
|
||||
const auto& column = cc->Read(file_name, descriptor, field_meta, false);
|
||||
Assert(column->ByteSize() == dim * N * 4);
|
||||
|
||||
auto actual = (float*)column->Data();
|
||||
for (auto i = 0; i < N; i++) {
|
||||
AssertInfo(data[i] == actual[i],
|
||||
fmt::format("expect {}, actual {}", data[i], actual[i]));
|
||||
// validate dense data
|
||||
const auto& dense_column =
|
||||
cc->Read(dense_file_name, descriptor, dense_field_meta, GetParam());
|
||||
Assert(dense_column->DataByteSize() == dim * N * 4);
|
||||
auto actual_dense = (const float*)(dense_column->Data());
|
||||
for (auto i = 0; i < N * dim; i++) {
|
||||
AssertInfo(dense_data[i] == actual_dense[i],
|
||||
fmt::format(
|
||||
"expect {}, actual {}", dense_data[i], actual_dense[i]));
|
||||
}
|
||||
|
||||
cc->Remove(file_name);
|
||||
lcm->Remove(file_name);
|
||||
// validate sparse data
|
||||
const auto& sparse_column =
|
||||
cc->Read(sparse_file_name, descriptor, sparse_field_meta, GetParam());
|
||||
auto expected_sparse_size = 0;
|
||||
auto actual_sparse =
|
||||
(const knowhere::sparse::SparseRow<float>*)(sparse_column->Data());
|
||||
for (auto i = 0; i < N; i++) {
|
||||
const auto& actual_sparse_row = actual_sparse[i];
|
||||
const auto& expect_sparse_row = sparse_data[i];
|
||||
AssertInfo(
|
||||
actual_sparse_row.size() == expect_sparse_row.size(),
|
||||
fmt::format("Incorrect size of sparse row: expect {}, actual {}",
|
||||
expect_sparse_row.size(),
|
||||
actual_sparse_row.size()));
|
||||
auto bytes = actual_sparse_row.data_byte_size();
|
||||
AssertInfo(
|
||||
memcmp(actual_sparse_row.data(), expect_sparse_row.data(), bytes) ==
|
||||
0,
|
||||
fmt::format("Incorrect data of sparse row: expect {}, actual {}",
|
||||
expect_sparse_row.data(),
|
||||
actual_sparse_row.data()));
|
||||
expected_sparse_size += bytes;
|
||||
}
|
||||
|
||||
ASSERT_EQ(sparse_column->DataByteSize(), expected_sparse_size);
|
||||
|
||||
cc->Remove(dense_file_name);
|
||||
cc->Remove(sparse_file_name);
|
||||
lcm->Remove(dense_file_name);
|
||||
lcm->Remove(sparse_file_name);
|
||||
}
|
||||
|
||||
TEST_F(ChunkCacheTest, TestMultithreads) {
|
||||
TEST_P(ChunkCacheTest, TestMultithreads) {
|
||||
auto N = 1000;
|
||||
auto dim = 128;
|
||||
auto metric_type = knowhere::metric::L2;
|
||||
auto dense_metric_type = knowhere::metric::L2;
|
||||
auto sparse_metric_type = knowhere::metric::IP;
|
||||
|
||||
auto schema = std::make_shared<milvus::Schema>();
|
||||
auto fake_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type);
|
||||
auto fake_dense_vec_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type);
|
||||
auto fake_sparse_vec_id =
|
||||
schema->AddDebugField("fakevec_sparse",
|
||||
milvus::DataType::VECTOR_SPARSE_FLOAT,
|
||||
dim,
|
||||
sparse_metric_type);
|
||||
auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64);
|
||||
schema->set_primary_field_id(i64_fid);
|
||||
|
||||
auto dataset = milvus::segcore::DataGen(schema, N);
|
||||
|
||||
auto field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_id.get()};
|
||||
auto field_meta = milvus::FieldMeta(milvus::FieldName("facevec"),
|
||||
fake_id,
|
||||
milvus::DataType::VECTOR_FLOAT,
|
||||
dim,
|
||||
metric_type,
|
||||
false);
|
||||
auto dense_field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()};
|
||||
auto sparse_field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()};
|
||||
auto dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"),
|
||||
fake_dense_vec_id,
|
||||
milvus::DataType::VECTOR_FLOAT,
|
||||
dim,
|
||||
dense_metric_type,
|
||||
false);
|
||||
auto sparse_field_meta =
|
||||
milvus::FieldMeta(milvus::FieldName("fakevec_sparse"),
|
||||
fake_sparse_vec_id,
|
||||
milvus::DataType::VECTOR_SPARSE_FLOAT,
|
||||
dim,
|
||||
sparse_metric_type,
|
||||
false);
|
||||
|
||||
auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
|
||||
.GetChunkManager();
|
||||
auto data = dataset.get_col<float>(fake_id);
|
||||
auto data_slices = std::vector<void*>{data.data()};
|
||||
auto dense_data = dataset.get_col<float>(fake_dense_vec_id);
|
||||
auto sparse_data =
|
||||
dataset.get_col<knowhere::sparse::SparseRow<float>>(fake_sparse_vec_id);
|
||||
|
||||
auto dense_data_slices = std::vector<void*>{dense_data.data()};
|
||||
auto sparse_data_slices = std::vector<void*>{sparse_data.data()};
|
||||
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
|
||||
auto slice_names = std::vector<std::string>{file_name};
|
||||
auto dense_slice_names = std::vector<std::string>{dense_file_name};
|
||||
auto sparse_slice_names = std::vector<std::string>{sparse_file_name};
|
||||
|
||||
PutFieldData(lcm.get(),
|
||||
data_slices,
|
||||
dense_data_slices,
|
||||
slice_sizes,
|
||||
slice_names,
|
||||
field_data_meta,
|
||||
field_meta);
|
||||
dense_slice_names,
|
||||
dense_field_data_meta,
|
||||
dense_field_meta);
|
||||
|
||||
PutFieldData(lcm.get(),
|
||||
sparse_data_slices,
|
||||
slice_sizes,
|
||||
sparse_slice_names,
|
||||
sparse_field_data_meta,
|
||||
sparse_field_meta);
|
||||
|
||||
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
|
||||
|
||||
constexpr int threads = 16;
|
||||
std::vector<int64_t> total_counts(threads);
|
||||
auto executor = [&](int thread_id) {
|
||||
const auto& column = cc->Read(file_name, descriptor, field_meta, true);
|
||||
Assert(column->ByteSize() == dim * N * 4);
|
||||
const auto& dense_column =
|
||||
cc->Read(dense_file_name, descriptor, dense_field_meta, GetParam());
|
||||
Assert(dense_column->DataByteSize() == dim * N * 4);
|
||||
|
||||
auto actual = (float*)column->Data();
|
||||
auto actual_dense = (const float*)dense_column->Data();
|
||||
for (auto i = 0; i < N * dim; i++) {
|
||||
AssertInfo(
|
||||
dense_data[i] == actual_dense[i],
|
||||
fmt::format(
|
||||
"expect {}, actual {}", dense_data[i], actual_dense[i]));
|
||||
}
|
||||
|
||||
const auto& sparse_column = cc->Read(
|
||||
sparse_file_name, descriptor, sparse_field_meta, GetParam());
|
||||
auto actual_sparse =
|
||||
(const knowhere::sparse::SparseRow<float>*)sparse_column->Data();
|
||||
for (auto i = 0; i < N; i++) {
|
||||
AssertInfo(data[i] == actual[i],
|
||||
fmt::format("expect {}, actual {}", data[i], actual[i]));
|
||||
const auto& actual_sparse_row = actual_sparse[i];
|
||||
const auto& expect_sparse_row = sparse_data[i];
|
||||
AssertInfo(actual_sparse_row.size() == expect_sparse_row.size(),
|
||||
fmt::format(
|
||||
"Incorrect size of sparse row: expect {}, actual {}",
|
||||
expect_sparse_row.size(),
|
||||
actual_sparse_row.size()));
|
||||
auto bytes = actual_sparse_row.data_byte_size();
|
||||
AssertInfo(memcmp(actual_sparse_row.data(),
|
||||
expect_sparse_row.data(),
|
||||
bytes) == 0,
|
||||
fmt::format(
|
||||
"Incorrect data of sparse row: expect {}, actual {}",
|
||||
expect_sparse_row.data(),
|
||||
actual_sparse_row.data()));
|
||||
}
|
||||
};
|
||||
std::vector<std::thread> pool;
|
||||
|
@ -202,6 +270,8 @@ TEST_F(ChunkCacheTest, TestMultithreads) {
|
|||
thread.join();
|
||||
}
|
||||
|
||||
cc->Remove(file_name);
|
||||
lcm->Remove(file_name);
|
||||
cc->Remove(dense_file_name);
|
||||
cc->Remove(sparse_file_name);
|
||||
lcm->Remove(dense_file_name);
|
||||
lcm->Remove(sparse_file_name);
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "storage/MmapManager.h"
|
||||
#include "storage/MinioChunkManager.h"
|
||||
#include "storage/RemoteChunkManagerSingleton.h"
|
||||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
#include "storage/Util.h"
|
||||
#include "test_utils/DataGen.h"
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
|
@ -1530,9 +1531,6 @@ TEST(Sealed, GetVectorFromChunkCache) {
|
|||
}
|
||||
|
||||
TEST(Sealed, GetSparseVectorFromChunkCache) {
|
||||
// skip test due to mem leak from AWS::InitSDK
|
||||
return;
|
||||
|
||||
auto dim = 16;
|
||||
auto topK = 5;
|
||||
auto N = ROW_COUNT;
|
||||
|
@ -1544,9 +1542,8 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
|
|||
auto file_name = std::string(
|
||||
"sealed_test_get_vector_from_chunk_cache/insert_log/1/101/1000000");
|
||||
|
||||
auto sc = milvus::storage::StorageConfig{};
|
||||
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc);
|
||||
auto mcm = std::make_unique<milvus::storage::MinioChunkManager>(sc);
|
||||
auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
|
||||
.GetChunkManager();
|
||||
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto fakevec_id = schema->AddDebugField(
|
||||
|
@ -1563,19 +1560,28 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
|
|||
auto dataset = DataGen(schema, N);
|
||||
auto field_data_meta =
|
||||
milvus::storage::FieldDataMeta{1, 2, 3, fakevec_id.get()};
|
||||
auto field_meta = milvus::FieldMeta(milvus::FieldName("facevec"),
|
||||
auto field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"),
|
||||
fakevec_id,
|
||||
milvus::DataType::VECTOR_SPARSE_FLOAT,
|
||||
dim,
|
||||
metric_type);
|
||||
metric_type,
|
||||
false);
|
||||
|
||||
auto rcm = milvus::storage::RemoteChunkManagerSingleton::GetInstance()
|
||||
.GetRemoteChunkManager();
|
||||
auto data = dataset.get_col<knowhere::sparse::SparseRow<float>>(fakevec_id);
|
||||
auto data_slices = std::vector<void*>{data.data()};
|
||||
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
|
||||
auto slice_names = std::vector<std::string>{file_name};
|
||||
PutFieldData(rcm.get(),
|
||||
|
||||
// write to multiple files for better coverage
|
||||
auto data_slices = std::vector<void*>();
|
||||
auto slice_sizes = std::vector<int64_t>();
|
||||
auto slice_names = std::vector<std::string>();
|
||||
|
||||
const int64_t slice_size = (N + 9) / 10;
|
||||
for (int64_t i = 0; i < N; i += slice_size) {
|
||||
int64_t current_slice_size = std::min(slice_size, N - i);
|
||||
data_slices.push_back(data.data() + i);
|
||||
slice_sizes.push_back(current_slice_size);
|
||||
slice_names.push_back(file_name + "_" + std::to_string(i / slice_size));
|
||||
}
|
||||
PutFieldData(lcm.get(),
|
||||
data_slices,
|
||||
slice_sizes,
|
||||
slice_names,
|
||||
|
@ -1599,11 +1605,7 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
|
|||
segment_sealed->LoadIndex(vec_info);
|
||||
|
||||
auto field_binlog_info =
|
||||
FieldBinlogInfo{fakevec_id.get(),
|
||||
N,
|
||||
std::vector<int64_t>{N},
|
||||
false,
|
||||
std::vector<std::string>{file_name}};
|
||||
FieldBinlogInfo{fakevec_id.get(), N, slice_sizes, false, slice_names};
|
||||
segment_sealed->AddFieldDataInfoForSealed(
|
||||
LoadFieldDataInfo{std::map<int64_t, FieldBinlogInfo>{
|
||||
{fakevec_id.get(), field_binlog_info}}});
|
||||
|
@ -1630,9 +1632,11 @@ TEST(Sealed, GetSparseVectorFromChunkCache) {
|
|||
"sparse float vector doesn't match");
|
||||
}
|
||||
|
||||
rcm->Remove(file_name);
|
||||
auto exist = rcm->Exist(file_name);
|
||||
Assert(!exist);
|
||||
for (const auto& name : slice_names) {
|
||||
lcm->Remove(name);
|
||||
auto exist = lcm->Exist(name);
|
||||
Assert(!exist);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(Sealed, WarmupChunkCache) {
|
||||
|
|
Loading…
Reference in New Issue