mirror of https://github.com/milvus-io/milvus.git
parent
fd4411a8ad
commit
926607b519
|
@ -81,14 +81,18 @@ DiskFileManagerImpl::LoadFile(const std::string& file) noexcept {
|
|||
|
||||
std::pair<std::string, size_t>
|
||||
EncodeAndUploadIndexSlice(RemoteChunkManager* remote_chunk_manager,
|
||||
uint8_t* buf,
|
||||
const std::string& file,
|
||||
int64_t offset,
|
||||
int64_t batch_size,
|
||||
IndexMeta index_meta,
|
||||
FieldDataMeta field_meta,
|
||||
std::string object_key) {
|
||||
auto& local_chunk_manager = LocalChunkManager::GetInstance();
|
||||
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[batch_size]);
|
||||
local_chunk_manager.Read(file, offset, buf.get(), batch_size);
|
||||
|
||||
auto field_data = milvus::storage::FieldDataFactory::GetInstance().CreateFieldData(DataType::INT8);
|
||||
field_data->FillFieldData(buf + offset, batch_size);
|
||||
field_data->FillFieldData(buf.get(), batch_size);
|
||||
auto indexData = std::make_shared<IndexData>(field_data);
|
||||
indexData->set_index_meta(index_meta);
|
||||
indexData->SetFieldDataMeta(field_meta);
|
||||
|
@ -113,8 +117,6 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
|
|||
|
||||
auto fileName = GetFileName(file);
|
||||
auto fileSize = local_chunk_manager.Size(file);
|
||||
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[fileSize]);
|
||||
local_chunk_manager.Read(file, buf.get(), fileSize);
|
||||
|
||||
// Split local data to multi part with specified size
|
||||
int slice_num = 0;
|
||||
|
@ -122,12 +124,12 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
|
|||
std::vector<std::future<std::pair<std::string, size_t>>> futures;
|
||||
for (int64_t offset = 0; offset < fileSize; slice_num++) {
|
||||
auto batch_size = std::min(index_file_slice_size << 20, int64_t(fileSize) - offset);
|
||||
|
||||
// Put file to remote
|
||||
char objectKey[200];
|
||||
snprintf(objectKey, sizeof(objectKey), "%s/%s_%d", remotePrefix.c_str(), fileName.c_str(), slice_num);
|
||||
|
||||
// use multi-thread to put part file
|
||||
futures.push_back(pool.Submit(EncodeAndUploadIndexSlice, rcm_.get(), buf.get(), offset, batch_size, index_meta_,
|
||||
futures.push_back(pool.Submit(EncodeAndUploadIndexSlice, rcm_.get(), file, offset, batch_size, index_meta_,
|
||||
field_meta_, std::string(objectKey)));
|
||||
offset += batch_size;
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -235,6 +236,8 @@ func (it *indexBuildTask) LoadData(ctx context.Context) error {
|
|||
} else {
|
||||
logutil.Logger(ctx).Info("Successfully load data", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
|
||||
}
|
||||
blobs = nil
|
||||
debug.FreeOSMemory()
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue