Store index files in slices to reduce memory peaks (#22369) (#22455)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/22527/head
cai.zhang 2023-03-01 19:43:47 +08:00 committed by GitHub
parent 7478e44911
commit 5cae49e28d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 6 deletions

View File

@@ -80,14 +80,18 @@ DiskFileManagerImpl::LoadFile(const std::string& file) noexcept {
std::pair<std::string, size_t>
EncodeAndUploadIndexSlice(RemoteChunkManager* remote_chunk_manager,
uint8_t* buf,
const std::string& file,
int64_t offset,
int64_t batch_size,
IndexMeta index_meta,
FieldDataMeta field_meta,
std::string object_key) {
auto fieldData = std::make_shared<FieldData>(buf + offset, batch_size);
auto& local_chunk_manager = LocalChunkManager::GetInstance();
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[batch_size]);
local_chunk_manager.Read(file, offset, buf.get(), batch_size);
auto fieldData = std::make_shared<FieldData>(buf.get(), batch_size);
auto indexData = std::make_shared<IndexData>(fieldData);
indexData->set_index_meta(index_meta);
indexData->SetFieldDataMeta(field_meta);
auto serialized_index_data = indexData->serialize_to_remote_file();
@@ -111,8 +115,6 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
auto fileName = GetFileName(file);
auto fileSize = local_chunk_manager.Size(file);
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[fileSize]);
local_chunk_manager.Read(file, buf.get(), fileSize);
// Split local data to multi part with specified size
int slice_num = 0;
@@ -120,12 +122,12 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
std::vector<std::future<std::pair<std::string, size_t>>> futures;
for (int64_t offset = 0; offset < fileSize; slice_num++) {
auto batch_size = std::min(index_file_slice_size << 20, int64_t(fileSize) - offset);
// Put file to remote
char objectKey[200];
snprintf(objectKey, sizeof(objectKey), "%s/%s_%d", remotePrefix.c_str(), fileName.c_str(), slice_num);
// use multi-thread to put part file
futures.push_back(pool.Submit(EncodeAndUploadIndexSlice, rcm_.get(), buf.get(), offset, batch_size, index_meta_,
futures.push_back(pool.Submit(EncodeAndUploadIndexSlice, rcm_.get(), file, offset, batch_size, index_meta_,
field_meta_, std::string(objectKey)));
offset += batch_size;
}

View File

@@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"runtime"
"runtime/debug"
"strconv"
"strings"
"time"
@@ -234,6 +235,8 @@ func (it *indexBuildTask) LoadData(ctx context.Context) error {
log.Ctx(ctx).Info("Successfully load data", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
}
blobs = nil
debug.FreeOSMemory()
return err
}