MS-609 Update task construct function

Former-commit-id: f187500ad365a90ae751b6fd4efa7f8b10e2f6c4
pull/191/head
wxyu 2019-10-07 19:01:56 +08:00
parent 3ad2b8d74c
commit 14d9af54a3
16 changed files with 46 additions and 26 deletions

View File

@ -24,6 +24,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-578 - Make sure milvus5.0 don't crack 0.3.1 data
- MS-585 - Update namespace in scheduler
- MS-608 - Update TODO names
- MS-609 - Update task construct function
## New Feature

View File

@ -42,8 +42,8 @@ std::vector<TaskPtr>
TaskCreator::Create(const SearchJobPtr& job) {
std::vector<TaskPtr> tasks;
for (auto& index_file : job->index_files()) {
auto task = std::make_shared<XSearchTask>(index_file.second);
task->label() = std::make_shared<DefaultLabel>();
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<XSearchTask>(index_file.second, label);
task->job_ = job;
tasks.emplace_back(task);
}
@ -54,8 +54,8 @@ TaskCreator::Create(const SearchJobPtr& job) {
std::vector<TaskPtr>
TaskCreator::Create(const DeleteJobPtr& job) {
std::vector<TaskPtr> tasks;
auto task = std::make_shared<XDeleteTask>(job);
task->label() = std::make_shared<BroadcastLabel>();
auto label = std::make_shared<BroadcastLabel>();
auto task = std::make_shared<XDeleteTask>(job, label);
task->job_ = job;
tasks.emplace_back(task);

View File

@ -20,8 +20,8 @@
namespace milvus {
namespace scheduler {
XDeleteTask::XDeleteTask(const scheduler::DeleteJobPtr& delete_job)
: Task(TaskType::DeleteTask), delete_job_(delete_job) {
XDeleteTask::XDeleteTask(const scheduler::DeleteJobPtr& delete_job, TaskLabelPtr label)
: Task(TaskType::DeleteTask, std::move(label)), delete_job_(delete_job) {
}
void

View File

@ -25,7 +25,7 @@ namespace scheduler {
class XDeleteTask : public Task {
public:
explicit XDeleteTask(const scheduler::DeleteJobPtr& delete_job);
explicit XDeleteTask(const scheduler::DeleteJobPtr& delete_job, TaskLabelPtr label);
void
Load(LoadType type, uint8_t device_id) override;

View File

@ -95,7 +95,8 @@ CollectFileMetrics(int file_type, size_t file_size) {
}
}
XSearchTask::XSearchTask(TableFileSchemaPtr file) : Task(TaskType::SearchTask), file_(file) {
XSearchTask::XSearchTask(TableFileSchemaPtr file, TaskLabelPtr label)
: Task(TaskType::SearchTask, std::move(label)), file_(file) {
if (file_) {
if (file_->metric_type_ != static_cast<int>(MetricType::L2)) {
metric_l2 = false;

View File

@ -29,7 +29,7 @@ namespace scheduler {
// TODO(wxyu): rewrite
class XSearchTask : public Task {
public:
explicit XSearchTask(TableFileSchemaPtr file);
explicit XSearchTask(TableFileSchemaPtr file, TaskLabelPtr label);
void
Load(LoadType type, uint8_t device_id) override;

View File

@ -48,7 +48,7 @@ using TaskPtr = std::shared_ptr<Task>;
// TODO: re-design
class Task {
public:
explicit Task(TaskType type) : type_(type) {
explicit Task(TaskType type, TaskLabelPtr label) : type_(type), label_(std::move(label)) {
}
/*

View File

@ -21,7 +21,8 @@
namespace milvus {
namespace scheduler {
TestTask::TestTask(TableFileSchemaPtr& file) : XSearchTask(file) {
TestTask::TestTask(TableFileSchemaPtr& file, TaskLabelPtr label)
: XSearchTask(file, std::move(label)) {
}
void
@ -42,7 +43,9 @@ TestTask::Execute() {
void
TestTask::Wait() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_; });
cv_.wait(lock, [&] {
return done_;
});
}
} // namespace scheduler

View File

@ -24,7 +24,7 @@ namespace scheduler {
class TestTask : public XSearchTask {
public:
explicit TestTask(TableFileSchemaPtr& file);
explicit TestTask(TableFileSchemaPtr& file, TaskLabelPtr label);
public:
void

View File

@ -22,6 +22,7 @@
#include <map>
#include <memory>
#include <functional>
namespace milvus {
namespace engine {

View File

@ -24,7 +24,7 @@ namespace milvus {
namespace scheduler {
TEST(TaskTest, INVALID_INDEX) {
auto search_task = std::make_shared<XSearchTask>(nullptr);
auto search_task = std::make_shared<XSearchTask>(nullptr, nullptr);
search_task->Load(LoadType::TEST, 10);
}

View File

@ -54,7 +54,8 @@ TEST(NormalTest, INST_TEST) {
ASSERT_FALSE(disks.empty());
if (auto observe = disks[0].lock()) {
for (uint64_t i = 0; i < NUM_TASK; ++i) {
auto task = std::make_shared<ms::TestTask>(dummy);
auto label = std::make_shared<ms::DefaultLabel>();
auto task = std::make_shared<ms::TestTask>(dummy, label);
task->label() = std::make_shared<ms::DefaultLabel>();
tasks.push_back(task);
observe->task_table().Put(task);

View File

@ -23,6 +23,7 @@
#include "scheduler/resource/TestResource.h"
#include "scheduler/task/Task.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "scheduler/ResourceFactory.h"
#include <gtest/gtest.h>
@ -185,7 +186,8 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<TestTask>(dummy, label);
tasks.push_back(task);
disk_resource_->task_table().Put(task);
}
@ -210,7 +212,8 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<TestTask>(dummy, label);
tasks.push_back(task);
cpu_resource_->task_table().Put(task);
}
@ -235,7 +238,8 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<TestTask>(dummy, label);
tasks.push_back(task);
gpu_resource_->task_table().Put(task);
}
@ -260,7 +264,8 @@ TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<TestTask>(dummy, label);
tasks.push_back(task);
test_resource_->task_table().Put(task);
}

View File

@ -21,6 +21,7 @@
#include "scheduler/resource/DiskResource.h"
#include "scheduler/resource/TestResource.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "scheduler/ResourceMgr.h"
#include <gtest/gtest.h>
@ -184,7 +185,8 @@ TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) {
};
mgr1_->RegisterSubscriber(callback);
TableFileSchemaPtr dummy = nullptr;
disk_res->task_table().Put(std::make_shared<TestTask>(dummy));
auto label = std::make_shared<DefaultLabel>();
disk_res->task_table().Put(std::make_shared<TestTask>(dummy, label));
sleep(1);
ASSERT_TRUE(flag);
}

View File

@ -155,7 +155,8 @@ TEST_F(SchedulerTest, ON_LOAD_COMPLETED) {
insert_dummy_index_into_gpu_cache(1);
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<TestTask>(dummy, label);
task->label() = std::make_shared<DefaultLabel>();
tasks.push_back(task);
cpu_resource_.lock()->task_table().Put(task);
@ -174,7 +175,8 @@ TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) {
tasks.clear();
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy1);
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<TestTask>(dummy1, label);
task->label() = std::make_shared<DefaultLabel>();
tasks.push_back(task);
cpu_resource_.lock()->task_table().Put(task);
@ -242,7 +244,8 @@ TEST_F(SchedulerTest2, SPECIFIED_RESOURCE_TEST) {
dummy->location_ = "location";
for (uint64_t i = 0; i < NUM; ++i) {
std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy);
auto label = std::make_shared<DefaultLabel>();
std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy, label);
task->label() = std::make_shared<SpecResLabel>(disk_);
tasks.push_back(task);
disk_.lock()->task_table().Put(task);

View File

@ -18,6 +18,7 @@
#include "scheduler/TaskTable.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include <gtest/gtest.h>
namespace {
@ -172,8 +173,9 @@ class TaskTableBaseTest : public ::testing::Test {
SetUp() override {
ms::TableFileSchemaPtr dummy = nullptr;
invalid_task_ = nullptr;
task1_ = std::make_shared<ms::TestTask>(dummy);
task2_ = std::make_shared<ms::TestTask>(dummy);
auto label = std::make_shared<ms::DefaultLabel>();
task1_ = std::make_shared<ms::TestTask>(dummy, label);
task2_ = std::make_shared<ms::TestTask>(dummy, label);
}
ms::TaskPtr invalid_task_;
@ -340,7 +342,8 @@ class TaskTableAdvanceTest : public ::testing::Test {
SetUp() override {
ms::TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<ms::TestTask>(dummy);
auto label = std::make_shared<ms::DefaultLabel>();
auto task = std::make_shared<ms::TestTask>(dummy, label);
table1_.Put(task);
}