mirror of https://github.com/milvus-io/milvus.git
parent
12f1d3c23c
commit
181781344f
|
@ -1866,8 +1866,11 @@ DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, meta::FilesH
|
|||
ResumeIfLast();
|
||||
|
||||
files_holder.ReleaseFiles();
|
||||
if (!job->GetStatus().ok()) {
|
||||
return job->GetStatus();
|
||||
|
||||
Status job_status;
|
||||
job->GetStatus(job_status);
|
||||
if (!job_status.ok()) {
|
||||
return job_status;
|
||||
}
|
||||
|
||||
// step 3: construct results
|
||||
|
@ -2063,11 +2066,12 @@ DBImpl::BackgroundBuildIndex() {
|
|||
meta::SegmentSchema& file_schema = *(iter->second.get());
|
||||
job->WaitBuildIndexFinish();
|
||||
LOG_ENGINE_INFO_ << "Build Index Progress: " << ++completed << " of " << job2file_map.size();
|
||||
if (!job->GetStatus().ok()) {
|
||||
Status status = job->GetStatus();
|
||||
LOG_ENGINE_ERROR_ << "Building index job " << job->id() << " failed: " << status.ToString();
|
||||
Status job_status;
|
||||
job->GetStatus(job_status);
|
||||
if (!job_status.ok()) {
|
||||
LOG_ENGINE_ERROR_ << "Building index job " << job->id() << " failed: " << job_status.ToString();
|
||||
|
||||
index_failed_checker_.MarkFailedIndexFile(file_schema, status.message());
|
||||
index_failed_checker_.MarkFailedIndexFile(file_schema, job_status.message());
|
||||
} else {
|
||||
LOG_ENGINE_DEBUG_ << "Building index job " << job->id() << " succeed.";
|
||||
|
||||
|
|
|
@ -68,5 +68,17 @@ BuildIndexJob::OnCacheInsertDataChanged(bool value) {
|
|||
options_.insert_cache_immediately_ = value;
|
||||
}
|
||||
|
||||
void
|
||||
BuildIndexJob::SetStatus(const Status& status) {
|
||||
std::lock_guard<std::mutex> lock(status_mutex_);
|
||||
status_ = status;
|
||||
}
|
||||
|
||||
void
|
||||
BuildIndexJob::GetStatus(Status& status) {
|
||||
std::lock_guard<std::mutex> lock(status_mutex_);
|
||||
status = status_;
|
||||
}
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
|
|
|
@ -50,15 +50,16 @@ class BuildIndexJob : public Job, public server::CacheConfigHandler {
|
|||
void
|
||||
BuildIndexDone(size_t to_index_id);
|
||||
|
||||
void
|
||||
SetStatus(const Status& status);
|
||||
|
||||
void
|
||||
GetStatus(Status& status);
|
||||
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
public:
|
||||
Status&
|
||||
GetStatus() {
|
||||
return status_;
|
||||
}
|
||||
|
||||
Id2ToIndexMap&
|
||||
to_index_files() {
|
||||
return to_index_files_;
|
||||
|
@ -84,6 +85,8 @@ class BuildIndexJob : public Job, public server::CacheConfigHandler {
|
|||
engine::DBOptions options_;
|
||||
|
||||
Status status_;
|
||||
std::mutex status_mutex_;
|
||||
|
||||
std::mutex mutex_;
|
||||
std::condition_variable cv_;
|
||||
};
|
||||
|
|
|
@ -68,11 +68,6 @@ SearchJob::GetResultDistances() {
|
|||
return result_distances_;
|
||||
}
|
||||
|
||||
Status&
|
||||
SearchJob::GetStatus() {
|
||||
return status_;
|
||||
}
|
||||
|
||||
json
|
||||
SearchJob::Dump() const {
|
||||
json ret{
|
||||
|
@ -90,5 +85,17 @@ SearchJob::GetContext() const {
|
|||
return context_;
|
||||
}
|
||||
|
||||
void
|
||||
SearchJob::SetStatus(const Status& status) {
|
||||
std::lock_guard<std::mutex> lock(status_mutex_);
|
||||
status_ = status;
|
||||
}
|
||||
|
||||
void
|
||||
SearchJob::GetStatus(Status& status) {
|
||||
std::lock_guard<std::mutex> lock(status_mutex_);
|
||||
status = status_;
|
||||
}
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
|
|
|
@ -65,8 +65,11 @@ class SearchJob : public Job {
|
|||
ResultDistances&
|
||||
GetResultDistances();
|
||||
|
||||
Status&
|
||||
GetStatus();
|
||||
void
|
||||
SetStatus(const Status& status);
|
||||
|
||||
void
|
||||
GetStatus(Status& status);
|
||||
|
||||
json
|
||||
Dump() const override;
|
||||
|
@ -133,6 +136,7 @@ class SearchJob : public Job {
|
|||
ResultIds result_ids_;
|
||||
ResultDistances result_distances_;
|
||||
Status status_;
|
||||
std::mutex status_mutex_;
|
||||
|
||||
query::GeneralQueryPtr general_query_;
|
||||
std::unordered_map<std::string, engine::meta::hybrid::DataType> attr_type_;
|
||||
|
|
|
@ -93,7 +93,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
|
|||
|
||||
if (auto job = job_.lock()) {
|
||||
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
|
||||
build_index_job->GetStatus() = s;
|
||||
build_index_job->SetStatus(s);
|
||||
build_index_job->BuildIndexDone(file_->id_);
|
||||
}
|
||||
|
||||
|
@ -120,7 +120,7 @@ XBuildIndexTask::Execute() {
|
|||
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
|
||||
if (to_index_engine_ == nullptr) {
|
||||
build_index_job->BuildIndexDone(to_index_id_);
|
||||
build_index_job->GetStatus() = Status(DB_ERROR, "source index is null");
|
||||
build_index_job->SetStatus(Status(DB_ERROR, "source index is null"));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -141,7 +141,7 @@ XBuildIndexTask::Execute() {
|
|||
if (!status.ok()) {
|
||||
LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.ToString();
|
||||
build_index_job->BuildIndexDone(to_index_id_);
|
||||
build_index_job->GetStatus() = status;
|
||||
build_index_job->SetStatus(status);
|
||||
to_index_engine_ = nullptr;
|
||||
return;
|
||||
}
|
||||
|
@ -152,7 +152,7 @@ XBuildIndexTask::Execute() {
|
|||
LOG_ENGINE_ERROR_ << log_msg;
|
||||
|
||||
build_index_job->BuildIndexDone(to_index_id_);
|
||||
build_index_job->GetStatus() = Status(DB_ERROR, err_msg);
|
||||
build_index_job->SetStatus(Status(DB_ERROR, err_msg));
|
||||
to_index_engine_ = nullptr;
|
||||
};
|
||||
|
||||
|
|
|
@ -185,7 +185,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
|
|||
|
||||
if (auto job = job_.lock()) {
|
||||
auto search_job = std::static_pointer_cast<scheduler::SearchJob>(job);
|
||||
search_job->GetStatus() = s;
|
||||
search_job->SetStatus(s);
|
||||
search_job->SearchDone(file_->id_);
|
||||
}
|
||||
|
||||
|
@ -248,7 +248,7 @@ XSearchTask::Execute() {
|
|||
|
||||
auto Illegal_Metric_Type = [&]() {
|
||||
std::string msg = "Illegal metric type" + metric_type;
|
||||
search_job->GetStatus() = Status(SERVER_INVALID_ARGUMENT, msg);
|
||||
search_job->SetStatus(Status(SERVER_INVALID_ARGUMENT, msg));
|
||||
search_job->SearchDone(index_id_);
|
||||
};
|
||||
|
||||
|
@ -299,7 +299,7 @@ XSearchTask::Execute() {
|
|||
s = index_engine_->ExecBinaryQuery(general_query, bitset, types, nq, topk, output_distance, output_ids);
|
||||
|
||||
if (!s.ok()) {
|
||||
search_job->GetStatus() = s;
|
||||
search_job->SetStatus(s);
|
||||
search_job->SearchDone(index_id_);
|
||||
return;
|
||||
}
|
||||
|
@ -332,7 +332,7 @@ XSearchTask::Execute() {
|
|||
fiu_do_on("XSearchTask.Execute.search_fail", s = Status(SERVER_UNEXPECTED_ERROR, ""));
|
||||
|
||||
if (!s.ok()) {
|
||||
search_job->GetStatus() = s;
|
||||
search_job->SetStatus(s);
|
||||
search_job->SearchDone(index_id_);
|
||||
return;
|
||||
}
|
||||
|
@ -362,7 +362,7 @@ XSearchTask::Execute() {
|
|||
// search_job->AccumReduceCost(span);
|
||||
} catch (std::exception& ex) {
|
||||
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] SearchTask encounter exception: %s", "search", 0, ex.what());
|
||||
search_job->GetStatus() = Status(SERVER_UNEXPECTED_ERROR, ex.what());
|
||||
search_job->SetStatus(Status(SERVER_UNEXPECTED_ERROR, ex.what()));
|
||||
}
|
||||
|
||||
// step 4: notify to send result to client
|
||||
|
|
Loading…
Reference in New Issue