mirror of https://github.com/milvus-io/milvus.git
Merge remote-tracking branch 'main/0.5.1' into 0.5.1
Former-commit-id: 7bf691eb73bc86b6de4030226226ea57106c6bc3pull/189/head
commit
a6fd546196
|
@ -5,8 +5,11 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
# Milvus 0.5.1 (TODO)
|
||||
|
||||
## Bug
|
||||
|
||||
## Feature
|
||||
- \#90 - The server start error messages could be improved to enhance user experience
|
||||
- \#104 - test_scheduler core dump
|
||||
- \#115 - Using new structure for tasktable
|
||||
|
||||
## Improvement
|
||||
- \#64 - Improvement dump function in scheduler
|
||||
|
@ -16,9 +19,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
- \#96 - Remove .a file in milvus/lib for docker-version
|
||||
- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss
|
||||
- \#122 - Add unique id for Job
|
||||
|
||||
## Feature
|
||||
- \#115 - Using new structure for tasktable
|
||||
- \#130 - Set task state MOVED after resource copy it completed
|
||||
|
||||
## Task
|
||||
|
||||
|
|
|
@ -91,7 +91,7 @@ JobMgr::worker_function() {
|
|||
// disk resources NEVER be empty.
|
||||
if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
|
||||
for (auto& task : tasks) {
|
||||
disk->task_table().Put(task);
|
||||
disk->task_table().Put(task, nullptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -120,7 +120,7 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
|
|||
if (resource->HasExecutor() == false) {
|
||||
load_completed_event->task_table_item_->Move();
|
||||
}
|
||||
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
|
||||
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_, resource);
|
||||
break;
|
||||
}
|
||||
default: { break; }
|
||||
|
|
|
@ -264,8 +264,8 @@ TaskTable::PickToExecute(uint64_t limit) {
|
|||
}
|
||||
|
||||
void
|
||||
TaskTable::Put(TaskPtr task) {
|
||||
auto item = std::make_shared<TaskTableItem>();
|
||||
TaskTable::Put(TaskPtr task, TaskTableItemPtr from) {
|
||||
auto item = std::make_shared<TaskTableItem>(std::move(from));
|
||||
item->id = id_++;
|
||||
item->task = std::move(task);
|
||||
item->state = TaskTableItemState::START;
|
||||
|
@ -276,21 +276,6 @@ TaskTable::Put(TaskPtr task) {
|
|||
}
|
||||
}
|
||||
|
||||
void
|
||||
TaskTable::Put(std::vector<TaskPtr>& tasks) {
|
||||
for (auto& task : tasks) {
|
||||
auto item = std::make_shared<TaskTableItem>();
|
||||
item->id = id_++;
|
||||
item->task = std::move(task);
|
||||
item->state = TaskTableItemState::START;
|
||||
item->timestamp.start = get_current_timestamp();
|
||||
table_.put(std::move(item));
|
||||
}
|
||||
if (subscriber_) {
|
||||
subscriber_();
|
||||
}
|
||||
}
|
||||
|
||||
size_t
|
||||
TaskTable::TaskToExecute() {
|
||||
size_t count = 0;
|
||||
|
|
|
@ -58,8 +58,12 @@ struct TaskTimestamp : public interface::dumpable {
|
|||
Dump() const override;
|
||||
};
|
||||
|
||||
struct TaskTableItem;
|
||||
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
|
||||
|
||||
struct TaskTableItem : public interface::dumpable {
|
||||
TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {
|
||||
explicit TaskTableItem(TaskTableItemPtr f = nullptr)
|
||||
: id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex(), from(std::move(f)) {
|
||||
}
|
||||
|
||||
TaskTableItem(const TaskTableItem& src) = delete;
|
||||
|
@ -70,6 +74,7 @@ struct TaskTableItem : public interface::dumpable {
|
|||
TaskTableItemState state; // the state;
|
||||
std::mutex mutex;
|
||||
TaskTimestamp timestamp;
|
||||
TaskTableItemPtr from;
|
||||
|
||||
bool
|
||||
IsFinish();
|
||||
|
@ -96,8 +101,6 @@ struct TaskTableItem : public interface::dumpable {
|
|||
Dump() const override;
|
||||
};
|
||||
|
||||
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
|
||||
|
||||
class TaskTable : public interface::dumpable {
|
||||
public:
|
||||
TaskTable() : table_(1ULL << 16ULL) {
|
||||
|
@ -120,14 +123,7 @@ class TaskTable : public interface::dumpable {
|
|||
* Put one task;
|
||||
*/
|
||||
void
|
||||
Put(TaskPtr task);
|
||||
|
||||
/*
|
||||
* Put tasks back of task table;
|
||||
* Called by DBImpl;
|
||||
*/
|
||||
void
|
||||
Put(std::vector<TaskPtr>& tasks);
|
||||
Put(TaskPtr task, TaskTableItemPtr from = nullptr);
|
||||
|
||||
size_t
|
||||
TaskToExecute();
|
||||
|
|
|
@ -28,13 +28,13 @@ namespace scheduler {
|
|||
class Action {
|
||||
public:
|
||||
static void
|
||||
PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self);
|
||||
PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self);
|
||||
|
||||
static void
|
||||
PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self);
|
||||
PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self);
|
||||
|
||||
static void
|
||||
PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest);
|
||||
PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest);
|
||||
|
||||
static void
|
||||
DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
|
|
|
@ -59,7 +59,7 @@ get_neighbours_with_connetion(const ResourcePtr& self) {
|
|||
}
|
||||
|
||||
void
|
||||
Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) {
|
||||
Action::PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self) {
|
||||
auto neighbours = get_neighbours_with_connetion(self);
|
||||
if (not neighbours.empty()) {
|
||||
std::vector<uint64_t> speeds;
|
||||
|
@ -78,7 +78,7 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
|
|||
for (uint64_t i = 0; i < speeds.size(); ++i) {
|
||||
rd_speed -= speeds[i];
|
||||
if (rd_speed <= 0) {
|
||||
neighbours[i].first->task_table().Put(task);
|
||||
neighbours[i].first->task_table().Put(task_item->task, task_item);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -89,22 +89,23 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
|
|||
}
|
||||
|
||||
void
|
||||
Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) {
|
||||
Action::PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self) {
|
||||
auto neighbours = get_neighbours(self);
|
||||
for (auto& neighbour : neighbours) {
|
||||
neighbour->task_table().Put(task);
|
||||
neighbour->task_table().Put(task_item->task, task_item);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
|
||||
dest->task_table().Put(task);
|
||||
Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest) {
|
||||
dest->task_table().Put(task_item->task, task_item);
|
||||
}
|
||||
|
||||
void
|
||||
Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event) {
|
||||
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
|
||||
auto task_item = event->task_table_item_;
|
||||
auto task = event->task_table_item_->task;
|
||||
auto search_task = std::static_pointer_cast<XSearchTask>(task);
|
||||
bool moved = false;
|
||||
|
@ -119,7 +120,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
|
|||
if (index != nullptr) {
|
||||
moved = true;
|
||||
auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
|
||||
PushTaskToResource(event->task_table_item_->task, dest_resource);
|
||||
PushTaskToResource(event->task_table_item_, dest_resource);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -127,7 +128,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
|
|||
}
|
||||
|
||||
if (not moved) {
|
||||
PushTaskToNeighbourRandomly(task, resource);
|
||||
PushTaskToNeighbourRandomly(task_item, resource);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -135,6 +136,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
|
|||
void
|
||||
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event) {
|
||||
auto task_item = event->task_table_item_;
|
||||
auto task = event->task_table_item_->task;
|
||||
if (resource->type() == ResourceType::DISK) {
|
||||
// step 1: calculate shortest path per resource, from disk to compute resource
|
||||
|
@ -213,7 +215,7 @@ Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, Resou
|
|||
// next_res->task_table().Put(task);
|
||||
// }
|
||||
event->task_table_item_->Move();
|
||||
next_res->task_table().Put(task);
|
||||
next_res->task_table().Put(task, task_item);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -180,6 +180,10 @@ Resource::loader_function() {
|
|||
}
|
||||
LoadFile(task_item->task);
|
||||
task_item->Loaded();
|
||||
if (task_item->from) {
|
||||
task_item->from->Moved();
|
||||
task_item->from = nullptr;
|
||||
}
|
||||
if (subscriber_) {
|
||||
auto event = std::make_shared<LoadCompletedEvent>(shared_from_this(), task_item);
|
||||
subscriber_(std::static_pointer_cast<Event>(event));
|
||||
|
|
|
@ -193,16 +193,13 @@ TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) {
|
|||
|
||||
TEST_F(TaskTableBaseTest, PUT_BATCH) {
|
||||
std::vector<milvus::scheduler::TaskPtr> tasks{task1_, task2_};
|
||||
empty_table_.Put(tasks);
|
||||
for (auto& task : tasks) {
|
||||
empty_table_.Put(task);
|
||||
}
|
||||
ASSERT_EQ(empty_table_.at(0)->task, task1_);
|
||||
ASSERT_EQ(empty_table_.at(1)->task, task2_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
|
||||
std::vector<milvus::scheduler::TaskPtr> tasks{};
|
||||
empty_table_.Put(tasks);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, SIZE) {
|
||||
ASSERT_EQ(empty_table_.size(), 0);
|
||||
empty_table_.Put(task1_);
|
||||
|
|
Loading…
Reference in New Issue