From f2b83d316bef642f95bd22ec3930ffc573d1952f Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Sun, 25 Aug 2024 15:42:58 +0800 Subject: [PATCH] enhance: Support memory mode chunk cache (#35347) Chunk cache supports loading raw vectors into memory. issue: https://github.com/milvus-io/milvus/issues/35273 --------- Signed-off-by: bigsheeper --- internal/core/src/segcore/SegmentSealed.h | 2 +- .../core/src/segcore/SegmentSealedImpl.cpp | 11 ++-- internal/core/src/segcore/SegmentSealedImpl.h | 2 +- internal/core/src/segcore/segment_c.cpp | 6 ++- internal/core/src/segcore/segment_c.h | 4 +- internal/core/src/storage/ChunkCache.cpp | 37 +++++++++---- internal/core/src/storage/ChunkCache.h | 12 +++-- internal/core/unittest/test_chunk_cache.cpp | 53 ++++++++++++++++++- internal/core/unittest/test_sealed.cpp | 2 +- internal/querynodev2/segments/segment.go | 11 ++-- 10 files changed, 109 insertions(+), 31 deletions(-) diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index ad73665711..2df9833680 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -41,7 +41,7 @@ class SegmentSealed : public SegmentInternalInterface { virtual void AddFieldDataInfoForSealed(const LoadFieldDataInfo& field_data_info) = 0; virtual void - WarmupChunkCache(const FieldId field_id) = 0; + WarmupChunkCache(const FieldId field_id, bool mmap_enabled) = 0; SegmentType type() const override { diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index c1f83881c6..15791acd49 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -128,7 +128,7 @@ SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) { } void -SegmentSealedImpl::WarmupChunkCache(const FieldId field_id) { +SegmentSealedImpl::WarmupChunkCache(const FieldId field_id, bool mmap_enabled) { auto& field_meta = schema_->operator[](field_id); AssertInfo(field_meta.is_vector(), "vector field is not vector type"); @@ -153,7 +153,8 @@ SegmentSealedImpl::WarmupChunkCache(const FieldId field_id) { auto cc = storage::MmapManager::GetInstance().GetChunkCache(); for (const auto& data_path : field_info.insert_files) { - auto column = cc->Read(data_path, mmap_descriptor_); + auto column = + cc->Read(data_path, mmap_descriptor_, field_meta, mmap_enabled); } } @@ -833,7 +834,11 @@ std::tuple> static ReadFromChunkCache( const storage::ChunkCachePtr& cc, const std::string& data_path, const storage::MmapChunkDescriptorPtr& descriptor) { - auto column = cc->Read(data_path, descriptor); + // For mmap mode, field_meta is unused, so just construct a fake field meta. + auto fm = + FieldMeta(FieldName(""), FieldId(0), milvus::DataType::NONE, false); + // TODO: add Load() interface for chunk cache when support retrieve_enable, make Read() raise error if cache miss + auto column = cc->Read(data_path, descriptor, fm, true); cc->Prefetch(data_path); return {data_path, column}; } diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index db7b658e42..fdc7dfd996 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -296,7 +296,7 @@ class SegmentSealedImpl : public SegmentSealed { LoadScalarIndex(const LoadIndexInfo& info); void - WarmupChunkCache(const FieldId field_id) override; + WarmupChunkCache(const FieldId field_id, bool mmap_enabled) override; bool generate_interim_index(const FieldId field_id); diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 1d453d89ff..69d7b07cec 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -472,14 +472,16 @@ AddFieldDataInfoForSealed(CSegmentInterface c_segment, } CStatus -WarmupChunkCache(CSegmentInterface c_segment, int64_t field_id) { +WarmupChunkCache(CSegmentInterface c_segment, + int64_t field_id, + bool mmap_enabled) { try { auto segment_interface = reinterpret_cast(c_segment); auto segment = dynamic_cast(segment_interface); AssertInfo(segment != nullptr, "segment conversion failed"); - segment->WarmupChunkCache(milvus::FieldId(field_id)); + segment->WarmupChunkCache(milvus::FieldId(field_id), mmap_enabled); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(milvus::UnexpectedError, e.what()); diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h index ff1ecd44d0..50fc92cefb 100644 --- a/internal/core/src/segcore/segment_c.h +++ b/internal/core/src/segcore/segment_c.h @@ -133,7 +133,9 @@ AddFieldDataInfoForSealed(CSegmentInterface c_segment, CLoadFieldDataInfo c_load_field_data_info); CStatus -WarmupChunkCache(CSegmentInterface c_segment, int64_t field_id); +WarmupChunkCache(CSegmentInterface c_segment, + int64_t field_id, + bool mmap_enabled); ////////////////////////////// interfaces for SegmentInterface ////////////////////////////// CStatus diff --git a/internal/core/src/storage/ChunkCache.cpp b/internal/core/src/storage/ChunkCache.cpp index 8d5497df1c..10871babf9 100644 --- a/internal/core/src/storage/ChunkCache.cpp +++ b/internal/core/src/storage/ChunkCache.cpp @@ -22,7 +22,9 @@ namespace milvus::storage { std::shared_ptr ChunkCache::Read(const std::string& filepath, - const MmapChunkDescriptorPtr& descriptor) { + const MmapChunkDescriptorPtr& descriptor, + const FieldMeta& field_meta, + bool mmap_enabled) { // use rlock to get future { std::shared_lock lck(mutex_); @@ -60,7 +62,8 @@ ChunkCache::Read(const std::string& filepath, std::string err_msg = ""; try { field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath); - column = Mmap(field_data->GetFieldData(), descriptor); + column = Mmap( + field_data->GetFieldData(), descriptor, field_meta, mmap_enabled); allocate_success = true; } catch (const SegcoreError& e) { err_code = e.get_error_code(); @@ -118,7 +121,9 @@ ChunkCache::Prefetch(const std::string& filepath) { std::shared_ptr ChunkCache::Mmap(const FieldDataPtr& field_data, - const MmapChunkDescriptorPtr& descriptor) { + const MmapChunkDescriptorPtr& descriptor, + const FieldMeta& field_meta, + bool mmap_enabled) { auto dim = field_data->get_dim(); auto data_type = field_data->get_data_type(); @@ -133,20 +138,30 @@ ChunkCache::Mmap(const FieldDataPtr& field_data, indices.push_back(offset); offset += field_data->DataSize(i); } - auto sparse_column = std::make_shared( - data_size, dim, data_type, mcm_, descriptor); + std::shared_ptr sparse_column; + if (mmap_enabled) { + sparse_column = std::make_shared( + data_size, dim, data_type, mcm_, descriptor); + } else { + sparse_column = std::make_shared(field_meta); + } sparse_column->Seal(std::move(indices)); column = std::move(sparse_column); } else if (IsVariableDataType(data_type)) { AssertInfo( false, "TODO: unimplemented for variable data type: {}", data_type); } else { - column = std::make_shared(data_size, - dim, - data_type, - mcm_, - descriptor, - field_data->IsNullable()); + if (mmap_enabled) { + column = std::make_shared(data_size, + dim, + data_type, + mcm_, + descriptor, + field_data->IsNullable()); + } else { + column = std::make_shared(field_data->get_num_rows(), + field_meta); + } } column->AppendBatch(field_data); return column; diff --git a/internal/core/src/storage/ChunkCache.h b/internal/core/src/storage/ChunkCache.h index 2af89386fe..b2b36c6ec4 100644 --- a/internal/core/src/storage/ChunkCache.h +++ b/internal/core/src/storage/ChunkCache.h @@ -45,7 +45,10 @@ class ChunkCache { public: std::shared_ptr - Read(const std::string& filepath, const MmapChunkDescriptorPtr& descriptor); + Read(const std::string& filepath, + const MmapChunkDescriptorPtr& descriptor, + const FieldMeta& field_meta, + bool mmap_enabled); void Remove(const std::string& filepath); @@ -56,10 +59,9 @@ class ChunkCache { private: std::shared_ptr Mmap(const FieldDataPtr& field_data, - const MmapChunkDescriptorPtr& descriptor); - - std::string - CachePath(const std::string& filepath); + const MmapChunkDescriptorPtr& descriptor, + const FieldMeta& field_meta, + bool mmap_enabled); private: using ColumnTable = std::unordered_map< diff --git a/internal/core/unittest/test_chunk_cache.cpp b/internal/core/unittest/test_chunk_cache.cpp index 6382430439..e6147221eb 100644 --- a/internal/core/unittest/test_chunk_cache.cpp +++ b/internal/core/unittest/test_chunk_cache.cpp @@ -83,7 +83,56 @@ TEST_F(ChunkCacheTest, Read) { field_meta); auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache(); - const auto& column = cc->Read(file_name, descriptor); + const auto& column = cc->Read(file_name, descriptor, field_meta, true); + Assert(column->ByteSize() == dim * N * 4); + + auto actual = (float*)column->Data(); + for (auto i = 0; i < N; i++) { + AssertInfo(data[i] == actual[i], + fmt::format("expect {}, actual {}", data[i], actual[i])); + } + + cc->Remove(file_name); + lcm->Remove(file_name); +} + +TEST_F(ChunkCacheTest, ReadByMemoryMode) { + auto N = 10000; + auto dim = 128; + auto metric_type = knowhere::metric::L2; + + auto schema = std::make_shared(); + auto fake_id = schema->AddDebugField( + "fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type); + auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64); + schema->set_primary_field_id(i64_fid); + + auto dataset = milvus::segcore::DataGen(schema, N); + + auto field_data_meta = + milvus::storage::FieldDataMeta{1, 2, 3, fake_id.get()}; + auto field_meta = milvus::FieldMeta(milvus::FieldName("facevec"), + fake_id, + milvus::DataType::VECTOR_FLOAT, + dim, + metric_type, + false); + + auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance() + .GetChunkManager(); + auto data = dataset.get_col(fake_id); + auto data_slices = std::vector{data.data()}; + auto slice_sizes = std::vector{static_cast(N)}; + auto slice_names = std::vector{file_name}; + PutFieldData(lcm.get(), + data_slices, + slice_sizes, + slice_names, + field_data_meta, + field_meta); + + auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache(); + const auto& column = cc->Read(file_name, descriptor, field_meta, false); Assert(column->ByteSize() == dim * N * 4); auto actual = (float*)column->Data(); @@ -136,7 +185,7 @@ TEST_F(ChunkCacheTest, TestMultithreads) { constexpr int threads = 16; std::vector total_counts(threads); auto executor = [&](int thread_id) { - const auto& column = cc->Read(file_name, descriptor); + const auto& column = cc->Read(file_name, descriptor, field_meta, true); Assert(column->ByteSize() == dim * N * 4); auto actual = (float*)column->Data(); diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 79772c5b3e..6b77798c0d 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -1715,7 +1715,7 @@ TEST(Sealed, WarmupChunkCache) { auto has = segment->HasRawData(vec_info.field_id); EXPECT_FALSE(has); - segment_sealed->WarmupChunkCache(FieldId(vec_info.field_id)); + segment_sealed->WarmupChunkCache(FieldId(vec_info.field_id), true); auto ids_ds = GenRandomIds(N); auto result = diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index ee2c3bfc64..58a21765cd 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -1166,7 +1166,7 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn } // 4. - s.WarmupChunkCache(ctx, indexInfo.GetFieldID()) + s.WarmupChunkCache(ctx, indexInfo.GetFieldID(), isDataMmapEnable(fieldSchema)) warmupChunkCacheSpan := tr.RecordSpan() log.Info("Finish loading index", zap.Duration("newLoadIndexInfoSpan", newLoadIndexInfoSpan), @@ -1214,12 +1214,13 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F return nil } -func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) { +func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmapEnabled bool) { log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), zap.Int64("fieldID", fieldID), + zap.Bool("mmapEnabled", mmapEnabled), ) if !s.ptrLock.RLockIf(state.IsNotReleased) { return @@ -1233,7 +1234,8 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) { case "sync": GetWarmupPool().Submit(func() (any, error) { cFieldID := C.int64_t(fieldID) - status = C.WarmupChunkCache(s.ptr, cFieldID) + cMmapEnabled := C.bool(mmapEnabled) + status = C.WarmupChunkCache(s.ptr, cFieldID, cMmapEnabled) if err := HandleCStatus(ctx, &status, "warming up chunk cache failed"); err != nil { log.Warn("warming up chunk cache synchronously failed", zap.Error(err)) return nil, err @@ -1253,7 +1255,8 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) { defer s.ptrLock.RUnlock() cFieldID := C.int64_t(fieldID) - status = C.WarmupChunkCache(s.ptr, cFieldID) + cMmapEnabled := C.bool(mmapEnabled) + status = C.WarmupChunkCache(s.ptr, cFieldID, cMmapEnabled) if err := HandleCStatus(ctx, &status, ""); err != nil { log.Warn("warming up chunk cache asynchronously failed", zap.Error(err)) return nil, err