MS-376 Add loader and executor enable flag in Resource avoid diskresource execute task

Former-commit-id: 489f60f97444c963a3dbfd913672f79b8a9a330e
pull/191/head
wxyu 2019-08-18 20:04:54 +08:00
parent de6d68238f
commit 4ed498893d
5 changed files with 30 additions and 22 deletions

View File

@ -27,6 +27,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-373 - Add resource test - MS-373 - Add resource test
- MS-374 - Add action definition - MS-374 - Add action definition
- MS-375 - Add Dump implementation for Event - MS-375 - Add Dump implementation for Event
- MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task
## New Feature ## New Feature
- MS-343 - Implement ResourceMgr - MS-343 - Implement ResourceMgr

View File

@ -17,17 +17,6 @@ CpuResource::CpuResource(std::string name)
void CpuResource::LoadFile(TaskPtr task) { void CpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::DISK2CPU, 0); task->Load(LoadType::DISK2CPU, 0);
//if (src.type == DISK) {
// fd = open(filename);
// content = fd.read();
// close(fd);
//} else if (src.type == CPU) {
// memcpy(src, dest, len);
//} else if (src.type == GPU) {
// cudaMemcpyD2H(src, dest);
//} else {
// // unknown type, exception
//}
} }
void CpuResource::Process(TaskPtr task) { void CpuResource::Process(TaskPtr task) {

View File

@ -12,7 +12,8 @@ namespace engine {
DiskResource::DiskResource(std::string name) DiskResource::DiskResource(std::string name)
: Resource(std::move(name), ResourceType::DISK) {} : Resource(std::move(name), ResourceType::DISK, true, false) {
}
void DiskResource::LoadFile(TaskPtr task) { void DiskResource::LoadFile(TaskPtr task) {

View File

@ -10,10 +10,15 @@ namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
Resource::Resource(std::string name, ResourceType type) Resource::Resource(std::string name,
ResourceType type,
bool enable_loader,
bool enable_executor)
: name_(std::move(name)), : name_(std::move(name)),
type_(type), type_(type),
running_(false), running_(false),
enable_loader_(enable_loader),
enable_executor_(enable_executor),
load_flag_(false), load_flag_(false),
exec_flag_(false) { exec_flag_(false) {
task_table_.RegisterSubscriber([&] { task_table_.RegisterSubscriber([&] {
@ -26,16 +31,24 @@ Resource::Resource(std::string name, ResourceType type)
void Resource::Start() { void Resource::Start() {
running_ = true; running_ = true;
loader_thread_ = std::thread(&Resource::loader_function, this); if (enable_loader_) {
executor_thread_ = std::thread(&Resource::executor_function, this); loader_thread_ = std::thread(&Resource::loader_function, this);
}
if (enable_executor_) {
executor_thread_ = std::thread(&Resource::executor_function, this);
}
} }
void Resource::Stop() { void Resource::Stop() {
running_ = false; running_ = false;
WakeupLoader(); if (enable_loader_) {
WakeupExecutor(); WakeupLoader();
loader_thread_.join(); loader_thread_.join();
executor_thread_.join(); }
if (enable_executor_) {
WakeupExecutor();
executor_thread_.join();
}
} }
TaskTable &Resource::task_table() { TaskTable &Resource::task_table() {
@ -106,6 +119,7 @@ void Resource::executor_function() {
auto task_item = pick_task_execute(); auto task_item = pick_task_execute();
if (task_item) { if (task_item) {
Process(task_item->task); Process(task_item->task);
task_item->state = TaskTableItemState::EXECUTED;
if (subscriber_) { if (subscriber_) {
auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item); auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event)); subscriber_(std::static_pointer_cast<Event>(event));

View File

@ -88,7 +88,10 @@ public:
WakeupExecutor(); WakeupExecutor();
protected: protected:
Resource(std::string name, ResourceType type); Resource(std::string name,
ResourceType type,
bool enable_loader = true,
bool enable_executor = true);
// TODO: SearchContextPtr to TaskPtr // TODO: SearchContextPtr to TaskPtr
/* /*
@ -148,8 +151,8 @@ private:
std::function<void(EventPtr)> subscriber_ = nullptr; std::function<void(EventPtr)> subscriber_ = nullptr;
bool running_; bool running_;
bool loader_running_ = false; bool enable_loader_ = true;
bool executor_running_ = false; bool enable_executor_ = true;
std::thread loader_thread_; std::thread loader_thread_;
std::thread executor_thread_; std::thread executor_thread_;