enhance: reduce many I/O operations while loading disk index (#30189) (#30690)

Before this change, every time index chunk data was written to disk,
there were 4 I/O operations:
- open the file
- seek to the offset
- write the data
- close the file

This change opens the file only once and continuously writes all the data.

This also loads the files from object storage concurrently.

pr: #30189

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/30707/head
yah01 2024-02-20 17:40:52 +08:00 committed by GitHub
parent 8734bcc645
commit d4c4bf946b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 21 additions and 23 deletions

View File

@ -14,16 +14,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/fcntl.h>
#include <algorithm>
#include <boost/filesystem.hpp>
#include <mutex>
#include <utility>
#include "common/Common.h"
#include "common/Consts.h"
#include "common/EasyAssert.h"
#include "common/File.h"
#include "common/Slice.h"
#include "log/Log.h"
#include "storage/DiskFileManagerImpl.h"
#include "storage/FieldData.h"
#include "storage/FieldDataInterface.h"
#include "storage/FileManager.h"
#include "storage/IndexData.h"
#include "storage/LocalChunkManagerSingleton.h"
@ -120,6 +126,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
futures.reserve(remote_file_sizes.size());
AssertInfo(local_file_offsets.size() == remote_files.size(),
"inconsistent size of offset slices with file slices");
AssertInfo(remote_files.size() == remote_file_sizes.size(),
@ -167,7 +174,7 @@ DiskFileManagerImpl::CacheIndexToDisk(
std::map<std::string, std::vector<int>> index_slices;
for (auto& file_path : remote_files) {
auto pos = file_path.find_last_of("_");
auto pos = file_path.find_last_of('_');
index_slices[file_path.substr(0, pos)].emplace_back(
std::stoi(file_path.substr(pos + 1)));
}
@ -176,39 +183,30 @@ DiskFileManagerImpl::CacheIndexToDisk(
std::sort(slices.second.begin(), slices.second.end());
}
auto EstimateParallelDegree = [&](const std::string& file) -> uint64_t {
auto fileSize = rcm_->Size(file);
return uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / fileSize);
};
for (auto& slices : index_slices) {
auto prefix = slices.first;
auto local_index_file_name =
GetLocalIndexObjectPrefix() +
prefix.substr(prefix.find_last_of('/') + 1);
local_chunk_manager->CreateFile(local_index_file_name);
int64_t offset = 0;
auto file =
File::Open(local_index_file_name, O_CREAT | O_RDWR | O_TRUNC);
// Get the remote files
std::vector<std::string> batch_remote_files;
uint64_t max_parallel_degree = INT_MAX;
batch_remote_files.reserve(slices.second.size());
for (int& iter : slices.second) {
if (batch_remote_files.size() == max_parallel_degree) {
auto next_offset = CacheBatchIndexFilesToDisk(
batch_remote_files, local_index_file_name, offset);
offset = next_offset;
batch_remote_files.clear();
}
auto origin_file = prefix + "_" + std::to_string(iter);
if (batch_remote_files.size() == 0) {
// Use first file size as average size to estimate
max_parallel_degree = EstimateParallelDegree(origin_file);
}
batch_remote_files.push_back(origin_file);
}
if (batch_remote_files.size() > 0) {
auto next_offset = CacheBatchIndexFilesToDisk(
batch_remote_files, local_index_file_name, offset);
offset = next_offset;
batch_remote_files.clear();
auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files);
for (auto& chunk : index_chunks) {
auto index_data = chunk.get()->GetFieldData();
auto index_size = index_data->Size();
auto chunk_data = reinterpret_cast<uint8_t*>(
const_cast<void*>(index_data->Data()));
file.Write(chunk_data, index_size);
}
local_paths_.emplace_back(local_index_file_name);
}