mirror of https://github.com/milvus-io/milvus.git
Merge remote-tracking branch 'upstream/branch-0.4.0' into branch-0.4.0
Former-commit-id: af1332f7b1aa9fe5a504644581a3658665d50e2bpull/191/head
commit
da6e37c8b8
|
@ -8,7 +8,7 @@ container('publish-docker') {
|
|||
sh "curl -O -u anonymous: ftp://192.168.1.126/data/${PROJECT_NAME}/engine/${JOB_NAME}-${BUILD_ID}/${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz"
|
||||
sh "tar zxvf ${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz"
|
||||
try {
|
||||
docker.withRegistry('https://registry.zilliz.com', '${params.DOCKER_PUBLISH_USER}') {
|
||||
docker.withRegistry('https://registry.zilliz.com', "${params.DOCKER_PUBLISH_USER}") {
|
||||
def customImage = docker.build("${PROJECT_NAME}/engine:${DOCKER_VERSION}")
|
||||
customImage.push()
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
- MS-378 - Debug and Update normal_test in scheduler unittest
|
||||
- MS-379 - Add Dump implementation in Resource
|
||||
- MS-380 - Update resource loader and executor, work util all finished
|
||||
- MS-383 - Modify condition variable usage in scheduler
|
||||
- MS-384 - Add global instance of ResourceMgr and Scheduler
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
|
|
|
@ -76,7 +76,7 @@ ResourceMgr::Stop() {
|
|||
|
||||
void
|
||||
ResourceMgr::PostEvent(const EventPtr &event) {
|
||||
std::unique_lock<std::mutex> lock(event_mutex_);
|
||||
std::lock_guard<std::mutex> lock(event_mutex_);
|
||||
queue_.emplace(event);
|
||||
event_cv_.notify_one();
|
||||
}
|
||||
|
@ -100,13 +100,14 @@ ResourceMgr::event_process() {
|
|||
event_cv_.wait(lock, [this] { return !queue_.empty(); });
|
||||
|
||||
auto event = queue_.front();
|
||||
queue_.pop();
|
||||
lock.unlock();
|
||||
if (event == nullptr) {
|
||||
break;
|
||||
}
|
||||
|
||||
// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event;
|
||||
|
||||
queue_.pop();
|
||||
if (subscriber_) {
|
||||
subscriber_(event);
|
||||
}
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
|
@ -61,7 +60,7 @@ public:
|
|||
Stop();
|
||||
|
||||
void
|
||||
PostEvent(const EventPtr& event);
|
||||
PostEvent(const EventPtr &event);
|
||||
|
||||
// TODO: add stats interface(low)
|
||||
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "SchedInst.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
ResourceMgrPtr ResMgrInst::instance = nullptr;
|
||||
std::mutex ResMgrInst::mutex_;
|
||||
|
||||
SchedulerPtr SchedInst::instance = nullptr;
|
||||
std::mutex SchedInst::mutex_;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "ResourceMgr.h"
|
||||
#include "Scheduler.h"
|
||||
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class ResMgrInst {
|
||||
public:
|
||||
static ResourceMgrPtr
|
||||
GetInstance() {
|
||||
if (instance == nullptr) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
if (instance == nullptr) {
|
||||
instance = std::make_shared<ResourceMgr>();
|
||||
}
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
private:
|
||||
static ResourceMgrPtr instance;
|
||||
static std::mutex mutex_;
|
||||
};
|
||||
|
||||
class SchedInst {
|
||||
public:
|
||||
static SchedulerPtr
|
||||
GetInstance() {
|
||||
if (instance == nullptr) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
if (instance == nullptr) {
|
||||
instance = std::make_shared<Scheduler>(ResMgrInst::GetInstance());
|
||||
}
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
private:
|
||||
static SchedulerPtr instance;
|
||||
static std::mutex mutex_;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -41,10 +41,11 @@ Scheduler::Stop() {
|
|||
|
||||
void
|
||||
Scheduler::PostEvent(const EventPtr &event) {
|
||||
std::lock_guard<std::mutex> lock(event_mutex_);
|
||||
event_queue_.push(event);
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(event_mutex_);
|
||||
event_queue_.push(event);
|
||||
}
|
||||
event_cv_.notify_one();
|
||||
// SERVER_LOG_DEBUG << "Scheduler post " << *event;
|
||||
}
|
||||
|
||||
std::string
|
||||
|
@ -58,12 +59,11 @@ Scheduler::worker_function() {
|
|||
std::unique_lock<std::mutex> lock(event_mutex_);
|
||||
event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
|
||||
auto event = event_queue_.front();
|
||||
event_queue_.pop();
|
||||
if (event == nullptr) {
|
||||
break;
|
||||
}
|
||||
|
||||
// SERVER_LOG_DEBUG << "Scheduler process " << *event;
|
||||
event_queue_.pop();
|
||||
Process(event);
|
||||
}
|
||||
}
|
||||
|
@ -105,16 +105,14 @@ Scheduler::OnStartUp(const EventPtr &event) {
|
|||
void
|
||||
Scheduler::OnFinishTask(const EventPtr &event) {
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupExecutor();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnCopyCompleted(const EventPtr &event) {
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupLoader();
|
||||
resource->WakeupExecutor();
|
||||
if (resource->Type()== ResourceType::DISK) {
|
||||
if (resource->Type() == ResourceType::DISK) {
|
||||
Action::PushTaskToNeighbour(event->resource_);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include <iostream>
|
||||
#include "Action.h"
|
||||
|
||||
|
||||
|
@ -16,7 +17,7 @@ push_task(const ResourcePtr &self, const ResourcePtr &other) {
|
|||
auto &self_task_table = self->task_table();
|
||||
auto &other_task_table = other->task_table();
|
||||
CacheMgr cache;
|
||||
auto indexes = PickToMove(self_task_table, cache, 1);
|
||||
auto indexes = PickToMove(self_task_table, cache, 10);
|
||||
for (auto index : indexes) {
|
||||
if (self_task_table.Move(index)) {
|
||||
auto task = self_task_table.Get(index)->task;
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#include <iostream>
|
||||
#include "Resource.h"
|
||||
|
||||
|
||||
|
@ -61,19 +62,23 @@ TaskTable &Resource::task_table() {
|
|||
}
|
||||
|
||||
void Resource::WakeupLoader() {
|
||||
std::lock_guard<std::mutex> lock(load_mutex_);
|
||||
load_flag_ = true;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(load_mutex_);
|
||||
load_flag_ = true;
|
||||
}
|
||||
load_cv_.notify_one();
|
||||
}
|
||||
|
||||
void Resource::WakeupExecutor() {
|
||||
std::lock_guard<std::mutex> lock(exec_mutex_);
|
||||
exec_flag_ = true;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(exec_mutex_);
|
||||
exec_flag_ = true;
|
||||
}
|
||||
exec_cv_.notify_one();
|
||||
}
|
||||
|
||||
TaskTableItemPtr Resource::pick_task_load() {
|
||||
auto indexes = PickToLoad(task_table_, 3);
|
||||
auto indexes = PickToLoad(task_table_, 10);
|
||||
for (auto index : indexes) {
|
||||
// try to set one task loading, then return
|
||||
if (task_table_.Load(index))
|
||||
|
@ -99,6 +104,7 @@ void Resource::loader_function() {
|
|||
std::unique_lock<std::mutex> lock(load_mutex_);
|
||||
load_cv_.wait(lock, [&] { return load_flag_; });
|
||||
load_flag_ = false;
|
||||
lock.unlock();
|
||||
while (true) {
|
||||
auto task_item = pick_task_load();
|
||||
if (task_item == nullptr) {
|
||||
|
@ -125,6 +131,7 @@ void Resource::executor_function() {
|
|||
std::unique_lock<std::mutex> lock(exec_mutex_);
|
||||
exec_cv_.wait(lock, [&] { return exec_flag_; });
|
||||
exec_flag_ = false;
|
||||
lock.unlock();
|
||||
while (true) {
|
||||
auto task_item = pick_task_execute();
|
||||
if (task_item == nullptr) {
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
#include "scheduler/ResourceMgr.h"
|
||||
#include "scheduler/Scheduler.h"
|
||||
#include "scheduler/task/TestTask.h"
|
||||
#include "scheduler/SchedInst.h"
|
||||
#include "utils/Log.h"
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
|
@ -10,7 +11,8 @@ using namespace zilliz::milvus::engine;
|
|||
|
||||
TEST(normal_test, test1) {
|
||||
// ResourceMgr only compose resources, provide unified event
|
||||
auto res_mgr = std::make_shared<ResourceMgr>();
|
||||
// auto res_mgr = std::make_shared<ResourceMgr>();
|
||||
auto res_mgr = ResMgrInst::GetInstance();
|
||||
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd"));
|
||||
auto cpu = res_mgr->Add(ResourceFactory::Create("cpu"));
|
||||
auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu"));
|
||||
|
@ -24,62 +26,27 @@ TEST(normal_test, test1) {
|
|||
|
||||
res_mgr->Start();
|
||||
|
||||
auto scheduler = new Scheduler(res_mgr);
|
||||
// auto scheduler = new Scheduler(res_mgr);
|
||||
auto scheduler = SchedInst::GetInstance();
|
||||
scheduler->Start();
|
||||
|
||||
auto task1 = std::make_shared<TestTask>();
|
||||
auto task2 = std::make_shared<TestTask>();
|
||||
auto task3 = std::make_shared<TestTask>();
|
||||
auto task4 = std::make_shared<TestTask>();
|
||||
if (auto observe = disk.lock()) {
|
||||
observe->task_table().Put(task1);
|
||||
observe->task_table().Put(task2);
|
||||
observe->task_table().Put(task3);
|
||||
observe->task_table().Put(task4);
|
||||
const uint64_t NUM_TASK = 100;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
for (uint64_t i = 0; i < NUM_TASK; ++i) {
|
||||
if (auto observe = disk.lock()) {
|
||||
auto task = std::make_shared<TestTask>();
|
||||
tasks.push_back(task);
|
||||
observe->task_table().Put(task);
|
||||
}
|
||||
}
|
||||
|
||||
// if (auto disk_r = disk.lock()) {
|
||||
// if (auto cpu_r = cpu.lock()) {
|
||||
// if (auto gpu1_r = gpu1.lock()) {
|
||||
// if (auto gpu2_r = gpu2.lock()) {
|
||||
// std::cout << "<<<<<<<<<<before<<<<<<<<<<" << std::endl;
|
||||
// std::cout << "disk:" << std::endl;
|
||||
// std::cout << disk_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "cpu:" << std::endl;
|
||||
// std::cout << cpu_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "gpu1:" << std::endl;
|
||||
// std::cout << gpu1_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "gpu2:" << std::endl;
|
||||
// std::cout << gpu2_r->task_table().Dump() << std::endl;
|
||||
// std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
sleep(1);
|
||||
|
||||
sleep(5);
|
||||
|
||||
// if (auto disk_r = disk.lock()) {
|
||||
// if (auto cpu_r = cpu.lock()) {
|
||||
// if (auto gpu1_r = gpu1.lock()) {
|
||||
// if (auto gpu2_r = gpu2.lock()) {
|
||||
// std::cout << "<<<<<<<<<<after<<<<<<<<<<" << std::endl;
|
||||
// std::cout << "disk:" << std::endl;
|
||||
// std::cout << disk_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "cpu:" << std::endl;
|
||||
// std::cout << cpu_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "gpu1:" << std::endl;
|
||||
// std::cout << gpu1_r->task_table().Dump() << std::endl;
|
||||
// std::cout << "gpu2:" << std::endl;
|
||||
// std::cout << gpu2_r->task_table().Dump() << std::endl;
|
||||
// std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
scheduler->Stop();
|
||||
res_mgr->Stop();
|
||||
|
||||
ASSERT_EQ(task1->load_count_, 1);
|
||||
ASSERT_EQ(task1->exec_count_, 1);
|
||||
for (uint64_t i = 0 ; i < NUM_TASK; ++i) {
|
||||
ASSERT_EQ(tasks[i]->load_count_, 1);
|
||||
ASSERT_EQ(tasks[i]->exec_count_, 1);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue