Handle exception while loading (#28304)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
pull/28323/head
yah01 2023-11-09 17:59:12 +08:00 committed by GitHub
parent b7b31ce0bc
commit 30847cad3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 27 deletions

View File

@ -1,6 +1,7 @@
#include <oneapi/tbb/concurrent_queue.h>
#include <atomic>
#include <exception>
#include <optional>
namespace milvus {
@ -33,6 +34,9 @@ class Channel {
// Fragment of Channel::pop (the method signature is above this hunk and not
// visible here). Pops one element; returns false on end-of-stream.
std::optional<T> result;
// Blocks until either a real value or the std::nullopt sentinel pushed by
// close() is available.
inner_.pop(result);
if (!result.has_value()) {
// Sentinel reached. If close() recorded an exception, surface it to the
// consumer instead of reporting a normal end-of-stream.
// NOTE(review): ex_ stores std::exception by value, so this rethrows a
// sliced copy — the derived type and its what() message are lost.
if (ex_.has_value()) {
throw ex_.value();
}
// Channel closed without error: signal normal termination.
return false;
}
value = std::move(result.value());
@ -40,11 +44,15 @@ class Channel {
}
void
close() {
close(std::optional<std::exception> ex = std::nullopt) {
if (ex.has_value()) {
ex_ = std::move(ex);
}
inner_.push(std::nullopt);
}
private:
oneapi::tbb::concurrent_bounded_queue<std::optional<T>> inner_{};
std::optional<std::exception> ex_{};
};
} // namespace milvus

View File

@ -194,6 +194,8 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
// Fragment of SegmentSealedImpl::LoadFieldData (function begins before this
// hunk). Prepares the per-field loading context and sizes its channel.
auto field_data_info =
FieldDataInfo(field_id.get(), num_rows, load_info.mmap_dir_path);
LOG_SEGCORE_INFO_ << "start to load field data " << id << " of segment "
<< this->id_;
// Number of file slices that may be fetched concurrently, derived from the
// per-field memory budget divided by the slice size.
auto parallel_degree = static_cast<uint64_t>(
DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
// Capacity 2x the fetch batch so the producer can stay one batch ahead of
// the consumer — presumably to overlap download and ingestion; confirm.
field_data_info.channel->set_capacity(parallel_degree * 2);

View File

@ -15,6 +15,7 @@
#include <string>
#include "index/ScalarIndex.h"
#include "log/Log.h"
#include "storage/FieldData.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "common/Common.h"
@ -702,41 +703,46 @@ ReverseDataFromIndex(const index::IndexBase* index,
void
LoadFieldDatasFromRemote(std::vector<std::string>& remote_files,
storage::FieldDataChannelPtr channel) {
auto parallel_degree =
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
try {
auto parallel_degree = static_cast<uint64_t>(
DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
auto rcm = storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
std::sort(remote_files.begin(),
remote_files.end(),
[](const std::string& a, const std::string& b) {
return std::stol(a.substr(a.find_last_of('/') + 1)) <
std::stol(b.substr(b.find_last_of('/') + 1));
});
auto rcm = storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
std::sort(remote_files.begin(),
remote_files.end(),
[](const std::string& a, const std::string& b) {
return std::stol(a.substr(a.find_last_of('/') + 1)) <
std::stol(b.substr(b.find_last_of('/') + 1));
});
std::vector<std::string> batch_files;
std::vector<std::string> batch_files;
auto FetchRawData = [&]() {
auto result = storage::GetObjectData(rcm.get(), batch_files);
for (auto& data : result) {
channel->push(data);
auto FetchRawData = [&]() {
auto result = storage::GetObjectData(rcm.get(), batch_files);
for (auto& data : result) {
channel->push(data);
}
};
for (auto& file : remote_files) {
if (batch_files.size() >= parallel_degree) {
FetchRawData();
batch_files.clear();
}
batch_files.emplace_back(file);
}
};
for (auto& file : remote_files) {
if (batch_files.size() >= parallel_degree) {
if (batch_files.size() > 0) {
FetchRawData();
batch_files.clear();
}
batch_files.emplace_back(file);
channel->close();
} catch (std::exception e) {
LOG_SEGCORE_INFO_ << "failed to load data from remote: " << e.what();
channel->close(std::move(e));
}
if (batch_files.size() > 0) {
FetchRawData();
}
channel->close();
}
int64_t