From df382a16443b21d4a6b7bdda980031997dcafb4d Mon Sep 17 00:00:00 2001 From: wxyu Date: Fri, 16 Aug 2019 15:03:10 +0800 Subject: [PATCH] MS-361 Add event in resource Former-commit-id: d5f18c13d7111a582ad0bc839aa49879c6bb7d32 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/ResourceMgr.cpp | 56 +++----- cpp/src/scheduler/ResourceMgr.h | 59 +++------ cpp/src/scheduler/Scheduler.cpp | 16 +-- cpp/src/scheduler/Scheduler.h | 123 ++++++------------ cpp/src/scheduler/event/CopyCompletedEvent.h | 27 ++++ cpp/src/scheduler/event/Event.h | 42 ++++++ cpp/src/scheduler/event/FinishTaskEvent.h | 27 ++++ cpp/src/scheduler/event/StartUpEvent.h | 24 ++++ .../scheduler/event/TaskTableUpdatedEvent.h | 25 ++++ cpp/src/scheduler/resource/Resource.cpp | 14 +- cpp/src/scheduler/resource/Resource.h | 25 +++- 12 files changed, 261 insertions(+), 178 deletions(-) create mode 100644 cpp/src/scheduler/event/CopyCompletedEvent.h create mode 100644 cpp/src/scheduler/event/Event.h create mode 100644 cpp/src/scheduler/event/FinishTaskEvent.h create mode 100644 cpp/src/scheduler/event/StartUpEvent.h create mode 100644 cpp/src/scheduler/event/TaskTableUpdatedEvent.h diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index e99c3ddb22..56d7c8e12b 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -18,6 +18,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-355 - Add copy interface in ExcutionEngine - MS-357 - Add minimum schedule function - MS-359 - Add cost test in new scheduler +- MS-361 - Add event in resource ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index a75c7a3be6..a0625c836f 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -7,6 +7,7 @@ #include "ResourceMgr.h" #include "db/Log.h" + namespace zilliz { namespace milvus { namespace engine { @@ -21,31 +22,22 @@ ResourceMgr::Add(ResourcePtr &&resource) { ResourceWPtr ret(resource); std::lock_guard lck(resources_mutex_); - if(running_) { + if (running_) { ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource"; return ret; } + if (resource->Type() == ResourceType::DISK) { + disk_resources_.emplace_back(ResourceWPtr(resource)); + } resources_.emplace_back(resource); size_t index = resources_.size() - 1; - // TODO: update interface -// resource->RegisterOnStartUp([&] { -// start_up_event_[index] = true; -// event_cv_.notify_one(); -// }); -// resource->RegisterOnFinishTask([&] { -// finish_task_event_[index] = true; -// event_cv_.notify_one(); -// }); -// resource->RegisterOnCopyCompleted([&] { -// copy_completed_event_[index] = true; -// event_cv_.notify_one(); -// }); -// resource->RegisterOnTaskTableUpdated([&] { -// task_table_updated_event_[index] = true; -// event_cv_.notify_one(); -// }); + resource->RegisterSubscriber([&](EventPtr event) { + queue_.emplace(event); + std::unique_lock lock(event_mutex_); + event_cv_.notify_one(); + }); return ret; } @@ -61,31 +53,17 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect void ResourceMgr::EventProcess() { while (running_) { - std::unique_lock lock(resources_mutex_); - event_cv_.wait(lock, [this] { return !resources_.empty(); }); + std::unique_lock lock(event_mutex_); + event_cv_.wait(lock, [this] { return !queue_.empty(); }); - if(!running_) { + if (!running_) { break; } - for (uint64_t i = 0; i < resources_.size(); ++i) { - ResourceWPtr res(resources_[i]); - if (start_up_event_[i]) { - on_start_up_(res); - start_up_event_[i] = false; - } - if (finish_task_event_[i]) { - on_finish_task_(res); - finish_task_event_[i] = false; - } - if (copy_completed_event_[i]) { - on_copy_completed_(res); - copy_completed_event_[i] = false; - } - if (task_table_updated_event_[i]) { - on_task_table_updated_(res); - task_table_updated_event_[i] = false; - } + auto event = queue_.front(); + queue_.pop(); + if (subscriber_) { + subscriber_(event); } } } diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index e7a7650695..714924c9b9 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -10,10 +10,12 @@ #include #include #include +#include #include #include "resource/Resource.h" + namespace zilliz { namespace milvus { namespace engine { @@ -23,6 +25,15 @@ public: ResourceMgr(); /******** Management Interface ********/ + inline void + RegisterSubscriber(std::function subscriber) { + subscriber_ = std::move(subscriber); + } + + std::vector & + GetDiskResources() { + return disk_resources_; + } /* * Add resource into Resource Management; @@ -51,41 +62,6 @@ public: // TODO: add stats interface(low) -public: - /******** Event Register Interface ********/ - - /* - * Register on start up event; - */ - void - RegisterOnStartUp(std::function &func) { - on_start_up_ = func; - } - - /* - * Register on finish one task event; - */ - void - RegisterOnFinishTask(std::function &func) { - on_finish_task_ = func; - } - - /* - * Register on copy task data completed event; - */ - void - RegisterOnCopyCompleted(std::function &func) { - on_copy_completed_ = func; - } - - /* - * Register on task table updated event; - */ - void - RegisterOnTaskTableUpdated(std::function &func) { - on_task_table_updated_ = func; - } - public: /******** Utlitity Functions ********/ @@ -97,22 +73,19 @@ private: EventProcess(); private: + std::queue queue_; + std::function subscriber_ = nullptr; + bool running_; + std::vector disk_resources_; std::vector resources_; mutable std::mutex resources_mutex_; std::thread worker_thread_; + std::mutex event_mutex_; std::condition_variable event_cv_; - std::vector start_up_event_; - std::vector finish_task_event_; - std::vector copy_completed_event_; - std::vector task_table_updated_event_; - std::function on_start_up_; - std::function on_finish_task_; - std::function on_copy_completed_; - std::function on_task_table_updated_; }; using ResourceMgrWPtr = std::weak_ptr; diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index d4ba1ac4a6..fbf66628b9 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -45,23 +45,23 @@ schedule(const ResourceWPtr &res) { } void -StartUpEvent::Process() { - schedule(resource_); +Scheduler::OnStartUp(const EventPtr &event) { + schedule(event->resource_); } void -FinishTaskEvent::Process() { - schedule(resource_); +Scheduler::OnFinishTask(const EventPtr &event) { + schedule(event->resource_); } void -CopyCompletedEvent::Process() { - schedule(resource_); +Scheduler::OnCopyCompleted(const EventPtr &event) { + schedule(event->resource_); } void -TaskTableUpdatedEvent::Process() { - schedule(resource_); +Scheduler::OnTaskTableUpdated(const EventPtr &event) { + schedule(event->resource_); } std::string diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 4f7c714c86..66088dd5b6 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -18,60 +18,6 @@ namespace zilliz { namespace milvus { namespace engine { -class Event { -public: - explicit - Event(ResourceWPtr &resource) : resource_(resource) {} - -public: - virtual void - Process() = 0; - -protected: - ResourceWPtr resource_; -}; - -using EventPtr = std::shared_ptr; - -class StartUpEvent : public Event { -public: - explicit - StartUpEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; - -class FinishTaskEvent : public Event { -public: - explicit - FinishTaskEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; - -class CopyCompletedEvent : public Event { -public: - explicit - CopyCompletedEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; - -class TaskTableUpdatedEvent : public Event { -public: - explicit - TaskTableUpdatedEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; class Scheduler { public: @@ -90,52 +36,65 @@ public: worker_thread_ = std::thread(&Scheduler::worker_thread_, this); } -public: + std::string + Dump(); + +private: /******** Events ********/ /* * Process start up events; */ - inline void - OnStartUp(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } + void + OnStartUp(const EventPtr &event); /* * Process finish task events; */ - inline void - OnFinishTask(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } + void + OnFinishTask(const EventPtr &event); /* * Process copy completed events; */ - inline void - OnCopyCompleted(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } + void + OnCopyCompleted(const EventPtr &event); /* * Process task table updated events; */ - inline void - OnTaskTableUpdated(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } - - -public: - std::string - Dump(); - + void + OnTaskTableUpdated(const EventPtr &event); private: + /* + * Dispatch event to event handler; + */ + void + Process(const EventPtr &event) { + switch (event->Type()) { + case EventType::START_UP: { + OnStartUp(event); + break; + } + case EventType::COPY_COMPLETED: { + OnCopyCompleted(event); + break; + } + case EventType::FINISH_TASK: { + OnFinishTask(event); + break; + } + case EventType::TASK_TABLE_UPDATED: { + OnTaskTableUpdated(event); + break; + } + default: { + break; + } + } + } + /* * Called by worker_thread_; */ @@ -143,7 +102,7 @@ private: worker_function() { while (running_) { auto event = event_queue_.front(); - event->Process(); + Process(event); } } diff --git a/cpp/src/scheduler/event/CopyCompletedEvent.h b/cpp/src/scheduler/event/CopyCompletedEvent.h new file mode 100644 index 0000000000..8db63490eb --- /dev/null +++ b/cpp/src/scheduler/event/CopyCompletedEvent.h @@ -0,0 +1,27 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Event.h" +#include "../TaskTable.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class CopyCompletedEvent : public Event { +public: + CopyCompletedEvent(std::weak_ptr resource, TaskTableItem &task_table_item) + : Event(EventType::COPY_COMPLETED, std::move(resource)), + task_table_item_(task_table_item) {} +public: + TaskTableItem &task_table_item_; +}; + +} +} +} diff --git a/cpp/src/scheduler/event/Event.h b/cpp/src/scheduler/event/Event.h new file mode 100644 index 0000000000..4b04d5404b --- /dev/null +++ b/cpp/src/scheduler/event/Event.h @@ -0,0 +1,42 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +namespace zilliz { +namespace milvus { +namespace engine { + +enum class EventType { + START_UP, + COPY_COMPLETED, + FINISH_TASK, + TASK_TABLE_UPDATED +}; + +class Resource; + +class Event { +public: + explicit + Event(EventType type, std::weak_ptr resource) + : type_(type), + resource_(std::move(resource)) {} + + inline EventType + Type() const { + return type_; + } + +public: + EventType type_; + std::weak_ptr resource_; +}; + +using EventPtr = std::shared_ptr; + +} +} +} diff --git a/cpp/src/scheduler/event/FinishTaskEvent.h b/cpp/src/scheduler/event/FinishTaskEvent.h new file mode 100644 index 0000000000..34658719f9 --- /dev/null +++ b/cpp/src/scheduler/event/FinishTaskEvent.h @@ -0,0 +1,27 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Event.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class FinishTaskEvent : public Event { +public: + FinishTaskEvent(std::weak_ptr resource, TaskTableItem &task_table_item) + : Event(EventType::FINISH_TASK, std::move(resource)), + task_table_item_(task_table_item) {} + +public: + TaskTableItem &task_table_item_; +}; + +} +} +} diff --git a/cpp/src/scheduler/event/StartUpEvent.h b/cpp/src/scheduler/event/StartUpEvent.h new file mode 100644 index 0000000000..04bc462dcc --- /dev/null +++ b/cpp/src/scheduler/event/StartUpEvent.h @@ -0,0 +1,24 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Event.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class StartUpEvent : public Event { +public: + explicit + StartUpEvent(std::weak_ptr resource) + : Event(EventType::START_UP, std::move(resource)) {} +}; + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/event/TaskTableUpdatedEvent.h b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h new file mode 100644 index 0000000000..8658316222 --- /dev/null +++ b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h @@ -0,0 +1,25 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Event.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class TaskTableUpdatedEvent : public Event { +public: + explicit + TaskTableUpdatedEvent(std::weak_ptr resource) + : Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {} +}; + + +} +} +} diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 7f59256474..4177e97588 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -70,20 +70,30 @@ void Resource::loader_function() { auto task = pick_task_load(); if (task) { LoadFile(task); - GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec(); + if (subscriber_) { +// auto event = std::make_shared(shared_from_this(), task); +// subscriber_(std::static_pointer_cast(event)); + } } } } void Resource::executor_function() { GetRegisterFunc(RegisterType::START_UP)->Exec(); + if (subscriber_) { +// auto event = std::make_shared(shared_from_this()); +// subscriber_(std::static_pointer_cast(event)); + } while (running_) { std::unique_lock lock(exec_mutex_); exec_cv_.wait(lock, [&] { return exec_flag_; }); auto task = pick_task_execute(); if (task) { Process(task); - GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec(); + if (subscriber_) { +// auto event = std::make_shared(shared_from_this(), task); +// subscriber_(std::static_pointer_cast(event)); + } } } } diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 2961e281fa..b33f17c4e0 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -12,6 +12,10 @@ #include #include +#include "../event/Event.h" +#include "../event/StartUpEvent.h" +#include "../event/CopyCompletedEvent.h" +#include "../event/FinishTaskEvent.h" #include "../TaskTable.h" #include "../task/Task.h" #include "../Cost.h" @@ -37,18 +41,28 @@ enum class RegisterType { ON_TASK_TABLE_UPDATED, }; -class Resource : public Node { +class Resource : public Node, public std::enable_shared_from_this { public: /* * Event function MUST be a short function, never blocking; */ - template - void Register_T(const RegisterType& type) { + template + void Register_T(const RegisterType &type) { register_table_.emplace(type, [] { return std::make_shared(); }); } RegisterHandlerPtr - GetRegisterFunc(const RegisterType& type); + GetRegisterFunc(const RegisterType &type); + + inline void + RegisterSubscriber(std::function subscriber) { + subscriber_ = std::move(subscriber); + } + + inline ResourceType + Type() const { + return type_; + } void Start(); @@ -131,8 +145,11 @@ private: TaskTable task_table_; std::map> register_table_; + std::function subscriber_ = nullptr; bool running_; + bool loader_running_ = false; + bool executor_running_ = false; std::thread loader_thread_; std::thread executor_thread_;