mirror of https://github.com/milvus-io/milvus.git
solve server exit stuck problem (#4452)
* update ThreadPool api Signed-off-by: peng.xu <peng.xu@zilliz.com> * update notify position Signed-off-by: peng.xu <peng.xu@zilliz.com> * update ThreadPool Stop API Signed-off-by: peng.xu <peng.xu@zilliz.com>pull/4500/head^2
parent
fc005d8133
commit
8f4e1d3851
|
@ -159,6 +159,9 @@ DBImpl::Stop() {
|
|||
LOG_ENGINE_DEBUG_ << "DBImpl::Stop bg_index_thread_.join()";
|
||||
swn_metric_.Notify();
|
||||
bg_metric_thread_.join();
|
||||
} else {
|
||||
swn_metric_.Notify();
|
||||
bg_metric_thread_.join();
|
||||
}
|
||||
|
||||
// LOG_ENGINE_TRACE_ << "DB service stop";
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
|
@ -37,7 +38,7 @@ class ThreadPool {
|
|||
enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
|
||||
|
||||
void
|
||||
Stop();
|
||||
Stop(bool immediate = false);
|
||||
|
||||
~ThreadPool();
|
||||
|
||||
|
@ -56,10 +57,12 @@ class ThreadPool {
|
|||
std::condition_variable condition_;
|
||||
|
||||
std::atomic_bool stop_;
|
||||
std::atomic_bool stop_immediate_;
|
||||
};
|
||||
|
||||
// the constructor just launches some amount of workers
|
||||
inline ThreadPool::ThreadPool(size_t threads, size_t queue_size) : max_queue_size_(queue_size), stop_(false) {
|
||||
inline ThreadPool::ThreadPool(size_t threads, size_t queue_size)
|
||||
: max_queue_size_(queue_size), stop_(false), stop_immediate_(false) {
|
||||
for (size_t i = 0; i < threads; ++i)
|
||||
workers_.emplace_back([this] {
|
||||
for (;;) {
|
||||
|
@ -67,8 +70,9 @@ inline ThreadPool::ThreadPool(size_t threads, size_t queue_size) : max_queue_siz
|
|||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(this->queue_mutex_);
|
||||
this->condition_.wait(lock, [this] { return this->stop_ || !this->tasks_.empty(); });
|
||||
if (this->stop_ && this->tasks_.empty())
|
||||
this->condition_.wait(
|
||||
lock, [this] { return this->stop_immediate_ || this->stop_ || !this->tasks_.empty(); });
|
||||
if (this->stop_immediate_ || (this->stop_ && this->tasks_.empty()))
|
||||
return;
|
||||
task = std::move(this->tasks_.front());
|
||||
this->tasks_.pop();
|
||||
|
@ -94,7 +98,7 @@ ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_o
|
|||
std::unique_lock<std::mutex> lock(queue_mutex_);
|
||||
this->condition_.wait(lock, [this] { return this->tasks_.size() < max_queue_size_; });
|
||||
// don't allow enqueueing after stopping the pool
|
||||
if (stop_)
|
||||
if (stop_ || stop_immediate_)
|
||||
throw std::runtime_error("enqueue on stopped ThreadPool");
|
||||
|
||||
tasks_.emplace([task]() { (*task)(); });
|
||||
|
@ -104,12 +108,13 @@ ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_o
|
|||
}
|
||||
|
||||
inline void
|
||||
ThreadPool::Stop() {
|
||||
if (stop_) {
|
||||
ThreadPool::Stop(bool immediate) {
|
||||
if (stop_ || stop_immediate_) {
|
||||
return;
|
||||
}
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queue_mutex_);
|
||||
stop_immediate_ = immediate;
|
||||
stop_ = true;
|
||||
}
|
||||
condition_.notify_all();
|
||||
|
|
|
@ -60,7 +60,7 @@ TimerManager::Stop() {
|
|||
}
|
||||
}
|
||||
if (timer_exeutors_) {
|
||||
timer_exeutors_->Stop();
|
||||
timer_exeutors_->Stop(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue