mirror of https://github.com/milvus-io/milvus.git
Merge branch 'branch-0.4.0' into 'branch-0.4.0'
MS-380 Update resource loader and executor, work util all finished See merge request megasearch/milvus!387 Former-commit-id: 2c942b172d24ffa494c756b5ce0eca9bad9f0e42pull/191/head
commit
8b3d867a43
|
@ -30,6 +30,8 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
- MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task
|
||||
- MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable
|
||||
- MS-378 - Debug and Update normal_test in scheduler unittest
|
||||
- MS-379 - Add Dump implementation in Resource
|
||||
- MS-380 - Update resource loader and executor, work util all finished
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
|
|
|
@ -4,7 +4,6 @@
|
|||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include <iostream>
|
||||
#include "Scheduler.h"
|
||||
#include "Cost.h"
|
||||
#include "action/Action.h"
|
||||
|
|
|
@ -12,7 +12,7 @@ namespace milvus {
|
|||
namespace engine {
|
||||
|
||||
void
|
||||
push_task(ResourcePtr &self, ResourcePtr &other) {
|
||||
push_task(const ResourcePtr &self, const ResourcePtr &other) {
|
||||
auto &self_task_table = self->task_table();
|
||||
auto &other_task_table = other->task_table();
|
||||
CacheMgr cache;
|
||||
|
@ -31,8 +31,7 @@ Action::PushTaskToNeighbour(const ResourceWPtr &res) {
|
|||
if (auto self = res.lock()) {
|
||||
for (auto &neighbour : self->GetNeighbours()) {
|
||||
if (auto n = neighbour.neighbour_node.lock()) {
|
||||
auto neighbour = std::static_pointer_cast<Resource>(n);
|
||||
push_task(self, neighbour);
|
||||
push_task(self, std::static_pointer_cast<Resource>(n));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,10 @@ namespace zilliz {
|
|||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
std::ostream &operator<<(std::ostream &out, const CpuResource &resource) {
|
||||
out << resource.Dump();
|
||||
return out;
|
||||
}
|
||||
|
||||
CpuResource::CpuResource(std::string name)
|
||||
: Resource(std::move(name), ResourceType::CPU) {}
|
||||
|
|
|
@ -19,6 +19,13 @@ public:
|
|||
explicit
|
||||
CpuResource(std::string name);
|
||||
|
||||
inline std::string
|
||||
Dump() const override {
|
||||
return "<CpuResource>";
|
||||
}
|
||||
|
||||
friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource);
|
||||
|
||||
protected:
|
||||
void
|
||||
LoadFile(TaskPtr task) override;
|
||||
|
|
|
@ -10,6 +10,10 @@ namespace zilliz {
|
|||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
std::ostream &operator<<(std::ostream &out, const DiskResource &resource) {
|
||||
out << resource.Dump();
|
||||
return out;
|
||||
}
|
||||
|
||||
DiskResource::DiskResource(std::string name)
|
||||
: Resource(std::move(name), ResourceType::DISK, true, false) {
|
||||
|
|
|
@ -18,6 +18,13 @@ public:
|
|||
explicit
|
||||
DiskResource(std::string name);
|
||||
|
||||
inline std::string
|
||||
Dump() const override {
|
||||
return "<DiskResource>";
|
||||
}
|
||||
|
||||
friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource);
|
||||
|
||||
protected:
|
||||
void
|
||||
LoadFile(TaskPtr task) override;
|
||||
|
|
|
@ -11,6 +11,10 @@ namespace zilliz {
|
|||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
std::ostream &operator<<(std::ostream &out, const GpuResource &resource) {
|
||||
out << resource.Dump();
|
||||
return out;
|
||||
}
|
||||
|
||||
GpuResource::GpuResource(std::string name)
|
||||
: Resource(std::move(name), ResourceType::GPU) {}
|
||||
|
|
|
@ -18,6 +18,13 @@ public:
|
|||
explicit
|
||||
GpuResource(std::string name);
|
||||
|
||||
inline std::string
|
||||
Dump() const override {
|
||||
return "<GpuResource>";
|
||||
}
|
||||
|
||||
friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource);
|
||||
|
||||
protected:
|
||||
void
|
||||
LoadFile(TaskPtr task) override;
|
||||
|
|
|
@ -10,6 +10,11 @@ namespace zilliz {
|
|||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
std::ostream &operator<<(std::ostream &out, const Resource &resource) {
|
||||
out << resource.Dump();
|
||||
return out;
|
||||
}
|
||||
|
||||
Resource::Resource(std::string name,
|
||||
ResourceType type,
|
||||
bool enable_loader,
|
||||
|
@ -94,8 +99,11 @@ void Resource::loader_function() {
|
|||
std::unique_lock<std::mutex> lock(load_mutex_);
|
||||
load_cv_.wait(lock, [&] { return load_flag_; });
|
||||
load_flag_ = false;
|
||||
auto task_item = pick_task_load();
|
||||
if (task_item) {
|
||||
while (true) {
|
||||
auto task_item = pick_task_load();
|
||||
if (task_item == nullptr) {
|
||||
break;
|
||||
}
|
||||
LoadFile(task_item->task);
|
||||
// TODO: wrapper loaded
|
||||
task_item->state = TaskTableItemState::LOADED;
|
||||
|
@ -104,6 +112,7 @@ void Resource::loader_function() {
|
|||
subscriber_(std::static_pointer_cast<Event>(event));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,8 +125,11 @@ void Resource::executor_function() {
|
|||
std::unique_lock<std::mutex> lock(exec_mutex_);
|
||||
exec_cv_.wait(lock, [&] { return exec_flag_; });
|
||||
exec_flag_ = false;
|
||||
auto task_item = pick_task_execute();
|
||||
if (task_item) {
|
||||
while (true) {
|
||||
auto task_item = pick_task_execute();
|
||||
if (task_item == nullptr) {
|
||||
break;
|
||||
}
|
||||
Process(task_item->task);
|
||||
task_item->state = TaskTableItemState::EXECUTED;
|
||||
if (subscriber_) {
|
||||
|
@ -125,6 +137,7 @@ void Resource::executor_function() {
|
|||
subscriber_(std::static_pointer_cast<Event>(event));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -74,6 +74,13 @@ public:
|
|||
TaskTable &
|
||||
task_table();
|
||||
|
||||
inline virtual std::string
|
||||
Dump() const {
|
||||
return "<Resource>";
|
||||
}
|
||||
|
||||
friend std::ostream &operator<<(std::ostream &out, const Resource &resource);
|
||||
|
||||
public:
|
||||
/*
|
||||
* wake up loader;
|
||||
|
|
|
@ -36,20 +36,47 @@ TEST(normal_test, test1) {
|
|||
observe->task_table().Put(task2);
|
||||
observe->task_table().Put(task3);
|
||||
observe->task_table().Put(task4);
|
||||
std::cout << "disk:" << std::endl;
|
||||
std::cout << observe->task_table().Dump() << std::endl;
|
||||
}
|
||||
|
||||
// if (auto disk_r = disk.lock()) {
|
||||
// if (auto cpu_r = cpu.lock()) {
|
||||
// if (auto gpu1_r = gpu1.lock()) {
|
||||
// if (auto gpu2_r = gpu2.lock()) {
|
||||
// std::cout << "<<<<<<<<<<before<<<<<<<<<<" << std::endl;
|
||||
// std::cout << "disk:" << std::endl;
|
||||
// std::cout << disk_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "cpu:" << std::endl;
|
||||
// std::cout << cpu_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "gpu1:" << std::endl;
|
||||
// std::cout << gpu1_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "gpu2:" << std::endl;
|
||||
// std::cout << gpu2_r->task_table().Dump() << std::endl;
|
||||
// std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
sleep(5);
|
||||
|
||||
if (auto observe = disk.lock()) {
|
||||
std::cout << "disk:" << std::endl;
|
||||
std::cout << observe->task_table().Dump() << std::endl;
|
||||
}
|
||||
if (auto observe = cpu.lock()) {
|
||||
std::cout << "cpu:" << std::endl;
|
||||
std::cout << observe->task_table().Dump() << std::endl;
|
||||
}
|
||||
// if (auto disk_r = disk.lock()) {
|
||||
// if (auto cpu_r = cpu.lock()) {
|
||||
// if (auto gpu1_r = gpu1.lock()) {
|
||||
// if (auto gpu2_r = gpu2.lock()) {
|
||||
// std::cout << "<<<<<<<<<<after<<<<<<<<<<" << std::endl;
|
||||
// std::cout << "disk:" << std::endl;
|
||||
// std::cout << disk_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "cpu:" << std::endl;
|
||||
// std::cout << cpu_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "gpu1:" << std::endl;
|
||||
// std::cout << gpu1_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "gpu2:" << std::endl;
|
||||
// std::cout << gpu2_r->task_table().Dump() << std::endl;
|
||||
// std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
scheduler->Stop();
|
||||
res_mgr->Stop();
|
||||
|
||||
|
|
|
@ -25,12 +25,20 @@ protected:
|
|||
disk_resource_ = ResourceFactory::Create("disk");
|
||||
cpu_resource_ = ResourceFactory::Create("cpu");
|
||||
gpu_resource_ = ResourceFactory::Create("gpu");
|
||||
flag_ = false;
|
||||
resources_.push_back(disk_resource_);
|
||||
resources_.push_back(cpu_resource_);
|
||||
resources_.push_back(gpu_resource_);
|
||||
|
||||
auto subscriber = [&](EventPtr event) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
if (event->Type() == EventType::COPY_COMPLETED || event->Type() == EventType::FINISH_TASK) {
|
||||
flag_ = true;
|
||||
auto subscriber = [&](EventPtr event) {
|
||||
if (event->Type() == EventType::COPY_COMPLETED) {
|
||||
std::lock_guard<std::mutex> lock(load_mutex_);
|
||||
++load_count_;
|
||||
cv_.notify_one();
|
||||
}
|
||||
|
||||
if (event->Type() == EventType::FINISH_TASK) {
|
||||
std::lock_guard<std::mutex> lock(load_mutex_);
|
||||
++exec_count_;
|
||||
cv_.notify_one();
|
||||
}
|
||||
};
|
||||
|
@ -52,42 +60,82 @@ protected:
|
|||
}
|
||||
|
||||
void
|
||||
Wait() {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
cv_.wait(lock, [&] { return flag_; });
|
||||
WaitLoader(uint64_t count) {
|
||||
std::unique_lock<std::mutex> lock(load_mutex_);
|
||||
cv_.wait(lock, [&] { return load_count_ == count; });
|
||||
}
|
||||
|
||||
void
|
||||
WaitExecutor(uint64_t count) {
|
||||
std::unique_lock<std::mutex> lock(exec_mutex_);
|
||||
cv_.wait(lock, [&] { return exec_count_ == count; });
|
||||
}
|
||||
|
||||
ResourcePtr disk_resource_;
|
||||
ResourcePtr cpu_resource_;
|
||||
ResourcePtr gpu_resource_;
|
||||
bool flag_;
|
||||
std::mutex mutex_;
|
||||
std::vector<ResourcePtr> resources_;
|
||||
uint64_t load_count_ = 0;
|
||||
uint64_t exec_count_ = 0;
|
||||
std::mutex load_mutex_;
|
||||
std::mutex exec_mutex_;
|
||||
std::condition_variable cv_;
|
||||
};
|
||||
|
||||
|
||||
TEST_F(ResourceTest, cpu_resource_test) {
|
||||
auto task = std::make_shared<TestTask>();
|
||||
cpu_resource_->task_table().Put(task);
|
||||
const uint64_t NUM = 100;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto task = std::make_shared<TestTask>();
|
||||
tasks.push_back(task);
|
||||
cpu_resource_->task_table().Put(task);
|
||||
}
|
||||
|
||||
cpu_resource_->WakeupLoader();
|
||||
Wait();
|
||||
ASSERT_EQ(task->load_count_, 1);
|
||||
flag_ = false;
|
||||
WaitLoader(NUM);
|
||||
// std::cout << "after WakeupLoader" << std::endl;
|
||||
// std::cout << cpu_resource_->task_table().Dump();
|
||||
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
ASSERT_EQ(tasks[i]->load_count_, 1);
|
||||
}
|
||||
|
||||
cpu_resource_->WakeupExecutor();
|
||||
Wait();
|
||||
ASSERT_EQ(task->exec_count_, 1);
|
||||
WaitExecutor(NUM);
|
||||
// std::cout << "after WakeupExecutor" << std::endl;
|
||||
// std::cout << cpu_resource_->task_table().Dump();
|
||||
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
ASSERT_EQ(tasks[i]->exec_count_, 1);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ResourceTest, gpu_resource_test) {
|
||||
auto task = std::make_shared<TestTask>();
|
||||
gpu_resource_->task_table().Put(task);
|
||||
const uint64_t NUM = 100;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto task = std::make_shared<TestTask>();
|
||||
tasks.push_back(task);
|
||||
gpu_resource_->task_table().Put(task);
|
||||
}
|
||||
|
||||
gpu_resource_->WakeupLoader();
|
||||
Wait();
|
||||
ASSERT_EQ(task->load_count_, 1);
|
||||
flag_ = false;
|
||||
WaitLoader(NUM);
|
||||
// std::cout << "after WakeupLoader" << std::endl;
|
||||
// std::cout << cpu_resource_->task_table().Dump();
|
||||
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
ASSERT_EQ(tasks[i]->load_count_, 1);
|
||||
}
|
||||
|
||||
gpu_resource_->WakeupExecutor();
|
||||
Wait();
|
||||
ASSERT_EQ(task->exec_count_, 1);
|
||||
WaitExecutor(NUM);
|
||||
// std::cout << "after WakeupExecutor" << std::endl;
|
||||
// std::cout << cpu_resource_->task_table().Dump();
|
||||
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
ASSERT_EQ(tasks[i]->exec_count_, 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue