mirror of https://github.com/milvus-io/milvus.git
fix random crash for scheduler task (#4563)
* fix random crash for scheduler task Signed-off-by: yhmo <yihua.mo@zilliz.com> * fix unittest crash Signed-off-by: yhmo <yihua.mo@zilliz.com> pull/4569/head
parent
ec0291a08d
commit
810f5e6f70
|
@ -728,7 +728,7 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t
|
|||
}
|
||||
#endif
|
||||
to_index->SetUids(uids);
|
||||
LOG_ENGINE_DEBUG_ << "Set " << to_index->GetUids()->size() << "uids for " << location;
|
||||
LOG_ENGINE_DEBUG_ << "Set " << to_index->UidsSize() << "uids for " << location;
|
||||
if (blacklist != nullptr) {
|
||||
to_index->SetBlacklist(blacklist);
|
||||
LOG_ENGINE_DEBUG_ << "Set blacklist for index " << location;
|
||||
|
|
|
@ -101,7 +101,7 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
|
|||
auto resource = event->resource_;
|
||||
resource->WakeupExecutor();
|
||||
|
||||
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
|
||||
auto task_table_type = load_completed_event->task_table_item_->get_task()->label()->Type();
|
||||
switch (task_table_type) {
|
||||
case TaskLabelType::SPECIFIED_RESOURCE: {
|
||||
Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
|
||||
|
|
|
@ -56,6 +56,18 @@ TaskTimestamp::Dump() const {
|
|||
return ret;
|
||||
}
|
||||
|
||||
// Returns the task currently stored in this item.
// The item's mutex is held (via std::lock_guard) while the member is read,
// and the TaskPtr is returned by value.
TaskPtr
|
||||
TaskTableItem::get_task() {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
return this->task;
|
||||
}
|
||||
|
||||
// Replaces the task stored in this item with `t`.
// The item's mutex is held (via std::lock_guard) while the member is
// assigned; `t` is taken by value and moved into the member.
void
|
||||
TaskTableItem::set_task(TaskPtr t) {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
this->task = std::move(t);
|
||||
}
|
||||
|
||||
bool
|
||||
TaskTableItem::IsFinish() {
|
||||
return state == TaskTableItemState::MOVED || state == TaskTableItemState::EXECUTED;
|
||||
|
@ -168,7 +180,7 @@ TaskTable::PickToLoad(uint64_t limit) {
|
|||
if (loaded_count >= 1)
|
||||
return std::vector<uint64_t>();
|
||||
} else if (table_[index]->state == TaskTableItemState::START) {
|
||||
auto task = table_[index]->task;
|
||||
auto task = table_[index]->get_task();
|
||||
|
||||
// if task is a build index task, limit it
|
||||
if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") {
|
||||
|
@ -266,7 +278,7 @@ void
|
|||
TaskTable::Put(TaskPtr task, TaskTableItemPtr from) {
|
||||
auto item = std::make_shared<TaskTableItem>(std::move(from));
|
||||
item->id = id_++;
|
||||
item->task = std::move(task);
|
||||
item->set_task(std::move(task));
|
||||
item->state = TaskTableItemState::START;
|
||||
item->timestamp.start = get_current_timestamp();
|
||||
table_.put(std::move(item));
|
||||
|
|
|
@ -64,12 +64,17 @@ struct TaskTableItem : public interface::dumpable {
|
|||
TaskTableItem(TaskTableItem&&) = delete;
|
||||
|
||||
uint64_t id; // auto increment from 0;
|
||||
TaskPtr task; // the task;
|
||||
TaskTableItemState state; // the state;
|
||||
std::mutex mutex;
|
||||
TaskTimestamp timestamp;
|
||||
TaskTableItemPtr from;
|
||||
|
||||
TaskPtr
|
||||
get_task();
|
||||
|
||||
void
|
||||
set_task(TaskPtr task);
|
||||
|
||||
bool
|
||||
IsFinish();
|
||||
|
||||
|
@ -93,6 +98,9 @@ struct TaskTableItem : public interface::dumpable {
|
|||
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
private:
|
||||
TaskPtr task; // the task;
|
||||
};
|
||||
|
||||
class TaskTable : public interface::dumpable {
|
||||
|
|
|
@ -88,7 +88,7 @@ void
|
|||
Action::PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self) {
|
||||
auto neighbours = get_neighbours(self);
|
||||
for (auto& neighbour : neighbours) {
|
||||
neighbour->task_table().Put(task_item->task, task_item);
|
||||
neighbour->task_table().Put(task_item->get_task(), task_item);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,7 +103,7 @@ void
|
|||
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event) {
|
||||
auto task_item = event->task_table_item_;
|
||||
auto task = event->task_table_item_->task;
|
||||
auto task = event->task_table_item_->get_task();
|
||||
|
||||
if (resource->name() == task->path().Last()) {
|
||||
resource->WakeupExecutor();
|
||||
|
|
|
@ -130,8 +130,8 @@ Resource::pick_task_execute() {
|
|||
auto indexes = task_table_.PickToExecute(std::numeric_limits<uint64_t>::max());
|
||||
for (auto index : indexes) {
|
||||
// try to set one task executing, then return
|
||||
if (task_table_[index]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE) {
|
||||
if (task_table_[index]->task->path().Last() != name()) {
|
||||
if (task_table_[index]->get_task()->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE) {
|
||||
if (task_table_[index]->get_task()->path().Last() != name()) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -166,19 +166,19 @@ Resource::loader_function() {
|
|||
if (task_item == nullptr) {
|
||||
break;
|
||||
}
|
||||
if (task_item->task->Type() == TaskType::BuildIndexTask && name() == "cpu") {
|
||||
if (task_item->get_task()->Type() == TaskType::BuildIndexTask && name() == "cpu") {
|
||||
BuildMgrInst::GetInstance()->Take();
|
||||
LOG_SERVER_DEBUG_ << name() << " load BuildIndexTask";
|
||||
}
|
||||
LoadFile(task_item->task);
|
||||
LoadFile(task_item->get_task());
|
||||
task_item->Loaded();
|
||||
|
||||
auto& label = task_item->task->label();
|
||||
auto& label = task_item->get_task()->label();
|
||||
if (label != nullptr &&
|
||||
(label->Type() != TaskLabelType::BROADCAST || task_item->task->Type() == TaskType::DeleteTask)) {
|
||||
(label->Type() != TaskLabelType::BROADCAST || task_item->get_task()->Type() == TaskType::DeleteTask)) {
|
||||
if (task_item->from) {
|
||||
task_item->from->Moved();
|
||||
task_item->from->task = FinishedTask::Create(task_item->from->task);
|
||||
task_item->from->set_task(std::move(FinishedTask::Create(task_item->from->get_task())));
|
||||
task_item->from = nullptr;
|
||||
}
|
||||
}
|
||||
|
@ -208,15 +208,15 @@ Resource::executor_function() {
|
|||
break;
|
||||
}
|
||||
auto start = get_current_timestamp();
|
||||
Process(task_item->task);
|
||||
task_item->task = FinishedTask::Create(task_item->task);
|
||||
Process(task_item->get_task());
|
||||
task_item->set_task(std::move(FinishedTask::Create(task_item->get_task())));
|
||||
auto finish = get_current_timestamp();
|
||||
++total_task_;
|
||||
total_cost_ += finish - start;
|
||||
|
||||
task_item->Executed();
|
||||
|
||||
if (task_item->task->Type() == TaskType::BuildIndexTask) {
|
||||
if (task_item->get_task()->Type() == TaskType::BuildIndexTask) {
|
||||
BuildMgrInst::GetInstance()->Put();
|
||||
ResMgrInst::GetInstance()->GetResource("cpu")->WakeupLoader();
|
||||
ResMgrInst::GetInstance()->GetResource("disk")->WakeupLoader();
|
||||
|
|
|
@ -38,7 +38,7 @@ class TaskTableItemTest : public ::testing::Test {
|
|||
|
||||
TEST_F(TaskTableItemTest, CONSTRUCT) {
|
||||
ASSERT_EQ(default_.id, 0);
|
||||
ASSERT_EQ(default_.task, nullptr);
|
||||
ASSERT_EQ(default_.get_task(), nullptr);
|
||||
ASSERT_EQ(default_.state, milvus::scheduler::TaskTableItemState::INVALID);
|
||||
}
|
||||
|
||||
|
@ -178,12 +178,12 @@ TEST_F(TaskTableBaseTest, SUBSCRIBER) {
|
|||
|
||||
TEST_F(TaskTableBaseTest, PUT_TASK) {
|
||||
empty_table_.Put(task1_);
|
||||
ASSERT_EQ(empty_table_.at(0)->task, task1_);
|
||||
ASSERT_EQ(empty_table_.at(0)->get_task(), task1_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) {
|
||||
empty_table_.Put(invalid_task_);
|
||||
ASSERT_EQ(empty_table_.at(0)->task, invalid_task_);
|
||||
ASSERT_EQ(empty_table_.at(0)->get_task(), invalid_task_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PUT_BATCH) {
|
||||
|
@ -191,8 +191,8 @@ TEST_F(TaskTableBaseTest, PUT_BATCH) {
|
|||
for (auto& task : tasks) {
|
||||
empty_table_.Put(task);
|
||||
}
|
||||
ASSERT_EQ(empty_table_.at(0)->task, task1_);
|
||||
ASSERT_EQ(empty_table_.at(1)->task, task2_);
|
||||
ASSERT_EQ(empty_table_.at(0)->get_task(), task1_);
|
||||
ASSERT_EQ(empty_table_.at(1)->get_task(), task2_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, SIZE) {
|
||||
|
|
Loading…
Reference in New Issue