Fix cache disk index error (#20419)

Signed-off-by: xige-16 <xi.ge@zilliz.com>

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/20475/head
xige-16 2022-11-21 17:45:23 +08:00 committed by GitHub
parent 82570e057c
commit 15badd0263
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 24 deletions

View File

@ -37,7 +37,7 @@ const char INDEX_BUILD_ID_KEY[] = "indexBuildID";
// Path components used for index files and raw data.
const char INDEX_ROOT_PATH[] = "index_files";
const char RAWDATA_ROOT_PATH[] = "raw_datas";

// Default memory limit for disk index handling, expressed in BYTES
// (67108864 = 64 * 1024 * 1024). Only this one definition is kept: a second,
// `int`-typed definition expressed in gigabytes would be a conflicting
// redefinition of the same name.
const int64_t DEFAULT_DISK_INDEX_MAX_MEMORY_LIMIT = 67108864;  // bytes
const int64_t DEFAULT_THREAD_CORE_COEFFICIENT = 50;
const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 4;  // megabytes

View File

@ -187,19 +187,13 @@ DiskFileManagerImpl::CacheIndexToDisk(std::vector<std::string> remote_files) {
}
}
uint64_t
DownloadAndDecodeRemoteIndexfile(RemoteChunkManager* remote_chunk_manager,
std::string file,
milvus::storage::Payload** index_payload_ptr) {
std::unique_ptr<DataCodec>
DownloadAndDecodeRemoteIndexfile(RemoteChunkManager* remote_chunk_manager, std::string file) {
auto fileSize = remote_chunk_manager->Size(file);
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[fileSize]);
remote_chunk_manager->Read(file, buf.get(), fileSize);
auto decoded_index_data = DeserializeFileData(buf.get(), fileSize);
auto index_payload = decoded_index_data->GetPayload();
auto index_size = index_payload->rows * sizeof(uint8_t);
*index_payload_ptr = index_payload.release();
return index_size;
return DeserializeFileData(buf.get(), fileSize);
}
uint64_t
@ -209,25 +203,21 @@ DiskFileManagerImpl::CacheBatchIndexFilesToDisk(const std::vector<std::string>&
auto& local_chunk_manager = LocalChunkManager::GetInstance();
auto& pool = ThreadPool::GetInstance();
int batch_size = remote_files.size();
std::vector<milvus::storage::Payload*> cache_payloads(batch_size);
for (size_t i = 0; i < cache_payloads.size(); ++i) {
cache_payloads[i] = nullptr;
}
std::vector<uint64_t> cache_payload_sizes(batch_size);
std::vector<std::future<uint64_t>> futures;
std::vector<std::future<std::unique_ptr<DataCodec>>> futures;
for (int i = 0; i < batch_size; ++i) {
futures.push_back(
pool.Submit(DownloadAndDecodeRemoteIndexfile, rcm_.get(), remote_files[i], &cache_payloads[i]));
}
for (int i = 0; i < batch_size; ++i) {
cache_payload_sizes[i] = futures[i].get();
futures.push_back(pool.Submit(DownloadAndDecodeRemoteIndexfile, rcm_.get(), remote_files[i]));
}
uint64_t offset = local_file_init_offfset;
for (int i = 0; i < batch_size; ++i) {
local_chunk_manager.Write(local_file_name, offset, const_cast<uint8_t*>((cache_payloads[i])->raw_data),
cache_payload_sizes[i]);
offset += cache_payload_sizes[i];
auto res = futures[i].get();
auto index_payload = res->GetPayload();
auto index_size = index_payload->rows * sizeof(uint8_t);
local_chunk_manager.Write(local_file_name, offset, const_cast<uint8_t*>(index_payload->raw_data), index_size);
offset += index_size;
}
return offset;
}