mirror of https://github.com/milvus-io/milvus.git
MS-361 Add event in resource
Former-commit-id: d5f18c13d7111a582ad0bc839aa49879c6bb7d32pull/191/head
parent
c3469fa561
commit
df382a1644
|
@ -18,6 +18,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
- MS-355 - Add copy interface in ExcutionEngine
|
||||
- MS-357 - Add minimum schedule function
|
||||
- MS-359 - Add cost test in new scheduler
|
||||
- MS-361 - Add event in resource
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
#include "ResourceMgr.h"
|
||||
#include "db/Log.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
@ -21,31 +22,22 @@ ResourceMgr::Add(ResourcePtr &&resource) {
|
|||
ResourceWPtr ret(resource);
|
||||
|
||||
std::lock_guard<std::mutex> lck(resources_mutex_);
|
||||
if(running_) {
|
||||
if (running_) {
|
||||
ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (resource->Type() == ResourceType::DISK) {
|
||||
disk_resources_.emplace_back(ResourceWPtr(resource));
|
||||
}
|
||||
resources_.emplace_back(resource);
|
||||
|
||||
size_t index = resources_.size() - 1;
|
||||
// TODO: update interface
|
||||
// resource->RegisterOnStartUp([&] {
|
||||
// start_up_event_[index] = true;
|
||||
// event_cv_.notify_one();
|
||||
// });
|
||||
// resource->RegisterOnFinishTask([&] {
|
||||
// finish_task_event_[index] = true;
|
||||
// event_cv_.notify_one();
|
||||
// });
|
||||
// resource->RegisterOnCopyCompleted([&] {
|
||||
// copy_completed_event_[index] = true;
|
||||
// event_cv_.notify_one();
|
||||
// });
|
||||
// resource->RegisterOnTaskTableUpdated([&] {
|
||||
// task_table_updated_event_[index] = true;
|
||||
// event_cv_.notify_one();
|
||||
// });
|
||||
resource->RegisterSubscriber([&](EventPtr event) {
|
||||
queue_.emplace(event);
|
||||
std::unique_lock<std::mutex> lock(event_mutex_);
|
||||
event_cv_.notify_one();
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -61,31 +53,17 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect
|
|||
void
|
||||
ResourceMgr::EventProcess() {
|
||||
while (running_) {
|
||||
std::unique_lock <std::mutex> lock(resources_mutex_);
|
||||
event_cv_.wait(lock, [this] { return !resources_.empty(); });
|
||||
std::unique_lock<std::mutex> lock(event_mutex_);
|
||||
event_cv_.wait(lock, [this] { return !queue_.empty(); });
|
||||
|
||||
if(!running_) {
|
||||
if (!running_) {
|
||||
break;
|
||||
}
|
||||
|
||||
for (uint64_t i = 0; i < resources_.size(); ++i) {
|
||||
ResourceWPtr res(resources_[i]);
|
||||
if (start_up_event_[i]) {
|
||||
on_start_up_(res);
|
||||
start_up_event_[i] = false;
|
||||
}
|
||||
if (finish_task_event_[i]) {
|
||||
on_finish_task_(res);
|
||||
finish_task_event_[i] = false;
|
||||
}
|
||||
if (copy_completed_event_[i]) {
|
||||
on_copy_completed_(res);
|
||||
copy_completed_event_[i] = false;
|
||||
}
|
||||
if (task_table_updated_event_[i]) {
|
||||
on_task_table_updated_(res);
|
||||
task_table_updated_event_[i] = false;
|
||||
}
|
||||
auto event = queue_.front();
|
||||
queue_.pop();
|
||||
if (subscriber_) {
|
||||
subscriber_(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,10 +10,12 @@
|
|||
#include <vector>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <condition_variable>
|
||||
|
||||
#include "resource/Resource.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
@ -23,6 +25,15 @@ public:
|
|||
ResourceMgr();
|
||||
|
||||
/******** Management Interface ********/
|
||||
inline void
|
||||
RegisterSubscriber(std::function<void(EventPtr)> subscriber) {
|
||||
subscriber_ = std::move(subscriber);
|
||||
}
|
||||
|
||||
std::vector<ResourceWPtr> &
|
||||
GetDiskResources() {
|
||||
return disk_resources_;
|
||||
}
|
||||
|
||||
/*
|
||||
* Add resource into Resource Management;
|
||||
|
@ -51,41 +62,6 @@ public:
|
|||
|
||||
// TODO: add stats interface(low)
|
||||
|
||||
public:
|
||||
/******** Event Register Interface ********/
|
||||
|
||||
/*
|
||||
* Register on start up event;
|
||||
*/
|
||||
void
|
||||
RegisterOnStartUp(std::function<void(ResourceWPtr)> &func) {
|
||||
on_start_up_ = func;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register on finish one task event;
|
||||
*/
|
||||
void
|
||||
RegisterOnFinishTask(std::function<void(ResourceWPtr)> &func) {
|
||||
on_finish_task_ = func;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register on copy task data completed event;
|
||||
*/
|
||||
void
|
||||
RegisterOnCopyCompleted(std::function<void(ResourceWPtr)> &func) {
|
||||
on_copy_completed_ = func;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register on task table updated event;
|
||||
*/
|
||||
void
|
||||
RegisterOnTaskTableUpdated(std::function<void(ResourceWPtr)> &func) {
|
||||
on_task_table_updated_ = func;
|
||||
}
|
||||
|
||||
public:
|
||||
/******** Utlitity Functions ********/
|
||||
|
||||
|
@ -97,22 +73,19 @@ private:
|
|||
EventProcess();
|
||||
|
||||
private:
|
||||
std::queue<EventPtr> queue_;
|
||||
std::function<void(EventPtr)> subscriber_ = nullptr;
|
||||
|
||||
bool running_;
|
||||
|
||||
std::vector<ResourceWPtr> disk_resources_;
|
||||
std::vector<ResourcePtr> resources_;
|
||||
mutable std::mutex resources_mutex_;
|
||||
std::thread worker_thread_;
|
||||
|
||||
std::mutex event_mutex_;
|
||||
std::condition_variable event_cv_;
|
||||
std::vector<bool> start_up_event_;
|
||||
std::vector<bool> finish_task_event_;
|
||||
std::vector<bool> copy_completed_event_;
|
||||
std::vector<bool> task_table_updated_event_;
|
||||
|
||||
std::function<void(ResourceWPtr)> on_start_up_;
|
||||
std::function<void(ResourceWPtr)> on_finish_task_;
|
||||
std::function<void(ResourceWPtr)> on_copy_completed_;
|
||||
std::function<void(ResourceWPtr)> on_task_table_updated_;
|
||||
};
|
||||
|
||||
using ResourceMgrWPtr = std::weak_ptr<ResourceMgr>;
|
||||
|
|
|
@ -45,23 +45,23 @@ schedule(const ResourceWPtr &res) {
|
|||
}
|
||||
|
||||
void
|
||||
StartUpEvent::Process() {
|
||||
schedule(resource_);
|
||||
Scheduler::OnStartUp(const EventPtr &event) {
|
||||
schedule(event->resource_);
|
||||
}
|
||||
|
||||
void
|
||||
FinishTaskEvent::Process() {
|
||||
schedule(resource_);
|
||||
Scheduler::OnFinishTask(const EventPtr &event) {
|
||||
schedule(event->resource_);
|
||||
}
|
||||
|
||||
void
|
||||
CopyCompletedEvent::Process() {
|
||||
schedule(resource_);
|
||||
Scheduler::OnCopyCompleted(const EventPtr &event) {
|
||||
schedule(event->resource_);
|
||||
}
|
||||
|
||||
void
|
||||
TaskTableUpdatedEvent::Process() {
|
||||
schedule(resource_);
|
||||
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
|
||||
schedule(event->resource_);
|
||||
}
|
||||
|
||||
std::string
|
||||
|
|
|
@ -18,60 +18,6 @@ namespace zilliz {
|
|||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class Event {
|
||||
public:
|
||||
explicit
|
||||
Event(ResourceWPtr &resource) : resource_(resource) {}
|
||||
|
||||
public:
|
||||
virtual void
|
||||
Process() = 0;
|
||||
|
||||
protected:
|
||||
ResourceWPtr resource_;
|
||||
};
|
||||
|
||||
using EventPtr = std::shared_ptr<Event>;
|
||||
|
||||
class StartUpEvent : public Event {
|
||||
public:
|
||||
explicit
|
||||
StartUpEvent(ResourceWPtr &resource) : Event(resource) {}
|
||||
|
||||
public:
|
||||
void
|
||||
Process() override;
|
||||
};
|
||||
|
||||
class FinishTaskEvent : public Event {
|
||||
public:
|
||||
explicit
|
||||
FinishTaskEvent(ResourceWPtr &resource) : Event(resource) {}
|
||||
|
||||
public:
|
||||
void
|
||||
Process() override;
|
||||
};
|
||||
|
||||
class CopyCompletedEvent : public Event {
|
||||
public:
|
||||
explicit
|
||||
CopyCompletedEvent(ResourceWPtr &resource) : Event(resource) {}
|
||||
|
||||
public:
|
||||
void
|
||||
Process() override;
|
||||
};
|
||||
|
||||
class TaskTableUpdatedEvent : public Event {
|
||||
public:
|
||||
explicit
|
||||
TaskTableUpdatedEvent(ResourceWPtr &resource) : Event(resource) {}
|
||||
|
||||
public:
|
||||
void
|
||||
Process() override;
|
||||
};
|
||||
|
||||
class Scheduler {
|
||||
public:
|
||||
|
@ -90,52 +36,65 @@ public:
|
|||
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
|
||||
}
|
||||
|
||||
public:
|
||||
std::string
|
||||
Dump();
|
||||
|
||||
private:
|
||||
/******** Events ********/
|
||||
|
||||
/*
|
||||
* Process start up events;
|
||||
*/
|
||||
inline void
|
||||
OnStartUp(ResourceWPtr &resource) {
|
||||
auto event = std::make_shared<StartUpEvent>(resource);
|
||||
event_queue_.push(event);
|
||||
}
|
||||
void
|
||||
OnStartUp(const EventPtr &event);
|
||||
|
||||
/*
|
||||
* Process finish task events;
|
||||
*/
|
||||
inline void
|
||||
OnFinishTask(ResourceWPtr &resource) {
|
||||
auto event = std::make_shared<FinishTaskEvent>(resource);
|
||||
event_queue_.push(event);
|
||||
}
|
||||
void
|
||||
OnFinishTask(const EventPtr &event);
|
||||
|
||||
/*
|
||||
* Process copy completed events;
|
||||
*/
|
||||
inline void
|
||||
OnCopyCompleted(ResourceWPtr &resource) {
|
||||
auto event = std::make_shared<CopyCompletedEvent>(resource);
|
||||
event_queue_.push(event);
|
||||
}
|
||||
void
|
||||
OnCopyCompleted(const EventPtr &event);
|
||||
|
||||
/*
|
||||
* Process task table updated events;
|
||||
*/
|
||||
inline void
|
||||
OnTaskTableUpdated(ResourceWPtr &resource) {
|
||||
auto event = std::make_shared<TaskTableUpdatedEvent>(resource);
|
||||
event_queue_.push(event);
|
||||
}
|
||||
|
||||
|
||||
public:
|
||||
std::string
|
||||
Dump();
|
||||
|
||||
void
|
||||
OnTaskTableUpdated(const EventPtr &event);
|
||||
|
||||
private:
|
||||
/*
|
||||
* Dispatch event to event handler;
|
||||
*/
|
||||
void
|
||||
Process(const EventPtr &event) {
|
||||
switch (event->Type()) {
|
||||
case EventType::START_UP: {
|
||||
OnStartUp(event);
|
||||
break;
|
||||
}
|
||||
case EventType::COPY_COMPLETED: {
|
||||
OnCopyCompleted(event);
|
||||
break;
|
||||
}
|
||||
case EventType::FINISH_TASK: {
|
||||
OnFinishTask(event);
|
||||
break;
|
||||
}
|
||||
case EventType::TASK_TABLE_UPDATED: {
|
||||
OnTaskTableUpdated(event);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Called by worker_thread_;
|
||||
*/
|
||||
|
@ -143,7 +102,7 @@ private:
|
|||
worker_function() {
|
||||
while (running_) {
|
||||
auto event = event_queue_.front();
|
||||
event->Process();
|
||||
Process(event);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/*******************************************************************************
|
||||
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
|
||||
* unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "Event.h"
|
||||
#include "../TaskTable.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class CopyCompletedEvent : public Event {
|
||||
public:
|
||||
CopyCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItem &task_table_item)
|
||||
: Event(EventType::COPY_COMPLETED, std::move(resource)),
|
||||
task_table_item_(task_table_item) {}
|
||||
public:
|
||||
TaskTableItem &task_table_item_;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/*******************************************************************************
|
||||
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
|
||||
* unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
enum class EventType {
|
||||
START_UP,
|
||||
COPY_COMPLETED,
|
||||
FINISH_TASK,
|
||||
TASK_TABLE_UPDATED
|
||||
};
|
||||
|
||||
class Resource;
|
||||
|
||||
class Event {
|
||||
public:
|
||||
explicit
|
||||
Event(EventType type, std::weak_ptr<Resource> resource)
|
||||
: type_(type),
|
||||
resource_(std::move(resource)) {}
|
||||
|
||||
inline EventType
|
||||
Type() const {
|
||||
return type_;
|
||||
}
|
||||
|
||||
public:
|
||||
EventType type_;
|
||||
std::weak_ptr<Resource> resource_;
|
||||
};
|
||||
|
||||
using EventPtr = std::shared_ptr<Event>;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
/*******************************************************************************
|
||||
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
|
||||
* unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "Event.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class FinishTaskEvent : public Event {
|
||||
public:
|
||||
FinishTaskEvent(std::weak_ptr<Resource> resource, TaskTableItem &task_table_item)
|
||||
: Event(EventType::FINISH_TASK, std::move(resource)),
|
||||
task_table_item_(task_table_item) {}
|
||||
|
||||
public:
|
||||
TaskTableItem &task_table_item_;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
/*******************************************************************************
|
||||
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
|
||||
* unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "Event.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class StartUpEvent : public Event {
|
||||
public:
|
||||
explicit
|
||||
StartUpEvent(std::weak_ptr<Resource> resource)
|
||||
: Event(EventType::START_UP, std::move(resource)) {}
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/*******************************************************************************
|
||||
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
|
||||
* unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "Event.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class TaskTableUpdatedEvent : public Event {
|
||||
public:
|
||||
explicit
|
||||
TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
|
||||
: Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {}
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -70,20 +70,30 @@ void Resource::loader_function() {
|
|||
auto task = pick_task_load();
|
||||
if (task) {
|
||||
LoadFile(task);
|
||||
GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec();
|
||||
if (subscriber_) {
|
||||
// auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task);
|
||||
// subscriber_(std::static_pointer_cast<Event>(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Resource::executor_function() {
|
||||
GetRegisterFunc(RegisterType::START_UP)->Exec();
|
||||
if (subscriber_) {
|
||||
// auto event = std::make_shared<StartUpEvent>(shared_from_this());
|
||||
// subscriber_(std::static_pointer_cast<Event>(event));
|
||||
}
|
||||
while (running_) {
|
||||
std::unique_lock<std::mutex> lock(exec_mutex_);
|
||||
exec_cv_.wait(lock, [&] { return exec_flag_; });
|
||||
auto task = pick_task_execute();
|
||||
if (task) {
|
||||
Process(task);
|
||||
GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec();
|
||||
if (subscriber_) {
|
||||
// auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task);
|
||||
// subscriber_(std::static_pointer_cast<Event>(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,6 +12,10 @@
|
|||
#include <functional>
|
||||
#include <condition_variable>
|
||||
|
||||
#include "../event/Event.h"
|
||||
#include "../event/StartUpEvent.h"
|
||||
#include "../event/CopyCompletedEvent.h"
|
||||
#include "../event/FinishTaskEvent.h"
|
||||
#include "../TaskTable.h"
|
||||
#include "../task/Task.h"
|
||||
#include "../Cost.h"
|
||||
|
@ -37,18 +41,28 @@ enum class RegisterType {
|
|||
ON_TASK_TABLE_UPDATED,
|
||||
};
|
||||
|
||||
class Resource : public Node {
|
||||
class Resource : public Node, public std::enable_shared_from_this<Resource> {
|
||||
public:
|
||||
/*
|
||||
* Event function MUST be a short function, never blocking;
|
||||
*/
|
||||
template <typename T>
|
||||
void Register_T(const RegisterType& type) {
|
||||
template<typename T>
|
||||
void Register_T(const RegisterType &type) {
|
||||
register_table_.emplace(type, [] { return std::make_shared<T>(); });
|
||||
}
|
||||
|
||||
RegisterHandlerPtr
|
||||
GetRegisterFunc(const RegisterType& type);
|
||||
GetRegisterFunc(const RegisterType &type);
|
||||
|
||||
inline void
|
||||
RegisterSubscriber(std::function<void(EventPtr)> subscriber) {
|
||||
subscriber_ = std::move(subscriber);
|
||||
}
|
||||
|
||||
inline ResourceType
|
||||
Type() const {
|
||||
return type_;
|
||||
}
|
||||
|
||||
void
|
||||
Start();
|
||||
|
@ -131,8 +145,11 @@ private:
|
|||
TaskTable task_table_;
|
||||
|
||||
std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
|
||||
std::function<void(EventPtr)> subscriber_ = nullptr;
|
||||
|
||||
bool running_;
|
||||
bool loader_running_ = false;
|
||||
bool executor_running_ = false;
|
||||
std::thread loader_thread_;
|
||||
std::thread executor_thread_;
|
||||
|
||||
|
|
Loading…
Reference in New Issue