diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 6a1ca207e8..d789ffe422 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -14,6 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -30,6 +31,7 @@ #include "common/EasyAssert.h" #include "common/FieldData.h" #include "common/FieldDataInterface.h" +#include "common/File.h" #include "common/Slice.h" #include "common/Types.h" #include "log/Log.h" @@ -171,6 +173,7 @@ DiskFileManagerImpl::AddBatchIndexFiles( auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH); std::vector>> futures; + futures.reserve(remote_file_sizes.size()); AssertInfo(local_file_offsets.size() == remote_files.size(), "inconsistent size of offset slices with file slices"); AssertInfo(remote_files.size() == remote_file_sizes.size(), @@ -287,7 +290,7 @@ DiskFileManagerImpl::CacheIndexToDisk( std::map> index_slices; for (auto& file_path : remote_files) { - auto pos = file_path.find_last_of("_"); + auto pos = file_path.find_last_of('_'); index_slices[file_path.substr(0, pos)].emplace_back( std::stoi(file_path.substr(pos + 1))); } @@ -296,39 +299,30 @@ DiskFileManagerImpl::CacheIndexToDisk( std::sort(slices.second.begin(), slices.second.end()); } - auto EstimateParallelDegree = [&](const std::string& file) -> uint64_t { - auto fileSize = rcm_->Size(file); - return uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / fileSize); - }; - for (auto& slices : index_slices) { auto prefix = slices.first; auto local_index_file_name = GetLocalIndexObjectPrefix() + prefix.substr(prefix.find_last_of('/') + 1); local_chunk_manager->CreateFile(local_index_file_name); - int64_t offset = 0; + auto file = + File::Open(local_index_file_name, O_CREAT | O_RDWR | O_TRUNC); + + // Get the remote files std::vector batch_remote_files; - uint64_t max_parallel_degree = INT_MAX; + batch_remote_files.reserve(slices.second.size()); for (int& iter : slices.second) { - if (batch_remote_files.size() == max_parallel_degree) { - auto next_offset = CacheBatchIndexFilesToDisk( - batch_remote_files, local_index_file_name, offset); - offset = next_offset; - batch_remote_files.clear(); - } auto origin_file = prefix + "_" + std::to_string(iter); - if (batch_remote_files.size() == 0) { - // Use first file size as average size to estimate - max_parallel_degree = EstimateParallelDegree(origin_file); - } batch_remote_files.push_back(origin_file); } - if (batch_remote_files.size() > 0) { - auto next_offset = CacheBatchIndexFilesToDisk( - batch_remote_files, local_index_file_name, offset); - offset = next_offset; - batch_remote_files.clear(); + + auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files); + for (auto& chunk : index_chunks) { + auto index_data = chunk.get()->GetFieldData(); + auto index_size = index_data->Size(); + auto chunk_data = reinterpret_cast( + const_cast(index_data->Data())); + file.Write(chunk_data, index_size); } local_paths_.emplace_back(local_index_file_name); }