mirror of https://github.com/milvus-io/milvus.git
Remove DefaultLabel
parent
c451f8cbf2
commit
fdb7decdd3
|
@ -85,7 +85,7 @@ JobMgr::worker_function() {
|
|||
}
|
||||
|
||||
for (auto& task : tasks) {
|
||||
calculate_path(task);
|
||||
calculate_path(res_mgr_, task);
|
||||
}
|
||||
|
||||
// disk resources NEVER be empty.
|
||||
|
@ -103,7 +103,7 @@ JobMgr::build_task(const JobPtr& job) {
|
|||
}
|
||||
|
||||
void
|
||||
JobMgr::calculate_path(const TaskPtr& task) {
|
||||
JobMgr::calculate_path(const ResourceMgrPtr& res_mgr, const TaskPtr& task) {
|
||||
if (task->type_ != TaskType::SearchTask && task->type_ != TaskType::BuildIndexTask) {
|
||||
return;
|
||||
}
|
||||
|
@ -114,9 +114,9 @@ JobMgr::calculate_path(const TaskPtr& task) {
|
|||
|
||||
std::vector<std::string> path;
|
||||
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
|
||||
auto src = res_mgr_->GetDiskResources()[0];
|
||||
auto src = res_mgr->GetDiskResources()[0];
|
||||
auto dest = spec_label->resource();
|
||||
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
|
||||
ShortestPath(src.lock(), dest.lock(), res_mgr, path);
|
||||
task->path() = Path(path, path.size() - 1);
|
||||
}
|
||||
|
||||
|
|
|
@ -59,8 +59,9 @@ class JobMgr : public interface::dumpable {
|
|||
static std::vector<TaskPtr>
|
||||
build_task(const JobPtr& job);
|
||||
|
||||
void
|
||||
calculate_path(const TaskPtr& task);
|
||||
public:
|
||||
static void
|
||||
calculate_path(const ResourceMgrPtr& res_mgr, const TaskPtr& task);
|
||||
|
||||
private:
|
||||
bool running_ = false;
|
||||
|
|
|
@ -108,10 +108,6 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
|
|||
|
||||
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
|
||||
switch (task_table_type) {
|
||||
case TaskLabelType::DEFAULT: {
|
||||
Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
|
||||
break;
|
||||
}
|
||||
case TaskLabelType::SPECIFIED_RESOURCE: {
|
||||
Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
|
||||
break;
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
#include "scheduler/TaskCreator.h"
|
||||
#include "SchedInst.h"
|
||||
#include "tasklabel/BroadcastLabel.h"
|
||||
#include "tasklabel/DefaultLabel.h"
|
||||
#include "tasklabel/SpecResLabel.h"
|
||||
|
||||
namespace milvus {
|
||||
|
@ -47,8 +46,7 @@ std::vector<TaskPtr>
|
|||
TaskCreator::Create(const SearchJobPtr& job) {
|
||||
std::vector<TaskPtr> tasks;
|
||||
for (auto& index_file : job->index_files()) {
|
||||
auto label = std::make_shared<DefaultLabel>();
|
||||
auto task = std::make_shared<XSearchTask>(index_file.second, label);
|
||||
auto task = std::make_shared<XSearchTask>(index_file.second, nullptr);
|
||||
task->job_ = job;
|
||||
tasks.emplace_back(task);
|
||||
}
|
||||
|
@ -70,11 +68,8 @@ TaskCreator::Create(const DeleteJobPtr& job) {
|
|||
std::vector<TaskPtr>
|
||||
TaskCreator::Create(const BuildIndexJobPtr& job) {
|
||||
std::vector<TaskPtr> tasks;
|
||||
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
|
||||
|
||||
for (auto& to_index_file : job->to_index_files()) {
|
||||
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
|
||||
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, label);
|
||||
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, nullptr);
|
||||
task->job_ = job;
|
||||
tasks.emplace_back(task);
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
// under the License.
|
||||
|
||||
#include "scheduler/Utils.h"
|
||||
#include "server/Config.h"
|
||||
|
||||
#ifdef MILVUS_GPU_VERSION
|
||||
#include <cuda_runtime.h>
|
||||
|
|
|
@ -36,10 +36,6 @@ class Action {
|
|||
static void
|
||||
PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest);
|
||||
|
||||
static void
|
||||
DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event);
|
||||
|
||||
static void
|
||||
SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event);
|
||||
|
|
|
@ -101,38 +101,6 @@ Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest)
|
|||
dest->task_table().Put(task_item->task, task_item);
|
||||
}
|
||||
|
||||
void
|
||||
Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event) {
|
||||
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
|
||||
auto task_item = event->task_table_item_;
|
||||
auto task = event->task_table_item_->task;
|
||||
auto search_task = std::static_pointer_cast<XSearchTask>(task);
|
||||
bool moved = false;
|
||||
|
||||
// to support test task, REFACTOR
|
||||
if (resource->type() == ResourceType::CPU) {
|
||||
if (auto index_engine = search_task->index_engine_) {
|
||||
auto location = index_engine->GetLocation();
|
||||
|
||||
for (auto i = 0; i < res_mgr->GetNumGpuResource(); ++i) {
|
||||
auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
|
||||
if (index != nullptr) {
|
||||
moved = true;
|
||||
auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
|
||||
PushTaskToResource(event->task_table_item_, dest_resource);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (not moved) {
|
||||
PushTaskToNeighbourRandomly(task_item, resource);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
|
||||
std::shared_ptr<LoadCompletedEvent> event) {
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "TaskLabel.h"
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
class DefaultLabel : public TaskLabel {
|
||||
public:
|
||||
DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {
|
||||
}
|
||||
};
|
||||
|
||||
using DefaultLabelPtr = std::shared_ptr<DefaultLabel>;
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
|
@ -23,7 +23,6 @@ namespace milvus {
|
|||
namespace scheduler {
|
||||
|
||||
enum class TaskLabelType {
|
||||
DEFAULT, // means can be executed in any resource
|
||||
SPECIFIED_RESOURCE, // means must executing in special resource
|
||||
BROADCAST, // means all enable-executor resource must execute task
|
||||
};
|
||||
|
|
|
@ -21,7 +21,6 @@ set(test_files
|
|||
${CMAKE_CURRENT_SOURCE_DIR}/test_algorithm.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_event.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_node.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_normal.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_resource.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_resource_factory.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_resource_mgr.cpp
|
||||
|
|
|
@ -1,72 +0,0 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include "scheduler/ResourceFactory.h"
|
||||
#include "scheduler/ResourceMgr.h"
|
||||
#include "scheduler/SchedInst.h"
|
||||
#include "scheduler/Scheduler.h"
|
||||
#include "scheduler/task/TestTask.h"
|
||||
#include "scheduler/tasklabel/DefaultLabel.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
namespace {
|
||||
|
||||
namespace ms = milvus::scheduler;
|
||||
|
||||
} // namespace
|
||||
|
||||
TEST(NormalTest, INST_TEST) {
|
||||
// ResourceMgr only compose resources, provide unified event
|
||||
auto res_mgr = ms::ResMgrInst::GetInstance();
|
||||
|
||||
res_mgr->Add(ms::ResourceFactory::Create("disk", "DISK", 0, true, false));
|
||||
res_mgr->Add(ms::ResourceFactory::Create("cpu", "CPU", 0, true, true));
|
||||
|
||||
auto IO = ms::Connection("IO", 500.0);
|
||||
res_mgr->Connect("disk", "cpu", IO);
|
||||
|
||||
auto scheduler = ms::SchedInst::GetInstance();
|
||||
|
||||
res_mgr->Start();
|
||||
scheduler->Start();
|
||||
|
||||
const uint64_t NUM_TASK = 2;
|
||||
std::vector<std::shared_ptr<ms::TestTask>> tasks;
|
||||
ms::TableFileSchemaPtr dummy = nullptr;
|
||||
|
||||
auto disks = res_mgr->GetDiskResources();
|
||||
ASSERT_FALSE(disks.empty());
|
||||
if (auto observe = disks[0].lock()) {
|
||||
for (uint64_t i = 0; i < NUM_TASK; ++i) {
|
||||
auto label = std::make_shared<ms::DefaultLabel>();
|
||||
auto task = std::make_shared<ms::TestTask>(dummy, label);
|
||||
task->label() = std::make_shared<ms::DefaultLabel>();
|
||||
tasks.push_back(task);
|
||||
observe->task_table().Put(task);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& task : tasks) {
|
||||
task->Wait();
|
||||
ASSERT_EQ(task->load_count_, 1);
|
||||
ASSERT_EQ(task->exec_count_, 1);
|
||||
}
|
||||
|
||||
scheduler->Stop();
|
||||
res_mgr->Stop();
|
||||
}
|
|
@ -24,7 +24,7 @@
|
|||
#include "scheduler/resource/TestResource.h"
|
||||
#include "scheduler/task/Task.h"
|
||||
#include "scheduler/task/TestTask.h"
|
||||
#include "scheduler/tasklabel/DefaultLabel.h"
|
||||
#include "scheduler/tasklabel/SpecResLabel.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
@ -182,8 +182,10 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
|
|||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto label = std::make_shared<DefaultLabel>();
|
||||
auto label = std::make_shared<SpecResLabel>(disk_resource_);
|
||||
auto task = std::make_shared<TestTask>(dummy, label);
|
||||
std::vector<std::string> path{disk_resource_->name()};
|
||||
task->path() = Path(path, 0);
|
||||
tasks.push_back(task);
|
||||
disk_resource_->task_table().Put(task);
|
||||
}
|
||||
|
@ -208,8 +210,10 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
|
|||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto label = std::make_shared<DefaultLabel>();
|
||||
auto label = std::make_shared<SpecResLabel>(cpu_resource_);
|
||||
auto task = std::make_shared<TestTask>(dummy, label);
|
||||
std::vector<std::string> path{cpu_resource_->name()};
|
||||
task->path() = Path(path, 0);
|
||||
tasks.push_back(task);
|
||||
cpu_resource_->task_table().Put(task);
|
||||
}
|
||||
|
@ -234,8 +238,10 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
|
|||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto label = std::make_shared<DefaultLabel>();
|
||||
auto label = std::make_shared<SpecResLabel>(gpu_resource_);
|
||||
auto task = std::make_shared<TestTask>(dummy, label);
|
||||
std::vector<std::string> path{gpu_resource_->name()};
|
||||
task->path() = Path(path, 0);
|
||||
tasks.push_back(task);
|
||||
gpu_resource_->task_table().Put(task);
|
||||
}
|
||||
|
@ -260,8 +266,10 @@ TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) {
|
|||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto label = std::make_shared<DefaultLabel>();
|
||||
auto label = std::make_shared<SpecResLabel>(test_resource_);
|
||||
auto task = std::make_shared<TestTask>(dummy, label);
|
||||
std::vector<std::string> path{test_resource_->name()};
|
||||
task->path() = Path(path, 0);
|
||||
tasks.push_back(task);
|
||||
test_resource_->task_table().Put(task);
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@
|
|||
#include "scheduler/resource/GpuResource.h"
|
||||
#include "scheduler/resource/TestResource.h"
|
||||
#include "scheduler/task/TestTask.h"
|
||||
#include "scheduler/tasklabel/DefaultLabel.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
@ -187,8 +186,7 @@ TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) {
|
|||
auto callback = [&](EventPtr event) { flag = true; };
|
||||
mgr1_->RegisterSubscriber(callback);
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
auto label = std::make_shared<DefaultLabel>();
|
||||
disk_res->task_table().Put(std::make_shared<TestTask>(dummy, label));
|
||||
disk_res->task_table().Put(std::make_shared<TestTask>(dummy, nullptr));
|
||||
sleep(1);
|
||||
ASSERT_TRUE(flag);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@
|
|||
#include "scheduler/Scheduler.h"
|
||||
#include "scheduler/resource/Resource.h"
|
||||
#include "scheduler/task/TestTask.h"
|
||||
#include "scheduler/tasklabel/DefaultLabel.h"
|
||||
#include "scheduler/tasklabel/SpecResLabel.h"
|
||||
#include "utils/Error.h"
|
||||
#include "wrapper/VecIndex.h"
|
||||
|
@ -150,46 +149,6 @@ insert_dummy_index_into_gpu_cache(uint64_t device_id) {
|
|||
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj);
|
||||
}
|
||||
|
||||
TEST_F(SchedulerTest, ON_LOAD_COMPLETED) {
|
||||
const uint64_t NUM = 10;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
|
||||
dummy->location_ = "location";
|
||||
|
||||
insert_dummy_index_into_gpu_cache(1);
|
||||
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto label = std::make_shared<DefaultLabel>();
|
||||
auto task = std::make_shared<TestTask>(dummy, label);
|
||||
task->label() = std::make_shared<DefaultLabel>();
|
||||
tasks.push_back(task);
|
||||
cpu_resource_.lock()->task_table().Put(task);
|
||||
}
|
||||
|
||||
sleep(3);
|
||||
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().size(), NUM);
|
||||
}
|
||||
|
||||
TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) {
|
||||
const uint64_t NUM = 10;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy1 = std::make_shared<TableFileSchema>();
|
||||
dummy1->location_ = "location";
|
||||
|
||||
tasks.clear();
|
||||
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto label = std::make_shared<DefaultLabel>();
|
||||
auto task = std::make_shared<TestTask>(dummy1, label);
|
||||
task->label() = std::make_shared<DefaultLabel>();
|
||||
tasks.push_back(task);
|
||||
cpu_resource_.lock()->task_table().Put(task);
|
||||
}
|
||||
|
||||
sleep(3);
|
||||
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
|
||||
}
|
||||
|
||||
class SchedulerTest2 : public testing::Test {
|
||||
protected:
|
||||
void
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include "scheduler/TaskTable.h"
|
||||
#include "scheduler/task/TestTask.h"
|
||||
#include "scheduler/tasklabel/DefaultLabel.h"
|
||||
|
||||
/************ TaskTableBaseTest ************/
|
||||
|
||||
|
@ -162,9 +161,8 @@ class TaskTableBaseTest : public ::testing::Test {
|
|||
SetUp() override {
|
||||
milvus::scheduler::TableFileSchemaPtr dummy = nullptr;
|
||||
invalid_task_ = nullptr;
|
||||
auto label = std::make_shared<milvus::scheduler::DefaultLabel>();
|
||||
task1_ = std::make_shared<milvus::scheduler::TestTask>(dummy, label);
|
||||
task2_ = std::make_shared<milvus::scheduler::TestTask>(dummy, label);
|
||||
task1_ = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
|
||||
task2_ = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
|
||||
}
|
||||
|
||||
milvus::scheduler::TaskPtr invalid_task_;
|
||||
|
@ -320,8 +318,7 @@ class TaskTableAdvanceTest : public ::testing::Test {
|
|||
SetUp() override {
|
||||
milvus::scheduler::TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < 8; ++i) {
|
||||
auto label = std::make_shared<milvus::scheduler::DefaultLabel>();
|
||||
auto task = std::make_shared<milvus::scheduler::TestTask>(dummy, label);
|
||||
auto task = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
|
||||
table1_.Put(task);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue