enhance: Support memory mode chunk cache (#35347)

Chunk cache supports loading raw vectors into memory.

issue: https://github.com/milvus-io/milvus/issues/35273

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/35688/head
yihao.dai 2024-08-25 15:42:58 +08:00 committed by GitHub
parent 50fcfe8ef1
commit f2b83d316b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 109 additions and 31 deletions

View File

@@ -41,7 +41,7 @@ class SegmentSealed : public SegmentInternalInterface {
virtual void
AddFieldDataInfoForSealed(const LoadFieldDataInfo& field_data_info) = 0;
virtual void
WarmupChunkCache(const FieldId field_id) = 0;
WarmupChunkCache(const FieldId field_id, bool mmap_enabled) = 0;
SegmentType
type() const override {

View File

@@ -128,7 +128,7 @@ SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
}
void
SegmentSealedImpl::WarmupChunkCache(const FieldId field_id) {
SegmentSealedImpl::WarmupChunkCache(const FieldId field_id, bool mmap_enabled) {
auto& field_meta = schema_->operator[](field_id);
AssertInfo(field_meta.is_vector(), "vector field is not vector type");
@@ -153,7 +153,8 @@ SegmentSealedImpl::WarmupChunkCache(const FieldId field_id) {
auto cc = storage::MmapManager::GetInstance().GetChunkCache();
for (const auto& data_path : field_info.insert_files) {
auto column = cc->Read(data_path, mmap_descriptor_);
auto column =
cc->Read(data_path, mmap_descriptor_, field_meta, mmap_enabled);
}
}
@@ -833,7 +834,11 @@ std::tuple<std::string, std::shared_ptr<ColumnBase>> static ReadFromChunkCache(
const storage::ChunkCachePtr& cc,
const std::string& data_path,
const storage::MmapChunkDescriptorPtr& descriptor) {
auto column = cc->Read(data_path, descriptor);
// For mmap mode, field_meta is unused, so just construct a fake field meta.
auto fm =
FieldMeta(FieldName(""), FieldId(0), milvus::DataType::NONE, false);
// TODO: add Load() interface for chunk cache when support retrieve_enable, make Read() raise error if cache miss
auto column = cc->Read(data_path, descriptor, fm, true);
cc->Prefetch(data_path);
return {data_path, column};
}

View File

@@ -296,7 +296,7 @@ class SegmentSealedImpl : public SegmentSealed {
LoadScalarIndex(const LoadIndexInfo& info);
void
WarmupChunkCache(const FieldId field_id) override;
WarmupChunkCache(const FieldId field_id, bool mmap_enabled) override;
bool
generate_interim_index(const FieldId field_id);

View File

@@ -472,14 +472,16 @@ AddFieldDataInfoForSealed(CSegmentInterface c_segment,
}
CStatus
WarmupChunkCache(CSegmentInterface c_segment, int64_t field_id) {
WarmupChunkCache(CSegmentInterface c_segment,
int64_t field_id,
bool mmap_enabled) {
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto segment =
dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
AssertInfo(segment != nullptr, "segment conversion failed");
segment->WarmupChunkCache(milvus::FieldId(field_id));
segment->WarmupChunkCache(milvus::FieldId(field_id), mmap_enabled);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(milvus::UnexpectedError, e.what());

View File

@@ -133,7 +133,9 @@ AddFieldDataInfoForSealed(CSegmentInterface c_segment,
CLoadFieldDataInfo c_load_field_data_info);
CStatus
WarmupChunkCache(CSegmentInterface c_segment, int64_t field_id);
WarmupChunkCache(CSegmentInterface c_segment,
int64_t field_id,
bool mmap_enabled);
////////////////////////////// interfaces for SegmentInterface //////////////////////////////
CStatus

View File

@@ -22,7 +22,9 @@
namespace milvus::storage {
std::shared_ptr<ColumnBase>
ChunkCache::Read(const std::string& filepath,
const MmapChunkDescriptorPtr& descriptor) {
const MmapChunkDescriptorPtr& descriptor,
const FieldMeta& field_meta,
bool mmap_enabled) {
// use rlock to get future
{
std::shared_lock lck(mutex_);
@@ -60,7 +62,8 @@ ChunkCache::Read(const std::string& filepath,
std::string err_msg = "";
try {
field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath);
column = Mmap(field_data->GetFieldData(), descriptor);
column = Mmap(
field_data->GetFieldData(), descriptor, field_meta, mmap_enabled);
allocate_success = true;
} catch (const SegcoreError& e) {
err_code = e.get_error_code();
@@ -118,7 +121,9 @@ ChunkCache::Prefetch(const std::string& filepath) {
std::shared_ptr<ColumnBase>
ChunkCache::Mmap(const FieldDataPtr& field_data,
const MmapChunkDescriptorPtr& descriptor) {
const MmapChunkDescriptorPtr& descriptor,
const FieldMeta& field_meta,
bool mmap_enabled) {
auto dim = field_data->get_dim();
auto data_type = field_data->get_data_type();
@@ -133,20 +138,30 @@ ChunkCache::Mmap(const FieldDataPtr& field_data,
indices.push_back(offset);
offset += field_data->DataSize(i);
}
auto sparse_column = std::make_shared<SparseFloatColumn>(
data_size, dim, data_type, mcm_, descriptor);
std::shared_ptr<SparseFloatColumn> sparse_column;
if (mmap_enabled) {
sparse_column = std::make_shared<SparseFloatColumn>(
data_size, dim, data_type, mcm_, descriptor);
} else {
sparse_column = std::make_shared<SparseFloatColumn>(field_meta);
}
sparse_column->Seal(std::move(indices));
column = std::move(sparse_column);
} else if (IsVariableDataType(data_type)) {
AssertInfo(
false, "TODO: unimplemented for variable data type: {}", data_type);
} else {
column = std::make_shared<Column>(data_size,
dim,
data_type,
mcm_,
descriptor,
field_data->IsNullable());
if (mmap_enabled) {
column = std::make_shared<Column>(data_size,
dim,
data_type,
mcm_,
descriptor,
field_data->IsNullable());
} else {
column = std::make_shared<Column>(field_data->get_num_rows(),
field_meta);
}
}
column->AppendBatch(field_data);
return column;

View File

@@ -45,7 +45,10 @@ class ChunkCache {
public:
std::shared_ptr<ColumnBase>
Read(const std::string& filepath, const MmapChunkDescriptorPtr& descriptor);
Read(const std::string& filepath,
const MmapChunkDescriptorPtr& descriptor,
const FieldMeta& field_meta,
bool mmap_enabled);
void
Remove(const std::string& filepath);
@@ -56,10 +59,9 @@ class ChunkCache {
private:
std::shared_ptr<ColumnBase>
Mmap(const FieldDataPtr& field_data,
const MmapChunkDescriptorPtr& descriptor);
std::string
CachePath(const std::string& filepath);
const MmapChunkDescriptorPtr& descriptor,
const FieldMeta& field_meta,
bool mmap_enabled);
private:
using ColumnTable = std::unordered_map<

View File

@@ -83,7 +83,56 @@ TEST_F(ChunkCacheTest, Read) {
field_meta);
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
const auto& column = cc->Read(file_name, descriptor);
const auto& column = cc->Read(file_name, descriptor, field_meta, true);
Assert(column->ByteSize() == dim * N * 4);
auto actual = (float*)column->Data();
for (auto i = 0; i < N; i++) {
AssertInfo(data[i] == actual[i],
fmt::format("expect {}, actual {}", data[i], actual[i]));
}
cc->Remove(file_name);
lcm->Remove(file_name);
}
TEST_F(ChunkCacheTest, ReadByMemoryMode) {
auto N = 10000;
auto dim = 128;
auto metric_type = knowhere::metric::L2;
auto schema = std::make_shared<milvus::Schema>();
auto fake_id = schema->AddDebugField(
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type);
auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64);
schema->set_primary_field_id(i64_fid);
auto dataset = milvus::segcore::DataGen(schema, N);
auto field_data_meta =
milvus::storage::FieldDataMeta{1, 2, 3, fake_id.get()};
auto field_meta = milvus::FieldMeta(milvus::FieldName("facevec"),
fake_id,
milvus::DataType::VECTOR_FLOAT,
dim,
metric_type,
false);
auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager();
auto data = dataset.get_col<float>(fake_id);
auto data_slices = std::vector<void*>{data.data()};
auto slice_sizes = std::vector<int64_t>{static_cast<int64_t>(N)};
auto slice_names = std::vector<std::string>{file_name};
PutFieldData(lcm.get(),
data_slices,
slice_sizes,
slice_names,
field_data_meta,
field_meta);
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
const auto& column = cc->Read(file_name, descriptor, field_meta, false);
Assert(column->ByteSize() == dim * N * 4);
auto actual = (float*)column->Data();
@@ -136,7 +185,7 @@ TEST_F(ChunkCacheTest, TestMultithreads) {
constexpr int threads = 16;
std::vector<int64_t> total_counts(threads);
auto executor = [&](int thread_id) {
const auto& column = cc->Read(file_name, descriptor);
const auto& column = cc->Read(file_name, descriptor, field_meta, true);
Assert(column->ByteSize() == dim * N * 4);
auto actual = (float*)column->Data();

View File

@@ -1715,7 +1715,7 @@ TEST(Sealed, WarmupChunkCache) {
auto has = segment->HasRawData(vec_info.field_id);
EXPECT_FALSE(has);
segment_sealed->WarmupChunkCache(FieldId(vec_info.field_id));
segment_sealed->WarmupChunkCache(FieldId(vec_info.field_id), true);
auto ids_ds = GenRandomIds(N);
auto result =

View File

@@ -1166,7 +1166,7 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
}
// 4.
s.WarmupChunkCache(ctx, indexInfo.GetFieldID())
s.WarmupChunkCache(ctx, indexInfo.GetFieldID(), isDataMmapEnable(fieldSchema))
warmupChunkCacheSpan := tr.RecordSpan()
log.Info("Finish loading index",
zap.Duration("newLoadIndexInfoSpan", newLoadIndexInfoSpan),
@@ -1214,12 +1214,13 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F
return nil
}
func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmapEnabled bool) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", fieldID),
zap.Bool("mmapEnabled", mmapEnabled),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
return
@@ -1233,7 +1234,8 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
case "sync":
GetWarmupPool().Submit(func() (any, error) {
cFieldID := C.int64_t(fieldID)
status = C.WarmupChunkCache(s.ptr, cFieldID)
cMmapEnabled := C.bool(mmapEnabled)
status = C.WarmupChunkCache(s.ptr, cFieldID, cMmapEnabled)
if err := HandleCStatus(ctx, &status, "warming up chunk cache failed"); err != nil {
log.Warn("warming up chunk cache synchronously failed", zap.Error(err))
return nil, err
@@ -1253,7 +1255,8 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
defer s.ptrLock.RUnlock()
cFieldID := C.int64_t(fieldID)
status = C.WarmupChunkCache(s.ptr, cFieldID)
cMmapEnabled := C.bool(mmapEnabled)
status = C.WarmupChunkCache(s.ptr, cFieldID, cMmapEnabled)
if err := HandleCStatus(ctx, &status, ""); err != nil {
log.Warn("warming up chunk cache asynchronously failed", zap.Error(err))
return nil, err