mirror of https://github.com/milvus-io/milvus.git
MS-576 Scheduler refactor
Former-commit-id: dc9317412c5dc72191381d5a634e7e9ff14f98b2
pull/191/head
parent 48e296ca8c
commit cfc42c5471
@@ -147,6 +147,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-539 - Remove old task code
- MS-546 - Add simple mode resource_config
- MS-570 - Add prometheus docker-compose file
- MS-576 - Scheduler refactor

## New Feature
- MS-343 - Implement ResourceMgr

@@ -16,7 +16,7 @@
// under the License.

#include <src/cache/GpuCacheMgr.h>
#include "src/cache/GpuCacheMgr.h"
#include "event/LoadCompletedEvent.h"
#include "Scheduler.h"
#include "action/Action.h"

@@ -33,6 +33,14 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
    if (auto mgr = res_mgr_.lock()) {
        mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
    }
    event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::START_UP),
                                          std::bind(&Scheduler::OnStartUp, this, std::placeholders::_1)));
    event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::LOAD_COMPLETED),
                                          std::bind(&Scheduler::OnLoadCompleted, this, std::placeholders::_1)));
    event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::TASK_TABLE_UPDATED),
                                          std::bind(&Scheduler::OnTaskTableUpdated, this, std::placeholders::_1)));
    event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::FINISH_TASK),
                                          std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1)));
}
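
A minimal, self-contained sketch of the handler-registry dispatch this hunk introduces (the Process() switch removed in the next hunk is what it replaces). The types below are simplified stand-ins, not the real milvus scheduler classes:

// Sketch only: each EventType is bound to a handler once, then Process()
// dispatches by table lookup instead of a switch statement.
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <unordered_map>

enum class EventType : uint64_t { START_UP, LOAD_COMPLETED, TASK_TABLE_UPDATED, FINISH_TASK };

struct Event {
    EventType type;
};
using EventPtr = std::shared_ptr<Event>;

class MiniScheduler {
 public:
    MiniScheduler() {
        // Register one handler per event type, as the real constructor does with std::bind.
        handlers_[static_cast<uint64_t>(EventType::START_UP)] =
            [this](const EventPtr &e) { OnStartUp(e); };
        handlers_[static_cast<uint64_t>(EventType::FINISH_TASK)] =
            [this](const EventPtr &e) { OnFinishTask(e); };
    }

    void Process(const EventPtr &event) {
        // Table lookup replaces the old switch on event->Type().
        auto it = handlers_.find(static_cast<uint64_t>(event->type));
        if (it != handlers_.end()) {
            it->second(event);
        }
    }

 private:
    void OnStartUp(const EventPtr &) { std::cout << "start up\n"; }
    void OnFinishTask(const EventPtr &) { std::cout << "finish task\n"; }

    std::unordered_map<uint64_t, std::function<void(EventPtr)>> handlers_;
};

int main() {
    MiniScheduler scheduler;
    scheduler.Process(std::make_shared<Event>(Event{EventType::START_UP}));
    scheduler.Process(std::make_shared<Event>(Event{EventType::FINISH_TASK}));
}
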
@@ -84,40 +92,8 @@ Scheduler::worker_function() {

void
Scheduler::Process(const EventPtr &event) {
    switch (event->Type()) {
        case EventType::START_UP: {
            OnStartUp(event);
            break;
        }
        case EventType::LOAD_COMPLETED: {
            OnLoadCompleted(event);
            break;
        }
        case EventType::FINISH_TASK: {
            OnFinishTask(event);
            break;
        }
        case EventType::TASK_TABLE_UPDATED: {
            OnTaskTableUpdated(event);
            break;
        }
        default: {
            // TODO: logging
            break;
        }
    }
}

void
Scheduler::OnStartUp(const EventPtr &event) {
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
    }
}

void
Scheduler::OnFinishTask(const EventPtr &event) {
    auto process_event = event_register_.at(static_cast<int>(event->Type()));
    process_event(event);
}

// TODO: refactor the function

@@ -130,79 +106,11 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
    auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
    switch (task_table_type) {
        case TaskLabelType::DEFAULT: {
            if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
                auto task = load_completed_event->task_table_item_->task;
                auto search_task = std::static_pointer_cast<XSearchTask>(task);
                bool moved = false;

                // to support test task, REFACTOR
                if (auto index_engine = search_task->index_engine_) {
                    auto location = index_engine->GetLocation();

                    for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
                        auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
                        if (index != nullptr) {
                            moved = true;
                            auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
                            Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
                            break;
                        }
                    }
                }

                if (not moved) {
                    Action::PushTaskToNeighbourRandomly(task, resource);
                }
            }
            Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
            break;
        }
        case TaskLabelType::SPECIFIED_RESOURCE: {
            // support next version
            // auto self = event->resource_.lock();
            // auto task = load_completed_event->task_table_item_->task;
            //
            // // if this resource is disk, assign it to smallest cost resource
            // if (self->type() == ResourceType::DISK) {
            //     // step 1: calculate shortest path per resource, from disk to compute resource
            //     auto compute_resources = res_mgr_.lock()->GetComputeResources();
            //     std::vector<std::vector<std::string>> paths;
            //     std::vector<uint64_t> transport_costs;
            //     for (auto &res : compute_resources) {
            //         std::vector<std::string> path;
            //         uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
            //         transport_costs.push_back(transport_cost);
            //         paths.emplace_back(path);
            //     }
            //
            //     // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
            //     uint64_t min_cost = std::numeric_limits<uint64_t>::max();
            //     uint64_t min_cost_idx = 0;
            //     for (uint64_t i = 0; i < compute_resources.size(); ++i) {
            //         if (compute_resources[i]->TotalTasks() == 0) {
            //             min_cost_idx = i;
            //             break;
            //         }
            //         uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
            //             + transport_costs[i];
            //         if (min_cost > cost) {
            //             min_cost = cost;
            //             min_cost_idx = i;
            //         }
            //     }
            //
            //     // step 3: set path in task
            //     Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
            //     task->path() = task_path;
            // }
            //
            // if (self->name() == task->path().Last()) {
            //     self->WakeupLoader();
            // } else {
            //     auto next_res_name = task->path().Next();
            //     auto next_res = res_mgr_.lock()->GetResource(next_res_name);
            //     load_completed_event->task_table_item_->Move();
            //     next_res->task_table().Put(task);
            // }
            Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
            break;
        }
        case TaskLabelType::BROADCAST: {

@@ -216,6 +124,17 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
    }
}

void
Scheduler::OnStartUp(const EventPtr &event) {
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
    }
}

void
Scheduler::OnFinishTask(const EventPtr &event) {
}

void
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
    if (auto resource = event->resource_.lock()) {

@@ -122,6 +122,8 @@ private:
private:
    bool running_;

    std::unordered_map<uint64_t, std::function<void(EventPtr)>> event_register_;

    ResourceMgrWPtr res_mgr_;
    std::queue<EventPtr> event_queue_;
    std::thread worker_thread_;

@@ -18,6 +18,7 @@
#pragma once

#include "../resource/Resource.h"
#include "../ResourceMgr.h"

namespace zilliz {

@@ -34,6 +35,15 @@ public:

    static void
    PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest);

    static void
    DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, std::shared_ptr<LoadCompletedEvent> event);

    static void
    SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr,
                                        ResourcePtr resource,
                                        std::shared_ptr<LoadCompletedEvent> event);
};
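
For orientation, a tiny compilable sketch of how the scheduler side hands off to such static policy helpers keyed by task label, as Scheduler::OnLoadCompleted does above; the enum and type names below are simplified stand-ins, not the real milvus classes:

// Sketch only: the scheduler inspects the task's label and forwards to a
// static policy function, so the per-label scheduling logic lives outside it.
#include <iostream>

enum class LabelType { DEFAULT, SPECIFIED_RESOURCE };

struct MiniAction {
    static void DefaultPolicy() { std::cout << "default label policy\n"; }
    static void SpecifiedResourcePolicy() { std::cout << "specified-resource policy\n"; }
};

void OnLoadCompleted(LabelType label) {
    switch (label) {
        case LabelType::DEFAULT:
            MiniAction::DefaultPolicy();
            break;
        case LabelType::SPECIFIED_RESOURCE:
            MiniAction::SpecifiedResourcePolicy();
            break;
    }
}

int main() {
    OnLoadCompleted(LabelType::DEFAULT);
    OnLoadCompleted(LabelType::SPECIFIED_RESOURCE);
}
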
@@ -18,6 +18,8 @@

#include <list>
#include <random>
#include "../Algorithm.h"
#include "src/cache/GpuCacheMgr.h"
#include "Action.h"

@@ -101,6 +103,84 @@ Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
    dest->task_table().Put(task);
}

void
Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr,
                                  ResourcePtr resource,
                                  std::shared_ptr<LoadCompletedEvent> event) {
    if (not resource->HasExecutor() && event->task_table_item_->Move()) {
        auto task = event->task_table_item_->task;
        auto search_task = std::static_pointer_cast<XSearchTask>(task);
        bool moved = false;

        // to support test task, REFACTOR
        if (auto index_engine = search_task->index_engine_) {
            auto location = index_engine->GetLocation();

            for (auto i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) {
                auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
                if (index != nullptr) {
                    moved = true;
                    auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i);
                    PushTaskToResource(event->task_table_item_->task, dest_resource);
                    break;
                }
            }
        }

        if (not moved) {
            PushTaskToNeighbourRandomly(task, resource);
        }
    }
}

void
Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr,
                                            ResourcePtr resource,
                                            std::shared_ptr<LoadCompletedEvent> event) {
    auto task = event->task_table_item_->task;
    if (resource->type() == ResourceType::DISK) {
        // step 1: calculate shortest path per resource, from disk to compute resource
        auto compute_resources = res_mgr.lock()->GetComputeResources();
        std::vector<std::vector<std::string>> paths;
        std::vector<uint64_t> transport_costs;
        for (auto &res : compute_resources) {
            std::vector<std::string> path;
            uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path);
            transport_costs.push_back(transport_cost);
            paths.emplace_back(path);
        }

        // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
        uint64_t min_cost = std::numeric_limits<uint64_t>::max();
        uint64_t min_cost_idx = 0;
        for (uint64_t i = 0; i < compute_resources.size(); ++i) {
            if (compute_resources[i]->TotalTasks() == 0) {
                min_cost_idx = i;
                break;
            }
            uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
                + transport_costs[i];
            if (min_cost > cost) {
                min_cost = cost;
                min_cost_idx = i;
            }
        }

        // step 3: set path in task
        Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
        task->path() = task_path;
    }

    if (resource->name() == task->path().Last()) {
        resource->WakeupLoader();
    } else {
        auto next_res_name = task->path().Next();
        auto next_res = res_mgr.lock()->GetResource(next_res_name);
        event->task_table_item_->Move();
        next_res->task_table().Put(task);
    }
}

}
}
}
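
A compact standalone illustration of the step-2 cost rule above, cost = TaskAvgCost * NumOfTaskToExec + transport_cost, using made-up resource names and numbers rather than the real milvus types:

// Illustrative only: picks the compute resource with the lowest estimated cost,
// mirroring step 2 of SpecifiedResourceLabelTaskScheduler. ComputeRes and the
// sample values are hypothetical.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>
#include <string>
#include <vector>

struct ComputeRes {
    std::string name;
    uint64_t task_avg_cost;     // average cost per queued task
    uint64_t num_task_to_exec;  // tasks still waiting to execute
    uint64_t transport_cost;    // shortest-path cost from the disk resource
};

int main() {
    std::vector<ComputeRes> resources = {
        {"gpu0", 10, 3, 5},  // cost = 10*3 + 5 = 35
        {"gpu1", 8, 6, 2},   // cost = 8*6 + 2  = 50
        {"cpu", 20, 1, 1},   // cost = 20*1 + 1 = 21  -> selected
    };

    uint64_t min_cost = std::numeric_limits<uint64_t>::max();
    std::size_t min_idx = 0;
    for (std::size_t i = 0; i < resources.size(); ++i) {
        uint64_t cost = resources[i].task_avg_cost * resources[i].num_task_to_exec
            + resources[i].transport_cost;
        if (cost < min_cost) {
            min_cost = cost;
            min_idx = i;
        }
    }
    std::cout << "selected: " << resources[min_idx].name << " (cost " << min_cost << ")\n";
}
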
@@ -39,7 +39,7 @@ constexpr int64_t BATCH_ROW_COUNT = 100000;
constexpr int64_t NQ = 5;
constexpr int64_t TOP_K = 10;
constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
constexpr int64_t ADD_VECTOR_LOOP = 10;
constexpr int64_t ADD_VECTOR_LOOP = 1;
constexpr int64_t SECONDS_EACH_HOUR = 3600;

#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;