mirror of https://github.com/milvus-io/milvus.git
Using shared_ptr instead of weak_ptr to avoid performance loss
Former-commit-id: 250cb7200b6eefdd9cbb9fd631379d59aca2f368pull/191/head
parent
5e504b3435
commit
53b3b60db2
|
@ -12,6 +12,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
- \#80 - Print version information into log during server start
|
||||
- \#82 - Move easyloggingpp into "external" directory
|
||||
- \#92 - Speed up CMake build process
|
||||
- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss
|
||||
|
||||
## Feature
|
||||
- \#115 - Using new structure for tasktable
|
||||
|
|
|
@ -54,7 +54,7 @@ ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrP
|
|||
auto cur_neighbours = cur_node->GetNeighbours();
|
||||
|
||||
for (auto& neighbour : cur_neighbours) {
|
||||
auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node.lock());
|
||||
auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node);
|
||||
dis_matrix[name_id_map.at(res->name())][name_id_map.at(neighbour_res->name())] =
|
||||
neighbour.connection.transport_cost();
|
||||
}
|
||||
|
|
|
@ -26,10 +26,8 @@
|
|||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) {
|
||||
if (auto mgr = res_mgr_.lock()) {
|
||||
mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
|
||||
}
|
||||
Scheduler::Scheduler(ResourceMgrPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) {
|
||||
res_mgr_->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
|
||||
event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::START_UP),
|
||||
std::bind(&Scheduler::OnStartUp, this, std::placeholders::_1)));
|
||||
event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::LOAD_COMPLETED),
|
||||
|
@ -40,6 +38,10 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::m
|
|||
std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1)));
|
||||
}
|
||||
|
||||
Scheduler::~Scheduler() {
|
||||
res_mgr_ = nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::Start() {
|
||||
running_ = true;
|
||||
|
@ -100,51 +102,45 @@ Scheduler::Process(const EventPtr& event) {
|
|||
void
|
||||
Scheduler::OnLoadCompleted(const EventPtr& event) {
|
||||
auto load_completed_event = std::static_pointer_cast<LoadCompletedEvent>(event);
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupExecutor();
|
||||
|
||||
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
|
||||
switch (task_table_type) {
|
||||
case TaskLabelType::DEFAULT: {
|
||||
Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
|
||||
break;
|
||||
}
|
||||
case TaskLabelType::SPECIFIED_RESOURCE: {
|
||||
Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
|
||||
break;
|
||||
}
|
||||
case TaskLabelType::BROADCAST: {
|
||||
if (resource->HasExecutor() == false) {
|
||||
load_completed_event->task_table_item_->Move();
|
||||
}
|
||||
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
|
||||
break;
|
||||
}
|
||||
default: { break; }
|
||||
auto resource = event->resource_;
|
||||
resource->WakeupExecutor();
|
||||
|
||||
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
|
||||
switch (task_table_type) {
|
||||
case TaskLabelType::DEFAULT: {
|
||||
Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
|
||||
break;
|
||||
}
|
||||
resource->WakeupLoader();
|
||||
case TaskLabelType::SPECIFIED_RESOURCE: {
|
||||
Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
|
||||
break;
|
||||
}
|
||||
case TaskLabelType::BROADCAST: {
|
||||
if (resource->HasExecutor() == false) {
|
||||
load_completed_event->task_table_item_->Move();
|
||||
}
|
||||
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
|
||||
break;
|
||||
}
|
||||
default: { break; }
|
||||
}
|
||||
resource->WakeupLoader();
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnStartUp(const EventPtr& event) {
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupLoader();
|
||||
}
|
||||
event->resource_->WakeupLoader();
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnFinishTask(const EventPtr& event) {
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupLoader();
|
||||
}
|
||||
event->resource_->WakeupLoader();
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnTaskTableUpdated(const EventPtr& event) {
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupLoader();
|
||||
}
|
||||
event->resource_->WakeupLoader();
|
||||
}
|
||||
|
||||
} // namespace scheduler
|
||||
|
|
|
@ -34,7 +34,9 @@ namespace scheduler {
|
|||
|
||||
class Scheduler : public interface::dumpable {
|
||||
public:
|
||||
explicit Scheduler(ResourceMgrWPtr res_mgr);
|
||||
explicit Scheduler(ResourceMgrPtr res_mgr);
|
||||
|
||||
~Scheduler();
|
||||
|
||||
Scheduler(const Scheduler&) = delete;
|
||||
Scheduler(Scheduler&&) = delete;
|
||||
|
@ -118,7 +120,7 @@ class Scheduler : public interface::dumpable {
|
|||
|
||||
std::unordered_map<uint64_t, std::function<void(EventPtr)>> event_register_;
|
||||
|
||||
ResourceMgrWPtr res_mgr_;
|
||||
ResourceMgrPtr res_mgr_;
|
||||
std::queue<EventPtr> event_queue_;
|
||||
std::thread worker_thread_;
|
||||
std::mutex event_mutex_;
|
||||
|
|
|
@ -37,10 +37,11 @@ class Action {
|
|||
PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest);
|
||||
|
||||
static void
|
||||
DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, std::shared_ptr<LoadCompletedEvent> event);
|
||||
DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event);
|
||||
|
||||
static void
|
||||
SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
|
||||
SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event);
|
||||
};
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ std::vector<ResourcePtr>
|
|||
get_neighbours(const ResourcePtr& self) {
|
||||
std::vector<ResourcePtr> neighbours;
|
||||
for (auto& neighbour_node : self->GetNeighbours()) {
|
||||
auto node = neighbour_node.neighbour_node.lock();
|
||||
auto node = neighbour_node.neighbour_node;
|
||||
if (not node)
|
||||
continue;
|
||||
|
||||
|
@ -46,7 +46,7 @@ std::vector<std::pair<ResourcePtr, Connection>>
|
|||
get_neighbours_with_connetion(const ResourcePtr& self) {
|
||||
std::vector<std::pair<ResourcePtr, Connection>> neighbours;
|
||||
for (auto& neighbour_node : self->GetNeighbours()) {
|
||||
auto node = neighbour_node.neighbour_node.lock();
|
||||
auto node = neighbour_node.neighbour_node;
|
||||
if (not node)
|
||||
continue;
|
||||
|
||||
|
@ -102,7 +102,7 @@ Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
|
|||
}
|
||||
|
||||
void
|
||||
Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
|
||||
Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event) {
|
||||
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
|
||||
auto task = event->task_table_item_->task;
|
||||
|
@ -114,11 +114,11 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
|
|||
if (auto index_engine = search_task->index_engine_) {
|
||||
auto location = index_engine->GetLocation();
|
||||
|
||||
for (auto i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) {
|
||||
for (auto i = 0; i < res_mgr->GetNumGpuResource(); ++i) {
|
||||
auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
|
||||
if (index != nullptr) {
|
||||
moved = true;
|
||||
auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i);
|
||||
auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
|
||||
PushTaskToResource(event->task_table_item_->task, dest_resource);
|
||||
break;
|
||||
}
|
||||
|
@ -133,17 +133,17 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
|
|||
}
|
||||
|
||||
void
|
||||
Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
|
||||
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event) {
|
||||
auto task = event->task_table_item_->task;
|
||||
if (resource->type() == ResourceType::DISK) {
|
||||
// step 1: calculate shortest path per resource, from disk to compute resource
|
||||
auto compute_resources = res_mgr.lock()->GetComputeResources();
|
||||
auto compute_resources = res_mgr->GetComputeResources();
|
||||
std::vector<std::vector<std::string>> paths;
|
||||
std::vector<uint64_t> transport_costs;
|
||||
for (auto& res : compute_resources) {
|
||||
std::vector<std::string> path;
|
||||
uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path);
|
||||
uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
|
||||
transport_costs.push_back(transport_cost);
|
||||
paths.emplace_back(path);
|
||||
}
|
||||
|
@ -187,10 +187,10 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
|
|||
Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
|
||||
|
||||
bool find_gpu_res = false;
|
||||
if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
|
||||
if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
|
||||
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
if (compute_resources[i]->name() ==
|
||||
res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
|
||||
res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
|
||||
find_gpu_res = true;
|
||||
Path task_path(paths[i], paths[i].size() - 1);
|
||||
task->path() = task_path;
|
||||
|
@ -208,7 +208,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
|
|||
resource->WakeupExecutor();
|
||||
} else {
|
||||
auto next_res_name = task->path().Next();
|
||||
auto next_res = res_mgr.lock()->GetResource(next_res_name);
|
||||
auto next_res = res_mgr->GetResource(next_res_name);
|
||||
// if (event->task_table_item_->Move()) {
|
||||
// next_res->task_table().Put(task);
|
||||
// }
|
||||
|
|
|
@ -30,7 +30,7 @@ class Resource;
|
|||
|
||||
class Event {
|
||||
public:
|
||||
explicit Event(EventType type, std::weak_ptr<Resource> resource) : type_(type), resource_(std::move(resource)) {
|
||||
explicit Event(EventType type, std::shared_ptr<Resource> resource) : type_(type), resource_(std::move(resource)) {
|
||||
}
|
||||
|
||||
inline EventType
|
||||
|
@ -46,7 +46,7 @@ class Event {
|
|||
|
||||
public:
|
||||
EventType type_;
|
||||
std::weak_ptr<Resource> resource_;
|
||||
std::shared_ptr<Resource> resource_;
|
||||
};
|
||||
|
||||
using EventPtr = std::shared_ptr<Event>;
|
||||
|
|
|
@ -29,7 +29,7 @@ namespace scheduler {
|
|||
|
||||
class FinishTaskEvent : public Event {
|
||||
public:
|
||||
FinishTaskEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
|
||||
FinishTaskEvent(std::shared_ptr<Resource> resource, TaskTableItemPtr task_table_item)
|
||||
: Event(EventType::FINISH_TASK, std::move(resource)), task_table_item_(std::move(task_table_item)) {
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ namespace scheduler {
|
|||
|
||||
class LoadCompletedEvent : public Event {
|
||||
public:
|
||||
LoadCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
|
||||
LoadCompletedEvent(std::shared_ptr<Resource> resource, TaskTableItemPtr task_table_item)
|
||||
: Event(EventType::LOAD_COMPLETED, std::move(resource)), task_table_item_(std::move(task_table_item)) {
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ namespace scheduler {
|
|||
|
||||
class StartUpEvent : public Event {
|
||||
public:
|
||||
explicit StartUpEvent(std::weak_ptr<Resource> resource) : Event(EventType::START_UP, std::move(resource)) {
|
||||
explicit StartUpEvent(std::shared_ptr<Resource> resource) : Event(EventType::START_UP, std::move(resource)) {
|
||||
}
|
||||
|
||||
inline std::string
|
||||
|
|
|
@ -28,7 +28,7 @@ namespace scheduler {
|
|||
|
||||
class TaskTableUpdatedEvent : public Event {
|
||||
public:
|
||||
explicit TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
|
||||
explicit TaskTableUpdatedEvent(std::shared_ptr<Resource> resource)
|
||||
: Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {
|
||||
}
|
||||
|
||||
|
|
|
@ -58,9 +58,7 @@ Node::Dump() const {
|
|||
void
|
||||
Node::AddNeighbour(const NeighbourNodePtr& neighbour_node, Connection& connection) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
if (auto s = neighbour_node.lock()) {
|
||||
neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection)));
|
||||
}
|
||||
neighbours_.emplace(std::make_pair(neighbour_node->id_, Neighbour(neighbour_node, connection)));
|
||||
// else do nothing, consider it..
|
||||
}
|
||||
|
||||
|
|
|
@ -31,10 +31,14 @@ namespace scheduler {
|
|||
|
||||
class Node;
|
||||
|
||||
using NeighbourNodePtr = std::weak_ptr<Node>;
|
||||
using NeighbourNodePtr = std::shared_ptr<Node>;
|
||||
|
||||
struct Neighbour {
|
||||
Neighbour(NeighbourNodePtr nei, Connection conn) : neighbour_node(nei), connection(conn) {
|
||||
Neighbour(NeighbourNodePtr nei, Connection conn) : neighbour_node(std::move(nei)), connection(std::move(conn)) {
|
||||
}
|
||||
|
||||
~Neighbour() {
|
||||
neighbour_node = nullptr;
|
||||
}
|
||||
|
||||
NeighbourNodePtr neighbour_node;
|
||||
|
|
|
@ -28,7 +28,7 @@ namespace milvus {
|
|||
namespace scheduler {
|
||||
|
||||
TEST(EventTest, START_UP_EVENT) {
|
||||
ResourceWPtr res(ResourcePtr(nullptr));
|
||||
ResourcePtr res(nullptr);
|
||||
auto event = std::make_shared<StartUpEvent>(res);
|
||||
ASSERT_FALSE(event->Dump().empty());
|
||||
std::cout << *event;
|
||||
|
@ -36,7 +36,7 @@ TEST(EventTest, START_UP_EVENT) {
|
|||
}
|
||||
|
||||
TEST(EventTest, LOAD_COMPLETED_EVENT) {
|
||||
ResourceWPtr res(ResourcePtr(nullptr));
|
||||
ResourcePtr res(nullptr);
|
||||
auto event = std::make_shared<LoadCompletedEvent>(res, nullptr);
|
||||
ASSERT_FALSE(event->Dump().empty());
|
||||
std::cout << *event;
|
||||
|
@ -44,7 +44,7 @@ TEST(EventTest, LOAD_COMPLETED_EVENT) {
|
|||
}
|
||||
|
||||
TEST(EventTest, FINISH_TASK_EVENT) {
|
||||
ResourceWPtr res(ResourcePtr(nullptr));
|
||||
ResourcePtr res(nullptr);
|
||||
auto event = std::make_shared<FinishTaskEvent>(res, nullptr);
|
||||
ASSERT_FALSE(event->Dump().empty());
|
||||
std::cout << *event;
|
||||
|
@ -53,7 +53,7 @@ TEST(EventTest, FINISH_TASK_EVENT) {
|
|||
|
||||
|
||||
TEST(EventTest, TASKTABLE_UPDATED_EVENT) {
|
||||
ResourceWPtr res(ResourcePtr(nullptr));
|
||||
ResourcePtr res(nullptr);
|
||||
auto event = std::make_shared<TaskTableUpdatedEvent>(res);
|
||||
ASSERT_FALSE(event->Dump().empty());
|
||||
std::cout << *event;
|
||||
|
|
|
@ -15,15 +15,14 @@
|
|||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
#include "scheduler/resource/Node.h"
|
||||
#include <gtest/gtest.h>
|
||||
#include "scheduler/resource/Node.h"
|
||||
|
||||
namespace {
|
||||
|
||||
namespace ms = milvus::scheduler;
|
||||
|
||||
} // namespace
|
||||
} // namespace
|
||||
|
||||
class NodeTest : public ::testing::Test {
|
||||
protected:
|
||||
|
@ -73,9 +72,11 @@ TEST_F(NodeTest, GET_NEIGHBOURS) {
|
|||
bool n2 = false, n3 = false;
|
||||
auto node1_neighbours = node1_->GetNeighbours();
|
||||
ASSERT_EQ(node1_neighbours.size(), 2);
|
||||
for (auto &n : node1_neighbours) {
|
||||
if (n.neighbour_node.lock() == node2_) n2 = true;
|
||||
if (n.neighbour_node.lock() == node3_) n3 = true;
|
||||
for (auto& n : node1_neighbours) {
|
||||
if (n.neighbour_node == node2_)
|
||||
n2 = true;
|
||||
if (n.neighbour_node == node3_)
|
||||
n3 = true;
|
||||
}
|
||||
ASSERT_TRUE(n2);
|
||||
ASSERT_TRUE(n3);
|
||||
|
@ -84,7 +85,7 @@ TEST_F(NodeTest, GET_NEIGHBOURS) {
|
|||
{
|
||||
auto node2_neighbours = node2_->GetNeighbours();
|
||||
ASSERT_EQ(node2_neighbours.size(), 1);
|
||||
ASSERT_EQ(node2_neighbours[0].neighbour_node.lock(), node1_);
|
||||
ASSERT_EQ(node2_neighbours[0].neighbour_node, node1_);
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -100,4 +101,3 @@ TEST_F(NodeTest, DUMP) {
|
|||
std::cout << node2_->Dump();
|
||||
ASSERT_FALSE(node2_->Dump().empty());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue