From 41f3a2ac2bd9015a1e207363d005dfc591253e93 Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 19 Aug 2019 10:47:30 +0800 Subject: [PATCH 1/2] MS-379 Add Dump implementation in Resource Former-commit-id: 0ad824b5d582fc7235a0e29919fdf5a2975bf534 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/Scheduler.cpp | 1 - cpp/src/scheduler/resource/CpuResource.cpp | 4 ++ cpp/src/scheduler/resource/CpuResource.h | 7 +++ cpp/src/scheduler/resource/DiskResource.cpp | 4 ++ cpp/src/scheduler/resource/DiskResource.h | 7 +++ cpp/src/scheduler/resource/GpuResource.cpp | 4 ++ cpp/src/scheduler/resource/GpuResource.h | 7 +++ cpp/src/scheduler/resource/Resource.cpp | 5 +++ cpp/src/scheduler/resource/Resource.h | 7 +++ cpp/unittest/scheduler/normal_test.cpp | 47 ++++++++++++++++----- 11 files changed, 83 insertions(+), 11 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 8232bd77c0..4358c5ddb2 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -30,6 +30,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task - MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable - MS-378 - Debug and Update normal_test in scheduler unittest +- MS-379 - Add Dump implementation in Resource ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 06dfa669db..191d1957aa 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -4,7 +4,6 @@ * Proprietary and confidential. 
******************************************************************************/ -#include #include "Scheduler.h" #include "Cost.h" #include "action/Action.h" diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp index 11c7796187..01fca35ee4 100644 --- a/cpp/src/scheduler/resource/CpuResource.cpp +++ b/cpp/src/scheduler/resource/CpuResource.cpp @@ -11,6 +11,10 @@ namespace zilliz { namespace milvus { namespace engine { +std::ostream &operator<<(std::ostream &out, const CpuResource &resource) { + out << resource.Dump(); + return out; +} CpuResource::CpuResource(std::string name) : Resource(std::move(name), ResourceType::CPU) {} diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h index be1340e954..c180ea07d7 100644 --- a/cpp/src/scheduler/resource/CpuResource.h +++ b/cpp/src/scheduler/resource/CpuResource.h @@ -19,6 +19,13 @@ public: explicit CpuResource(std::string name); + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource); + protected: void LoadFile(TaskPtr task) override; diff --git a/cpp/src/scheduler/resource/DiskResource.cpp b/cpp/src/scheduler/resource/DiskResource.cpp index 66cb72b062..8612909e44 100644 --- a/cpp/src/scheduler/resource/DiskResource.cpp +++ b/cpp/src/scheduler/resource/DiskResource.cpp @@ -10,6 +10,10 @@ namespace zilliz { namespace milvus { namespace engine { +std::ostream &operator<<(std::ostream &out, const DiskResource &resource) { + out << resource.Dump(); + return out; +} DiskResource::DiskResource(std::string name) : Resource(std::move(name), ResourceType::DISK, true, false) { diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h index 39211dbb66..0fb9d625aa 100644 --- a/cpp/src/scheduler/resource/DiskResource.h +++ b/cpp/src/scheduler/resource/DiskResource.h @@ -18,6 +18,13 @@ public: explicit 
DiskResource(std::string name); + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource); + protected: void LoadFile(TaskPtr task) override; diff --git a/cpp/src/scheduler/resource/GpuResource.cpp b/cpp/src/scheduler/resource/GpuResource.cpp index df6827881c..8606bb7856 100644 --- a/cpp/src/scheduler/resource/GpuResource.cpp +++ b/cpp/src/scheduler/resource/GpuResource.cpp @@ -11,6 +11,10 @@ namespace zilliz { namespace milvus { namespace engine { +std::ostream &operator<<(std::ostream &out, const GpuResource &resource) { + out << resource.Dump(); + return out; +} GpuResource::GpuResource(std::string name) : Resource(std::move(name), ResourceType::GPU) {} diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h index 84bf163284..1cb38df34f 100644 --- a/cpp/src/scheduler/resource/GpuResource.h +++ b/cpp/src/scheduler/resource/GpuResource.h @@ -18,6 +18,13 @@ public: explicit GpuResource(std::string name); + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource); + protected: void LoadFile(TaskPtr task) override; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index ed5d57b7dd..298d1b7d9f 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -10,6 +10,11 @@ namespace zilliz { namespace milvus { namespace engine { +std::ostream &operator<<(std::ostream &out, const Resource &resource) { + out << resource.Dump(); + return out; +} + Resource::Resource(std::string name, ResourceType type, bool enable_loader, diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 769661c67b..c32149b46e 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -74,6 +74,13 @@ public: TaskTable & 
task_table(); + inline virtual std::string + Dump() const { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const Resource &resource); + public: /* * wake up loader; diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp index 1123a3fb7e..4d1fa36de8 100644 --- a/cpp/unittest/scheduler/normal_test.cpp +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -36,20 +36,47 @@ TEST(normal_test, test1) { observe->task_table().Put(task2); observe->task_table().Put(task3); observe->task_table().Put(task4); - std::cout << "disk:" << std::endl; - std::cout << observe->task_table().Dump() << std::endl; } +// if (auto disk_r = disk.lock()) { +// if (auto cpu_r = cpu.lock()) { +// if (auto gpu1_r = gpu1.lock()) { +// if (auto gpu2_r = gpu2.lock()) { +// std::cout << "<<<<<<<<<<before<<<<<<<<<<" << std::endl; +// std::cout << "disk:" << std::endl; +// std::cout << disk_r->task_table().Dump() << std::endl; +// std::cout << "cpu:" << std::endl; +// std::cout << cpu_r->task_table().Dump() << std::endl; +// std::cout << "gpu1:" << std::endl; +// std::cout << gpu1_r->task_table().Dump() << std::endl; +// std::cout << "gpu2:" << std::endl; +// std::cout << gpu2_r->task_table().Dump() << std::endl; +// std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl; +// } +// } +// } +// } + sleep(5); - if (auto observe = disk.lock()) { - std::cout << "disk:" << std::endl; - std::cout << observe->task_table().Dump() << std::endl; - } - if (auto observe = cpu.lock()) { - std::cout << "cpu:" << std::endl; - std::cout << observe->task_table().Dump() << std::endl; - } +// if (auto disk_r = disk.lock()) { +// if (auto cpu_r = cpu.lock()) { +// if (auto gpu1_r = gpu1.lock()) { +// if (auto gpu2_r = gpu2.lock()) { +// std::cout << "<<<<<<<<<<after<<<<<<<<<<" << std::endl; +// std::cout << "disk:" << std::endl; +// std::cout << disk_r->task_table().Dump() << std::endl; +// std::cout << "cpu:" << std::endl; +// std::cout << cpu_r->task_table().Dump() << std::endl; +// std::cout << "gpu1:" << std::endl; +// std::cout << gpu1_r->task_table().Dump() << std::endl; +// std::cout << "gpu2:" << std::endl; +// std::cout <<
gpu2_r->task_table().Dump() << std::endl; +// std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl; +// } +// } +// } +// } scheduler->Stop(); res_mgr->Stop(); From 842fa507fb4ee7a9c01647ec85f7d79f5194028c Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 19 Aug 2019 11:48:45 +0800 Subject: [PATCH 2/2] MS-380 Update resource loader and executor, work until all finished Former-commit-id: 712a0aceaa4c8d4ebbea40f5d18f524afeb38559 --- cpp/CHANGELOG.md | 1 + .../scheduler/action/PushTaskToNeighbour.cpp | 5 +- cpp/src/scheduler/resource/Resource.cpp | 16 ++- cpp/unittest/scheduler/resource_test.cpp | 98 ++++++++++++++----- 4 files changed, 88 insertions(+), 32 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 4358c5ddb2..e0f3b1de25 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -31,6 +31,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable - MS-378 - Debug and Update normal_test in scheduler unittest - MS-379 - Add Dump implementation in Resource +- MS-380 - Update resource loader and executor, work until all finished ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index c99f490f11..a9b43b3f05 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -12,7 +12,7 @@ namespace milvus { namespace engine { void -push_task(ResourcePtr &self, ResourcePtr &other) { +push_task(const ResourcePtr &self, const ResourcePtr &other) { auto &self_task_table = self->task_table(); auto &other_task_table = other->task_table(); CacheMgr cache; @@ -31,8 +31,7 @@ Action::PushTaskToNeighbour(const ResourceWPtr &res) { if (auto self = res.lock()) { for (auto &neighbour : self->GetNeighbours()) { if (auto n = neighbour.neighbour_node.lock()) { - auto neighbour = std::static_pointer_cast(n); - 
push_task(self, neighbour); + push_task(self, std::static_pointer_cast(n)); } } } diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 298d1b7d9f..2c46a703c6 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -99,8 +99,11 @@ void Resource::loader_function() { std::unique_lock lock(load_mutex_); load_cv_.wait(lock, [&] { return load_flag_; }); load_flag_ = false; - auto task_item = pick_task_load(); - if (task_item) { + while (true) { + auto task_item = pick_task_load(); + if (task_item == nullptr) { + break; + } LoadFile(task_item->task); // TODO: wrapper loaded task_item->state = TaskTableItemState::LOADED; @@ -109,6 +112,7 @@ void Resource::loader_function() { subscriber_(std::static_pointer_cast(event)); } } + } } @@ -121,8 +125,11 @@ void Resource::executor_function() { std::unique_lock lock(exec_mutex_); exec_cv_.wait(lock, [&] { return exec_flag_; }); exec_flag_ = false; - auto task_item = pick_task_execute(); - if (task_item) { + while (true) { + auto task_item = pick_task_execute(); + if (task_item == nullptr) { + break; + } Process(task_item->task); task_item->state = TaskTableItemState::EXECUTED; if (subscriber_) { @@ -130,6 +137,7 @@ void Resource::executor_function() { subscriber_(std::static_pointer_cast(event)); } } + } } diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp index 0395856fea..2f7d58eb57 100644 --- a/cpp/unittest/scheduler/resource_test.cpp +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -25,12 +25,20 @@ protected: disk_resource_ = ResourceFactory::Create("disk"); cpu_resource_ = ResourceFactory::Create("cpu"); gpu_resource_ = ResourceFactory::Create("gpu"); - flag_ = false; + resources_.push_back(disk_resource_); + resources_.push_back(cpu_resource_); + resources_.push_back(gpu_resource_); - auto subscriber = [&](EventPtr event) { - std::unique_lock lock(mutex_); - if (event->Type() == 
EventType::COPY_COMPLETED || event->Type() == EventType::FINISH_TASK) { - flag_ = true; + auto subscriber = [&](EventPtr event) { + if (event->Type() == EventType::COPY_COMPLETED) { + std::lock_guard lock(load_mutex_); /* same mutex WaitLoader() waits on */ + ++load_count_; + cv_.notify_one(); + } + + if (event->Type() == EventType::FINISH_TASK) { + std::lock_guard lock(exec_mutex_); /* same mutex WaitExecutor() waits on; guards exec_count_ */ + ++exec_count_; cv_.notify_one(); } }; @@ -52,42 +60,82 @@ protected: } void - Wait() { - std::unique_lock lock(mutex_); - cv_.wait(lock, [&] { return flag_; }); + WaitLoader(uint64_t count) { + std::unique_lock lock(load_mutex_); + cv_.wait(lock, [&] { return load_count_ == count; }); + } + + void + WaitExecutor(uint64_t count) { + std::unique_lock lock(exec_mutex_); + cv_.wait(lock, [&] { return exec_count_ == count; }); } ResourcePtr disk_resource_; ResourcePtr cpu_resource_; ResourcePtr gpu_resource_; - bool flag_; - std::mutex mutex_; + std::vector resources_; + uint64_t load_count_ = 0; + uint64_t exec_count_ = 0; + std::mutex load_mutex_; + std::mutex exec_mutex_; std::condition_variable cv_; }; - TEST_F(ResourceTest, cpu_resource_test) { - auto task = std::make_shared(); - cpu_resource_->task_table().Put(task); + const uint64_t NUM = 100; + std::vector> tasks; + for (uint64_t i = 0; i < NUM; ++i) { + auto task = std::make_shared(); + tasks.push_back(task); + cpu_resource_->task_table().Put(task); + } + cpu_resource_->WakeupLoader(); - Wait(); - ASSERT_EQ(task->load_count_, 1); - flag_ = false; + WaitLoader(NUM); +// std::cout << "after WakeupLoader" << std::endl; +// std::cout << cpu_resource_->task_table().Dump(); + + for (uint64_t i = 0; i < NUM; ++i) { + ASSERT_EQ(tasks[i]->load_count_, 1); + } + cpu_resource_->WakeupExecutor(); - Wait(); - ASSERT_EQ(task->exec_count_, 1); + WaitExecutor(NUM); +// std::cout << "after WakeupExecutor" << std::endl; +// std::cout << cpu_resource_->task_table().Dump(); + + for (uint64_t i = 0; i < NUM; ++i) { + ASSERT_EQ(tasks[i]->exec_count_, 1); + } } TEST_F(ResourceTest, 
gpu_resource_test) { - auto task = std::make_shared(); - gpu_resource_->task_table().Put(task); + const uint64_t NUM = 100; + std::vector> tasks; + for (uint64_t i = 0; i < NUM; ++i) { + auto task = std::make_shared(); + tasks.push_back(task); + gpu_resource_->task_table().Put(task); + } + gpu_resource_->WakeupLoader(); - Wait(); - ASSERT_EQ(task->load_count_, 1); - flag_ = false; + WaitLoader(NUM); +// std::cout << "after WakeupLoader" << std::endl; +// std::cout << gpu_resource_->task_table().Dump(); + + for (uint64_t i = 0; i < NUM; ++i) { + ASSERT_EQ(tasks[i]->load_count_, 1); + } + gpu_resource_->WakeupExecutor(); - Wait(); - ASSERT_EQ(task->exec_count_, 1); + WaitExecutor(NUM); +// std::cout << "after WakeupExecutor" << std::endl; +// std::cout << gpu_resource_->task_table().Dump(); + + for (uint64_t i = 0; i < NUM; ++i) { + ASSERT_EQ(tasks[i]->exec_count_, 1); + } }