mirror of https://github.com/milvus-io/milvus.git
Revert "enhance: reduce many I/O operations while loading disk index (#30189) (#30690)" This reverts commitpull/30805/head v2.3.10d4c4bf946b
. Revert "enhance: limit the max pool size to 16 (#30371) (#30415)" This reverts commit52ac0718f0
. Revert "enhance: convert the `GetObject` util to async (#30166) (#30197)" This reverts commit4b7c5baab7
. Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
parent
2896f5eb69
commit
2f4a13a7ae
@@ -14,27 +14,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <sys/fcntl.h>
-#include <algorithm>
-#include <boost/filesystem.hpp>
 #include <mutex>
 #include <utility>
 
 #include "common/Common.h"
-#include "common/Consts.h"
-#include "common/EasyAssert.h"
-#include "common/File.h"
 #include "common/Slice.h"
 #include "log/Log.h"
 
 #include "storage/DiskFileManagerImpl.h"
 #include "storage/FieldData.h"
-#include "storage/FieldDataInterface.h"
-#include "storage/FileManager.h"
-#include "storage/IndexData.h"
 #include "storage/LocalChunkManagerSingleton.h"
-#include "storage/ThreadPools.h"
+#include "storage/IndexData.h"
 #include "storage/Util.h"
+#include "storage/ThreadPools.h"
 
 namespace milvus::storage {
@@ -123,27 +116,28 @@ DiskFileManagerImpl::AddBatchIndexFiles(
     const std::vector<int64_t>& remote_file_sizes) {
     auto local_chunk_manager =
         LocalChunkManagerSingleton::GetInstance().GetChunkManager();
-    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
+    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
+    auto LoadIndexFromDisk = [&](
+        const std::string& file,
+        const int64_t offset,
+        const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
+        auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
+        local_chunk_manager->Read(file, offset, buf.get(), data_size);
+        return buf;
+    };
 
     std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
     futures.reserve(remote_file_sizes.size());
     AssertInfo(local_file_offsets.size() == remote_files.size(),
                "inconsistent size of offset slices with file slices");
     AssertInfo(remote_files.size() == remote_file_sizes.size(),
                "inconsistent size of file slices with size slices");
 
     for (int64_t i = 0; i < remote_files.size(); ++i) {
-        futures.push_back(pool.Submit(
-            [&](const std::string& file,
-                const int64_t offset,
-                const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
-                auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
-                local_chunk_manager->Read(file, offset, buf.get(), data_size);
-                return buf;
-            },
-            local_file_name,
-            local_file_offsets[i],
-            remote_file_sizes[i]));
+        futures.push_back(pool.Submit(LoadIndexFromDisk,
+                                      local_file_name,
+                                      local_file_offsets[i],
+                                      remote_file_sizes[i]));
     }
 
     // hold index data util upload index file done
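The hunk above swaps the inline download lambda for the named LoadIndexFromDisk callable and drops the pool from HIGH back to MIDDLE priority; the fan-out-then-gather shape is unchanged. A minimal, self-contained sketch of that shape, using std::async in place of the milvus thread pool and plain fopen/fseek/fread in place of LocalChunkManager::Read (the file name, offsets, and sizes here are made up for illustration):

```cpp
#include <cstdint>
#include <cstdio>
#include <future>
#include <memory>
#include <string>
#include <vector>

// Read `size` bytes of `file` starting at `offset` into a fresh buffer.
std::shared_ptr<uint8_t[]> ReadSlice(const std::string& file,
                                     int64_t offset,
                                     int64_t size) {
    auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[size]);
    std::FILE* fp = std::fopen(file.c_str(), "rb");
    if (fp != nullptr) {  // missing file leaves the buffer untouched
        std::fseek(fp, static_cast<long>(offset), SEEK_SET);
        std::fread(buf.get(), 1, static_cast<size_t>(size), fp);
        std::fclose(fp);
    }
    return buf;
}

int main() {
    std::vector<int64_t> offsets = {0, 4096};   // assumed slice layout
    std::vector<int64_t> sizes = {4096, 4096};
    std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
    futures.reserve(offsets.size());
    for (size_t i = 0; i < offsets.size(); ++i) {
        // std::async stands in for pool.Submit(LoadIndexFromDisk, ...)
        futures.push_back(std::async(std::launch::async, ReadSlice,
                                     std::string("/tmp/index_slice.bin"),
                                     offsets[i], sizes[i]));
    }
    for (auto& f : futures) {
        auto buf = f.get();  // block until each read completes
    }
    return 0;
}
```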
@@ -161,8 +155,8 @@ DiskFileManagerImpl::AddBatchIndexFiles(
         remote_files,
         field_meta_,
         index_meta_);
-    for (auto& re : res) {
-        remote_paths_to_size_[re.first] = re.second;
+    for (auto iter = res.begin(); iter != res.end(); ++iter) {
+        remote_paths_to_size_[iter->first] = iter->second;
     }
 }
@@ -183,30 +177,39 @@ DiskFileManagerImpl::CacheIndexToDisk(
         std::sort(slices.second.begin(), slices.second.end());
     }
 
+    auto EstimateParallelDegree = [&](const std::string& file) -> uint64_t {
+        auto fileSize = rcm_->Size(file);
+        return uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / fileSize);
+    };
+
     for (auto& slices : index_slices) {
         auto prefix = slices.first;
         auto local_index_file_name =
             GetLocalIndexObjectPrefix() +
             prefix.substr(prefix.find_last_of('/') + 1);
-        auto file =
-            File::Open(local_index_file_name, O_CREAT | O_RDWR | O_TRUNC);
+        local_chunk_manager->CreateFile(local_index_file_name);
 
         // Get the remote files
+        int64_t offset = 0;
         std::vector<std::string> batch_remote_files;
         batch_remote_files.reserve(slices.second.size());
+        uint64_t max_parallel_degree = INT_MAX;
         for (int& iter : slices.second) {
+            if (batch_remote_files.size() == max_parallel_degree) {
+                auto next_offset = CacheBatchIndexFilesToDisk(
+                    batch_remote_files, local_index_file_name, offset);
+                offset = next_offset;
+                batch_remote_files.clear();
+            }
             auto origin_file = prefix + "_" + std::to_string(iter);
+            if (batch_remote_files.size() == 0) {
+                // Use first file size as average size to estimate
+                max_parallel_degree = EstimateParallelDegree(origin_file);
+            }
             batch_remote_files.push_back(origin_file);
         }
-
-        auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files);
-        for (auto& chunk : index_chunks) {
-            auto index_data = chunk.get()->GetFieldData();
-            auto index_size = index_data->Size();
-            auto chunk_data = reinterpret_cast<uint8_t*>(
-                const_cast<void*>(index_data->Data()));
-            file.Write(chunk_data, index_size);
+        if (batch_remote_files.size() > 0) {
+            auto next_offset = CacheBatchIndexFilesToDisk(
+                batch_remote_files, local_index_file_name, offset);
+            offset = next_offset;
+            batch_remote_files.clear();
         }
         local_paths_.emplace_back(local_index_file_name);
     }
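The restored batching bounds peak memory: EstimateParallelDegree divides a fixed memory cap by the size of the first slice, and CacheBatchIndexFilesToDisk flushes each full batch to the local file at an advancing offset before the next batch is downloaded. A toy sketch of that arithmetic (the 64 MiB cap and 16 MiB slice size are assumptions for the example, not necessarily the real DEFAULT_FIELD_MAX_MEMORY_LIMIT or slice size):

```cpp
#include <cstdint>
#include <iostream>

int main() {
    const uint64_t kMaxMemory = 64ULL << 20;  // assumed memory cap: 64 MiB
    const uint64_t slice_size = 16ULL << 20;  // assumed slice size: 16 MiB
    const uint64_t parallel_degree = kMaxMemory / slice_size;  // 4 per batch

    // 10 slices arrive in batches of `parallel_degree`; each batch is
    // written at `offset` and released before the next batch is fetched,
    // so at most ~kMaxMemory of slice data is ever held at once.
    uint64_t offset = 0;
    for (uint64_t i = 0; i < 10; ++i) {
        if (i != 0 && i % parallel_degree == 0) {
            std::cout << "flush batch, file offset now " << offset << "\n";
        }
        offset += slice_size;
    }
    std::cout << "final flush, file size " << offset << " bytes\n";
    return 0;
}
```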
@@ -226,7 +229,7 @@ DiskFileManagerImpl::CacheBatchIndexFilesToDisk(
 
     uint64_t offset = local_file_init_offfset;
     for (int i = 0; i < batch_size; ++i) {
-        auto index_data = index_datas[i].get()->GetFieldData();
+        auto index_data = index_datas[i];
         auto index_size = index_data->Size();
         auto uint8_data =
             reinterpret_cast<uint8_t*>(const_cast<void*>(index_data->Data()));
@@ -270,7 +273,7 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
         auto field_datas = GetObjectData(rcm_.get(), batch_files);
         int batch_size = batch_files.size();
         for (int i = 0; i < batch_size; ++i) {
-            auto field_data = field_datas[i].get()->GetFieldData();
+            auto field_data = field_datas[i];
             num_rows += uint32_t(field_data->get_num_rows());
             AssertInfo(dim == 0 || dim == field_data->get_dim(),
                        "inconsistent dim value in multi binlogs!");
@@ -98,8 +98,7 @@ MemFileManagerImpl::LoadIndexToMemory(
         for (size_t idx = 0; idx < batch_files.size(); ++idx) {
             auto file_name =
                 batch_files[idx].substr(batch_files[idx].find_last_of('/') + 1);
-            file_to_index_data[file_name] =
-                index_datas[idx].get()->GetFieldData();
+            file_to_index_data[file_name] = index_datas[idx];
         }
     };
@@ -138,7 +137,7 @@ MemFileManagerImpl::CacheRawDataToMemory(
     auto FetchRawData = [&]() {
         auto raw_datas = GetObjectData(rcm_.get(), batch_files);
         for (auto& data : raw_datas) {
-            field_datas.emplace_back(data.get()->GetFieldData());
+            field_datas.emplace_back(data);
         }
     };
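The four call-site hunks above are the same mechanical change: with GetObjectData synchronous again, its result elements are materialized FieldDataPtr values rather than futures, so the intermediate `.get()->GetFieldData()` step disappears. A toy model of the two call-site shapes (Payload and PayloadPtr are stand-ins, not milvus types):

```cpp
#include <future>
#include <memory>
#include <vector>

struct Payload {
    int field_data = 42;
};
using PayloadPtr = std::shared_ptr<Payload>;

int main() {
    // async shape (pre-revert): a vector of futures, each yielding a pointer
    std::vector<std::future<PayloadPtr>> futs;
    futs.emplace_back(std::async(std::launch::async,
                                 [] { return std::make_shared<Payload>(); }));
    int a = futs[0].get()->field_data;  // block on the future, then deref

    // sync shape (post-revert): pointers are already materialized
    std::vector<PayloadPtr> datas = {std::make_shared<Payload>()};
    int b = datas[0]->field_data;       // plain dereference, no blocking here

    return a == b ? 0 : 1;
}
```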
@@ -41,13 +41,10 @@ class ThreadPool {
         max_threads_size_ = CPU_NUM * thread_core_coefficient;
 
-        // only IO pool will set large limit, but the CPU helps nothing to IO operations,
-        // we need to limit the max thread num, each thread will download 16~64 MiB data,
-        // according to our benchmark, 16 threads is enough to saturate the network bandwidth.
-        if (min_threads_size_ > 16) {
-            min_threads_size_ = 16;
-        }
-        if (max_threads_size_ > 16) {
-            max_threads_size_ = 16;
+        // we need to limit the max thread num, each thread will download 16 MiB data,
+        // it should be not greater than 256 (4GiB data) to avoid OOM and send too many requests to object storage
+        if (max_threads_size_ > 256) {
+            max_threads_size_ = 256;
         }
         LOG_SEGCORE_INFO_ << "Init thread pool:" << name_
                           << " with min worker num:" << min_threads_size_
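The restored comment's bound is easy to check: at 16 MiB of in-flight buffer per download thread, a 256-thread cap keeps worst-case transient memory at 4 GiB, versus the reverted commit's much tighter 16-thread budget. A one-liner to verify the arithmetic:

```cpp
#include <cstdint>
#include <iostream>

int main() {
    const uint64_t per_thread = 16ULL << 20;  // 16 MiB in flight per thread
    const uint64_t max_threads = 256;
    std::cout << (per_thread * max_threads >> 30) << " GiB\n";  // prints 4
    return 0;
}
```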
@@ -447,17 +447,62 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
     return std::make_pair(std::move(object_key), serialized_index_size);
 }
 
-std::vector<std::future<std::unique_ptr<DataCodec>>>
+// /**
+//  * Returns the current resident set size (physical memory use) measured
+//  * in bytes, or zero if the value cannot be determined on this OS.
+//  */
+// size_t
+// getCurrentRSS() {
+// #if defined(_WIN32)
+//     /* Windows -------------------------------------------------- */
+//     PROCESS_MEMORY_COUNTERS info;
+//     GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info));
+//     return (size_t)info.WorkingSetSize;
+
+// #elif defined(__APPLE__) && defined(__MACH__)
+//     /* OSX ------------------------------------------------------ */
+//     struct mach_task_basic_info info;
+//     mach_msg_type_number_t infoCount = MACH_TASK_BASIC_INFO_COUNT;
+//     if (task_info(mach_task_self(), MACH_TASK_BASIC_INFO, (task_info_t)&info, &infoCount) != KERN_SUCCESS)
+//         return (size_t)0L; /* Can't access? */
+//     return (size_t)info.resident_size;
+
+// #elif defined(__linux__) || defined(__linux) || defined(linux) || defined(__gnu_linux__)
+//     /* Linux ---------------------------------------------------- */
+//     long rss = 0L;
+//     FILE* fp = NULL;
+//     if ((fp = fopen("/proc/self/statm", "r")) == NULL)
+//         return (size_t)0L; /* Can't open? */
+//     if (fscanf(fp, "%*s%ld", &rss) != 1) {
+//         fclose(fp);
+//         return (size_t)0L; /* Can't read? */
+//     }
+//     fclose(fp);
+//     return (size_t)rss * (size_t)sysconf(_SC_PAGESIZE);
+
+// #else
+//     /* AIX, BSD, Solaris, and Unknown OS ------------------------ */
+//     return (size_t)0L; /* Unsupported. */
+// #endif
+// }
+
+std::vector<FieldDataPtr>
 GetObjectData(ChunkManager* remote_chunk_manager,
               const std::vector<std::string>& remote_files) {
     auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
     std::vector<std::future<std::unique_ptr<DataCodec>>> futures;
     futures.reserve(remote_files.size());
     for (auto& file : remote_files) {
         futures.emplace_back(pool.Submit(
             DownloadAndDecodeRemoteFile, remote_chunk_manager, file));
     }
-    return futures;
+
+    std::vector<FieldDataPtr> datas;
+    for (int i = 0; i < futures.size(); ++i) {
+        auto res = futures[i].get();
+        datas.emplace_back(res->GetFieldData());
+    }
+    ReleaseArrowUnused();
+    return datas;
 }
 
 std::map<std::string, int64_t>
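Post-revert, GetObjectData keeps the pool fan-out internally but blocks on every future before returning, so concurrency stays an implementation detail and callers receive plain FieldDataPtr values. A compact sketch of this sync-over-async shape, with std::async standing in for the HIGH-priority pool and Download for DownloadAndDecodeRemoteFile (both are stand-ins, not the milvus API):

```cpp
#include <future>
#include <string>
#include <vector>

using Payload = std::string;  // stand-in for FieldDataPtr

Payload Download(const std::string& file) { return "bytes:" + file; }

std::vector<Payload> GetAll(const std::vector<std::string>& files) {
    std::vector<std::future<Payload>> futures;
    futures.reserve(files.size());
    for (const auto& f : files) {
        // fan out: one task per remote file
        futures.emplace_back(std::async(std::launch::async, Download, f));
    }
    std::vector<Payload> datas;
    for (auto& fut : futures) {
        datas.emplace_back(fut.get());  // block here, in submission order
    }
    return datas;  // callers never see futures
}

int main() {
    auto datas = GetAll({"a", "b", "c"});
    return datas.size() == 3 ? 0 : 1;
}
```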
@@ -19,7 +19,6 @@
 #include <memory>
 #include <string>
 #include <vector>
-#include <future>
 
 #include "storage/FieldData.h"
 #include "storage/PayloadStream.h"
@@ -103,7 +102,7 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
                           const FieldMeta& field_meta,
                           std::string object_key);
 
-std::vector<std::future<std::unique_ptr<DataCodec>>>
+std::vector<FieldDataPtr>
 GetObjectData(ChunkManager* remote_chunk_manager,
               const std::vector<std::string>& remote_files);
@@ -71,13 +71,8 @@ func initDynamicPool() {
 
 func initLoadPool() {
 	loadOnce.Do(func() {
-		pt := paramtable.Get()
-		poolSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt()
-		if poolSize > 16 {
-			poolSize = 16
-		}
 		pool := conc.NewPool[any](
-			poolSize,
+			hardware.GetCPUNum()*paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt(),
 			conc.WithPreAlloc(false),
 			conc.WithDisablePurge(false),
 			conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal