mirror of https://github.com/milvus-io/milvus.git
Merge branch 'branch-0.4.0' into 'branch-0.4.0'
MS-365 Use tasktableitemptr instead in event See merge request megasearch/milvus!375 Former-commit-id: baba5e363f2d9cb1a674fe64b63cc063329e4faepull/191/head
commit
4f642936e5
|
@ -20,6 +20,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
- MS-359 - Add cost test in new scheduler
|
||||
- MS-361 - Add event in resource
|
||||
- MS-364 - Modify tasktableitem in tasktable
|
||||
- MS-365 - Use tasktableitemptr instead in event
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
|
|
|
@ -15,11 +15,11 @@ namespace engine {
|
|||
|
||||
class CopyCompletedEvent : public Event {
|
||||
public:
|
||||
CopyCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItem &task_table_item)
|
||||
CopyCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
|
||||
: Event(EventType::COPY_COMPLETED, std::move(resource)),
|
||||
task_table_item_(task_table_item) {}
|
||||
task_table_item_(std::move(task_table_item)) {}
|
||||
public:
|
||||
TaskTableItem &task_table_item_;
|
||||
TaskTableItemPtr task_table_item_;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -14,12 +14,12 @@ namespace engine {
|
|||
|
||||
class FinishTaskEvent : public Event {
|
||||
public:
|
||||
FinishTaskEvent(std::weak_ptr<Resource> resource, TaskTableItem &task_table_item)
|
||||
FinishTaskEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
|
||||
: Event(EventType::FINISH_TASK, std::move(resource)),
|
||||
task_table_item_(task_table_item) {}
|
||||
task_table_item_(std::move(task_table_item)) {}
|
||||
|
||||
public:
|
||||
TaskTableItem &task_table_item_;
|
||||
TaskTableItemPtr task_table_item_;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -41,23 +41,23 @@ void Resource::WakeupLoader() {
|
|||
load_cv_.notify_one();
|
||||
}
|
||||
|
||||
TaskPtr Resource::pick_task_load() {
|
||||
TaskTableItemPtr Resource::pick_task_load() {
|
||||
auto indexes = PickToLoad(task_table_, 3);
|
||||
for (auto index : indexes) {
|
||||
// try to set one task loading, then return
|
||||
if (task_table_.Load(index))
|
||||
return task_table_.Get(index)->task;
|
||||
return task_table_.Get(index);
|
||||
// else try next
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
TaskPtr Resource::pick_task_execute() {
|
||||
TaskTableItemPtr Resource::pick_task_execute() {
|
||||
auto indexes = PickToExecute(task_table_, 3);
|
||||
for (auto index : indexes) {
|
||||
// try to set one task executing, then return
|
||||
if (task_table_.Execute(index))
|
||||
return task_table_.Get(index)->task;
|
||||
return task_table_.Get(index);
|
||||
// else try next
|
||||
}
|
||||
return nullptr;
|
||||
|
@ -67,12 +67,12 @@ void Resource::loader_function() {
|
|||
while (running_) {
|
||||
std::unique_lock<std::mutex> lock(load_mutex_);
|
||||
load_cv_.wait(lock, [&] { return load_flag_; });
|
||||
auto task = pick_task_load();
|
||||
if (task) {
|
||||
LoadFile(task);
|
||||
auto task_item = pick_task_load();
|
||||
if (task_item) {
|
||||
LoadFile(task_item->task);
|
||||
if (subscriber_) {
|
||||
// auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task);
|
||||
// subscriber_(std::static_pointer_cast<Event>(event));
|
||||
auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task_item);
|
||||
subscriber_(std::static_pointer_cast<Event>(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -81,18 +81,18 @@ void Resource::loader_function() {
|
|||
void Resource::executor_function() {
|
||||
GetRegisterFunc(RegisterType::START_UP)->Exec();
|
||||
if (subscriber_) {
|
||||
// auto event = std::make_shared<StartUpEvent>(shared_from_this());
|
||||
// subscriber_(std::static_pointer_cast<Event>(event));
|
||||
auto event = std::make_shared<StartUpEvent>(shared_from_this());
|
||||
subscriber_(std::static_pointer_cast<Event>(event));
|
||||
}
|
||||
while (running_) {
|
||||
std::unique_lock<std::mutex> lock(exec_mutex_);
|
||||
exec_cv_.wait(lock, [&] { return exec_flag_; });
|
||||
auto task = pick_task_execute();
|
||||
if (task) {
|
||||
Process(task);
|
||||
auto task_item = pick_task_execute();
|
||||
if (task_item) {
|
||||
Process(task_item->task);
|
||||
if (subscriber_) {
|
||||
// auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task);
|
||||
// subscriber_(std::static_pointer_cast<Event>(event));
|
||||
auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
|
||||
subscriber_(std::static_pointer_cast<Event>(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,14 +114,14 @@ private:
|
|||
* Pick one task to load;
|
||||
* Order by start time;
|
||||
*/
|
||||
TaskPtr
|
||||
TaskTableItemPtr
|
||||
pick_task_load();
|
||||
|
||||
/*
|
||||
* Pick one task to execute;
|
||||
* Pick by start time and priority;
|
||||
*/
|
||||
TaskPtr
|
||||
TaskTableItemPtr
|
||||
pick_task_execute();
|
||||
|
||||
private:
|
||||
|
|
Loading…
Reference in New Issue