fix: Fix chunked segment being unable to warm up using mmap (#38492)

issue: #38410

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
pull/38514/head
Bingyi Sun 2024-12-17 13:42:45 +08:00 committed by GitHub
parent 33aecb0655
commit dd4f33ae19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 206 additions and 151 deletions

View File

@ -157,7 +157,7 @@ ChunkedSegmentSealedImpl::WarmupChunkCache(const FieldId field_id,
auto cc = storage::MmapManager::GetInstance().GetChunkCache();
for (const auto& data_path : field_info.insert_files) {
auto column = cc->Read(data_path, mmap_descriptor_, field_meta);
auto column = cc->Read(data_path, field_meta, mmap_enabled, true);
}
}
@ -948,7 +948,7 @@ std::tuple<
descriptor,
const FieldMeta&
field_meta) {
auto column = cc->Read(data_path, descriptor, field_meta);
auto column = cc->Read(data_path, field_meta, true);
cc->Prefetch(data_path);
return {data_path, std::dynamic_pointer_cast<ChunkedColumnBase>(column)};
}

View File

@ -14,10 +14,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <filesystem>
#include <future>
#include <memory>
#include "ChunkCache.h"
#include "boost/filesystem/operations.hpp"
#include "boost/filesystem/path.hpp"
#include "common/Chunk.h"
#include "common/ChunkWriter.h"
#include "common/FieldMeta.h"
#include "common/Types.h"
@ -26,8 +30,9 @@
namespace milvus::storage {
std::shared_ptr<ColumnBase>
ChunkCache::Read(const std::string& filepath,
const MmapChunkDescriptorPtr& descriptor,
const FieldMeta& field_meta) {
const FieldMeta& field_meta,
bool mmap_enabled,
bool mmap_rss_not_need) {
// use rlock to get future
{
std::shared_lock lck(mutex_);
@ -66,8 +71,28 @@ ChunkCache::Read(const std::string& filepath,
auto field_data =
DownloadAndDecodeRemoteFile(cm_.get(), filepath, false);
auto chunk = create_chunk(
field_meta, field_meta.get_dim(), field_data->GetReader()->reader);
std::shared_ptr<Chunk> chunk;
auto dim = IsSparseFloatVectorDataType(field_meta.get_data_type())
? 1
: field_meta.get_dim();
if (mmap_enabled) {
auto path = std::filesystem::path(CachePath(filepath));
auto dir = path.parent_path();
std::filesystem::create_directories(dir);
auto file = File::Open(path.string(), O_CREAT | O_TRUNC | O_RDWR);
chunk = create_chunk(
field_meta, dim, file, 0, field_data->GetReader()->reader);
// unlink
auto ok = unlink(path.c_str());
AssertInfo(ok == 0,
"failed to unlink mmap data file {}, err: {}",
path.c_str(),
strerror(errno));
} else {
chunk =
create_chunk(field_meta, dim, field_data->GetReader()->reader);
}
auto data_type = field_meta.get_data_type();
if (IsSparseFloatVectorDataType(data_type)) {
@ -83,6 +108,22 @@ ChunkCache::Read(const std::string& filepath,
std::vector<std::shared_ptr<Chunk>> chunks{chunk};
column = std::make_shared<ChunkedColumn>(chunks);
}
if (mmap_enabled && mmap_rss_not_need) {
auto ok = madvise(reinterpret_cast<void*>(
const_cast<char*>(column->MmappedData())),
column->DataByteSize(),
ReadAheadPolicy_Map["dontneed"]);
if (ok != 0) {
LOG_WARN(
"failed to madvise to the data file {}, addr {}, size {}, "
"err: "
"{}",
filepath,
static_cast<const void*>(column->MmappedData()),
column->DataByteSize(),
strerror(errno));
}
}
} catch (const SegcoreError& e) {
err_code = e.get_error_code();
err_msg = fmt::format("failed to read for chunkCache, seg_core_err:{}",
@ -261,4 +302,22 @@ ChunkCache::ConvertToColumn(const FieldDataPtr& field_data,
column->AppendBatch(field_data);
return column;
}
// TODO(sunby): use mmap chunk manager to create chunk
// Builds the local cache file path for `filepath` by joining it onto
// path_prefix_.
//
// An absolute `filepath` has its root path removed first so that operator/
// appends to path_prefix_ instead of replacing it; a relative `filepath` is
// appended unchanged.
std::string
ChunkCache::CachePath(const std::string& filepath) {
    const auto path = std::filesystem::path(filepath);
    const auto prefix = std::filesystem::path(path_prefix_);
    if (path.is_absolute()) {
        // relative_path() drops the entire root path, including repeated
        // leading separators such as "//a/b". Cutting only
        // root_directory().length() characters off the string could leave a
        // leading separator behind, and operator/ discards its left operand
        // when the right operand is absolute, which would put the cache file
        // outside path_prefix_.
        return (prefix / path.relative_path()).string();
    }
    return (prefix / filepath).string();
}
} // namespace milvus::storage

View File

@ -27,10 +27,11 @@ extern std::map<std::string, int> ReadAheadPolicy_Map;
class ChunkCache {
public:
explicit ChunkCache(const std::string& read_ahead_policy,
explicit ChunkCache(std::string& path_prefix,
const std::string& read_ahead_policy,
ChunkManagerPtr cm,
MmapChunkManagerPtr mcm)
: cm_(cm), mcm_(mcm) {
: path_prefix_(path_prefix), cm_(cm), mcm_(mcm) {
auto iter = ReadAheadPolicy_Map.find(read_ahead_policy);
AssertInfo(iter != ReadAheadPolicy_Map.end(),
"unrecognized read ahead policy: {}, "
@ -47,8 +48,9 @@ class ChunkCache {
public:
std::shared_ptr<ColumnBase>
Read(const std::string& filepath,
const MmapChunkDescriptorPtr& descriptor,
const FieldMeta& field_meta);
const FieldMeta& field_meta,
bool mmap_enabled,
bool mmap_rss_not_need = false);
std::shared_ptr<ColumnBase>
Read(const std::string& filepath,
@ -85,6 +87,8 @@ class ChunkCache {
ChunkManagerPtr cm_;
MmapChunkManagerPtr mcm_;
ColumnTable columns_;
std::string path_prefix_;
};
using ChunkCachePtr = std::shared_ptr<milvus::storage::ChunkCache>;

View File

@ -66,7 +66,10 @@ class MmapManager {
auto rcm = RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
cc_ = std::make_shared<ChunkCache>(
std::move(mmap_config_.cache_read_ahead_policy), rcm, mcm_);
mmap_config_.mmap_path,
std::move(mmap_config_.cache_read_ahead_policy),
rcm,
mcm_);
}
LOG_INFO("Init MmapConfig with MmapConfig: {}",
mmap_config_.ToString());

View File

@ -16,23 +16,96 @@
#include <gtest/gtest.h>
#include <iostream>
#include <string>
#include <utility>
#include <vector>
#include "common/Consts.h"
#include "common/FieldMeta.h"
#include "common/Types.h"
#include "fmt/format.h"
#include "common/Schema.h"
#include "gtest/gtest.h"
#include "knowhere/sparse_utils.h"
#include "mmap/Column.h"
#include "test_utils/DataGen.h"
#include "test_utils/storage_test_utils.h"
#include "storage/ChunkCache.h"
#include "storage/LocalChunkManagerSingleton.h"
#define DEFAULT_READ_AHEAD_POLICY "willneed"
class ChunkCacheTest : public testing::TestWithParam</*mmap enabled*/ bool> {
public:
class ChunkCacheTest
: public testing::TestWithParam<
/*mmap enabled, is chunked*/ std::tuple<bool, bool>> {
protected:
void
SetUp() override {
mcm = milvus::storage::MmapManager::GetInstance().GetMmapChunkManager();
mcm->Register(descriptor);
N = 10000;
dim = 128;
auto dense_metric_type = knowhere::metric::L2;
auto sparse_metric_type = knowhere::metric::IP;
auto schema = std::make_shared<milvus::Schema>();
auto fake_dense_vec_id = schema->AddDebugField(
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type);
auto i64_fid =
schema->AddDebugField("counter", milvus::DataType::INT64);
auto fake_sparse_vec_id =
schema->AddDebugField("fakevec_sparse",
milvus::DataType::VECTOR_SPARSE_FLOAT,
dim,
sparse_metric_type);
schema->set_primary_field_id(i64_fid);
auto dataset = milvus::segcore::DataGen(schema, N);
auto dense_field_data_meta =
milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()};
auto sparse_field_data_meta =
milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()};
dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"),
fake_dense_vec_id,
milvus::DataType::VECTOR_FLOAT,
dim,
dense_metric_type,
false);
sparse_field_meta =
milvus::FieldMeta(milvus::FieldName("fakevec_sparse"),
fake_sparse_vec_id,
milvus::DataType::VECTOR_SPARSE_FLOAT,
dim,
sparse_metric_type,
false);
lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager();
dense_data = dataset.get_col<float>(fake_dense_vec_id);
sparse_data = dataset.get_col<knowhere::sparse::SparseRow<float>>(
fake_sparse_vec_id);
auto data_slices = std::vector<void*>{dense_data.data()};
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
auto slice_names = std::vector<std::string>{dense_file_name};
PutFieldData(lcm.get(),
data_slices,
slice_sizes,
slice_names,
dense_field_data_meta,
dense_field_meta);
data_slices = std::vector<void*>{sparse_data.data()};
slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
slice_names = std::vector<std::string>{sparse_file_name};
PutFieldData(lcm.get(),
data_slices,
slice_sizes,
slice_names,
sparse_field_data_meta,
sparse_field_meta);
}
void
TearDown() override {
@ -46,81 +119,37 @@ class ChunkCacheTest : public testing::TestWithParam</*mmap enabled*/ bool> {
std::shared_ptr<milvus::storage::MmapChunkDescriptor>(
new milvus::storage::MmapChunkDescriptor(
{101, SegmentType::Sealed}));
};
int N;
int dim;
milvus::FieldMeta dense_field_meta = milvus::FieldMeta::RowIdMeta;
milvus::FixedVector<float> dense_data;
milvus::FieldMeta sparse_field_meta = milvus::FieldMeta::RowIdMeta;
milvus::FixedVector<knowhere::sparse::SparseRow<float>> sparse_data;
std::shared_ptr<milvus::storage::LocalChunkManager> lcm;
};
INSTANTIATE_TEST_SUITE_P(ChunkCacheTestSuite,
ChunkCacheTest,
testing::Values(true, false));
testing::Combine(testing::Bool(), testing::Bool()));
TEST_P(ChunkCacheTest, Read) {
auto N = 10000;
auto dim = 128;
auto dense_metric_type = knowhere::metric::L2;
auto sparse_metric_type = knowhere::metric::IP;
auto schema = std::make_shared<milvus::Schema>();
auto fake_dense_vec_id = schema->AddDebugField(
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type);
auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64);
auto fake_sparse_vec_id =
schema->AddDebugField("fakevec_sparse",
milvus::DataType::VECTOR_SPARSE_FLOAT,
dim,
sparse_metric_type);
schema->set_primary_field_id(i64_fid);
auto dataset = milvus::segcore::DataGen(schema, N);
auto dense_field_data_meta =
milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()};
auto sparse_field_data_meta =
milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()};
auto dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"),
fake_dense_vec_id,
milvus::DataType::VECTOR_FLOAT,
dim,
dense_metric_type,
false);
auto sparse_field_meta =
milvus::FieldMeta(milvus::FieldName("fakevec_sparse"),
fake_sparse_vec_id,
milvus::DataType::VECTOR_SPARSE_FLOAT,
dim,
sparse_metric_type,
false);
auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager();
auto dense_data = dataset.get_col<float>(fake_dense_vec_id);
auto sparse_data =
dataset.get_col<knowhere::sparse::SparseRow<float>>(fake_sparse_vec_id);
auto data_slices = std::vector<void*>{dense_data.data()};
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
auto slice_names = std::vector<std::string>{dense_file_name};
PutFieldData(lcm.get(),
data_slices,
slice_sizes,
slice_names,
dense_field_data_meta,
dense_field_meta);
data_slices = std::vector<void*>{sparse_data.data()};
slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
slice_names = std::vector<std::string>{sparse_file_name};
PutFieldData(lcm.get(),
data_slices,
slice_sizes,
slice_names,
sparse_field_data_meta,
sparse_field_meta);
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
// validate dense data
const auto& dense_column =
cc->Read(dense_file_name, descriptor, dense_field_meta, GetParam());
Assert(dense_column->DataByteSize() == dim * N * 4);
std::shared_ptr<milvus::ColumnBase> dense_column;
auto p = GetParam();
auto mmap_enabled = std::get<0>(p);
auto is_test_chunked = std::get<1>(p);
if (is_test_chunked) {
dense_column =
cc->Read(dense_file_name, dense_field_meta, mmap_enabled);
} else {
dense_column = cc->Read(
dense_file_name, descriptor, dense_field_meta, mmap_enabled);
Assert(dense_column->DataByteSize() == dim * N * 4);
}
auto actual_dense = (const float*)(dense_column->Data());
for (auto i = 0; i < N * dim; i++) {
AssertInfo(dense_data[i] == actual_dense[i],
@ -129,8 +158,14 @@ TEST_P(ChunkCacheTest, Read) {
}
// validate sparse data
const auto& sparse_column =
cc->Read(sparse_file_name, descriptor, sparse_field_meta, GetParam());
std::shared_ptr<milvus::ColumnBase> sparse_column;
if (is_test_chunked) {
sparse_column =
cc->Read(sparse_file_name, sparse_field_meta, mmap_enabled);
} else {
sparse_column = cc->Read(
sparse_file_name, descriptor, sparse_field_meta, mmap_enabled);
}
auto expected_sparse_size = 0;
auto actual_sparse =
(const knowhere::sparse::SparseRow<float>*)(sparse_column->Data());
@ -151,8 +186,9 @@ TEST_P(ChunkCacheTest, Read) {
actual_sparse_row.data()));
expected_sparse_size += bytes;
}
ASSERT_EQ(sparse_column->DataByteSize(), expected_sparse_size);
if (!is_test_chunked) {
Assert(sparse_column->DataByteSize() == expected_sparse_size);
}
cc->Remove(dense_file_name);
cc->Remove(sparse_file_name);
@ -161,76 +197,23 @@ TEST_P(ChunkCacheTest, Read) {
}
TEST_P(ChunkCacheTest, TestMultithreads) {
auto N = 1000;
auto dim = 128;
auto dense_metric_type = knowhere::metric::L2;
auto sparse_metric_type = knowhere::metric::IP;
auto schema = std::make_shared<milvus::Schema>();
auto fake_dense_vec_id = schema->AddDebugField(
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type);
auto fake_sparse_vec_id =
schema->AddDebugField("fakevec_sparse",
milvus::DataType::VECTOR_SPARSE_FLOAT,
dim,
sparse_metric_type);
auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64);
schema->set_primary_field_id(i64_fid);
auto dataset = milvus::segcore::DataGen(schema, N);
auto dense_field_data_meta =
milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()};
auto sparse_field_data_meta =
milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()};
auto dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"),
fake_dense_vec_id,
milvus::DataType::VECTOR_FLOAT,
dim,
dense_metric_type,
false);
auto sparse_field_meta =
milvus::FieldMeta(milvus::FieldName("fakevec_sparse"),
fake_sparse_vec_id,
milvus::DataType::VECTOR_SPARSE_FLOAT,
dim,
sparse_metric_type,
false);
auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager();
auto dense_data = dataset.get_col<float>(fake_dense_vec_id);
auto sparse_data =
dataset.get_col<knowhere::sparse::SparseRow<float>>(fake_sparse_vec_id);
auto dense_data_slices = std::vector<void*>{dense_data.data()};
auto sparse_data_slices = std::vector<void*>{sparse_data.data()};
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
auto dense_slice_names = std::vector<std::string>{dense_file_name};
auto sparse_slice_names = std::vector<std::string>{sparse_file_name};
PutFieldData(lcm.get(),
dense_data_slices,
slice_sizes,
dense_slice_names,
dense_field_data_meta,
dense_field_meta);
PutFieldData(lcm.get(),
sparse_data_slices,
slice_sizes,
sparse_slice_names,
sparse_field_data_meta,
sparse_field_meta);
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
constexpr int threads = 16;
std::vector<int64_t> total_counts(threads);
auto p = GetParam();
auto mmap_enabled = std::get<0>(p);
auto is_test_chunked = std::get<1>(p);
auto executor = [&](int thread_id) {
const auto& dense_column =
cc->Read(dense_file_name, descriptor, dense_field_meta, GetParam());
Assert(dense_column->DataByteSize() == dim * N * 4);
std::shared_ptr<milvus::ColumnBase> dense_column;
if (is_test_chunked) {
dense_column =
cc->Read(dense_file_name, dense_field_meta, mmap_enabled);
} else {
dense_column = cc->Read(
dense_file_name, descriptor, dense_field_meta, mmap_enabled);
Assert(dense_column->DataByteSize() == dim * N * 4);
}
auto actual_dense = (const float*)dense_column->Data();
for (auto i = 0; i < N * dim; i++) {
@ -240,8 +223,14 @@ TEST_P(ChunkCacheTest, TestMultithreads) {
"expect {}, actual {}", dense_data[i], actual_dense[i]));
}
const auto& sparse_column = cc->Read(
sparse_file_name, descriptor, sparse_field_meta, GetParam());
std::shared_ptr<milvus::ColumnBase> sparse_column;
if (is_test_chunked) {
sparse_column =
cc->Read(sparse_file_name, sparse_field_meta, mmap_enabled);
} else {
sparse_column = cc->Read(
sparse_file_name, descriptor, sparse_field_meta, mmap_enabled);
}
auto actual_sparse =
(const knowhere::sparse::SparseRow<float>*)sparse_column->Data();
for (auto i = 0; i < N; i++) {