Merge pull request #418 from fishpenguin/0.6.0-yk-refactor-scheduler

Refactor optimizer
pull/431/head
Jin Hai 2019-11-20 00:17:41 +08:00 committed by GitHub
commit 8c48d95faf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 270 additions and 302 deletions

View File

@ -85,7 +85,7 @@ JobMgr::worker_function() {
}
for (auto& task : tasks) {
calculate_path(task);
calculate_path(res_mgr_, task);
}
// disk resources NEVER be empty.
@ -103,7 +103,7 @@ JobMgr::build_task(const JobPtr& job) {
}
void
JobMgr::calculate_path(const TaskPtr& task) {
JobMgr::calculate_path(const ResourceMgrPtr& res_mgr, const TaskPtr& task) {
if (task->type_ != TaskType::SearchTask && task->type_ != TaskType::BuildIndexTask) {
return;
}
@ -114,9 +114,9 @@ JobMgr::calculate_path(const TaskPtr& task) {
std::vector<std::string> path;
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
auto src = res_mgr_->GetDiskResources()[0];
auto src = res_mgr->GetDiskResources()[0];
auto dest = spec_label->resource();
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
ShortestPath(src.lock(), dest.lock(), res_mgr, path);
task->path() = Path(path, path.size() - 1);
}

View File

@ -59,8 +59,9 @@ class JobMgr : public interface::dumpable {
static std::vector<TaskPtr>
build_task(const JobPtr& job);
void
calculate_path(const TaskPtr& task);
public:
static void
calculate_path(const ResourceMgrPtr& res_mgr, const TaskPtr& task);
private:
bool running_ = false;

View File

@ -23,10 +23,11 @@
#include "Scheduler.h"
#include "Utils.h"
#include "optimizer/BuildIndexPass.h"
#include "optimizer/FaissFlatPass.h"
#include "optimizer/FaissIVFFlatPass.h"
#include "optimizer/FaissIVFSQ8HPass.h"
#include "optimizer/FaissIVFSQ8Pass.h"
#include "optimizer/FallbackPass.h"
#include "optimizer/HybridPass.h"
#include "optimizer/LargeSQ8HPass.h"
#include "optimizer/OnlyCPUPass.h"
#include "optimizer/Optimizer.h"
#include "server/Config.h"
@ -100,15 +101,12 @@ class OptimizerInst {
std::lock_guard<std::mutex> lock(mutex_);
if (instance == nullptr) {
std::vector<PassPtr> pass_list;
pass_list.push_back(std::make_shared<LargeSQ8HPass>());
pass_list.push_back(std::make_shared<HybridPass>());
#ifdef MILVUS_CPU_VERSION
pass_list.push_back(std::make_shared<OnlyCPUPass>());
#else
server::Config& config = server::Config::GetInstance();
std::vector<int32_t> build_resources;
config.GetGpuResourceConfigBuildIndexResources(build_resources);
pass_list.push_back(std::make_shared<BuildIndexPass>(build_resources));
#ifdef MILVUS_GPU_VERSION
pass_list.push_back(std::make_shared<BuildIndexPass>());
pass_list.push_back(std::make_shared<FaissFlatPass>());
pass_list.push_back(std::make_shared<FaissIVFFlatPass>());
pass_list.push_back(std::make_shared<FaissIVFSQ8Pass>());
pass_list.push_back(std::make_shared<FaissIVFSQ8HPass>());
#endif
pass_list.push_back(std::make_shared<FallbackPass>());
instance = std::make_shared<Optimizer>(pass_list);

View File

@ -108,10 +108,6 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
switch (task_table_type) {
case TaskLabelType::DEFAULT: {
Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
break;
}
case TaskLabelType::SPECIFIED_RESOURCE: {
Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
break;

View File

@ -18,7 +18,6 @@
#include "scheduler/TaskCreator.h"
#include "SchedInst.h"
#include "tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h"
#include "tasklabel/SpecResLabel.h"
namespace milvus {
@ -47,8 +46,7 @@ std::vector<TaskPtr>
TaskCreator::Create(const SearchJobPtr& job) {
std::vector<TaskPtr> tasks;
for (auto& index_file : job->index_files()) {
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<XSearchTask>(index_file.second, label);
auto task = std::make_shared<XSearchTask>(index_file.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}
@ -70,11 +68,8 @@ TaskCreator::Create(const DeleteJobPtr& job) {
std::vector<TaskPtr>
TaskCreator::Create(const BuildIndexJobPtr& job) {
std::vector<TaskPtr> tasks;
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
for (auto& to_index_file : job->to_index_files()) {
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, label);
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}

View File

@ -16,7 +16,6 @@
// under the License.
#include "scheduler/Utils.h"
#include "server/Config.h"
#ifdef MILVUS_GPU_VERSION
#include <cuda_runtime.h>

View File

@ -36,10 +36,6 @@ class Action {
static void
PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest);
static void
DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event);
static void
SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event);

View File

@ -101,38 +101,6 @@ Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest)
dest->task_table().Put(task_item->task, task_item);
}
void
Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
bool moved = false;
// to support test task, REFACTOR
if (resource->type() == ResourceType::CPU) {
if (auto index_engine = search_task->index_engine_) {
auto location = index_engine->GetLocation();
for (auto i = 0; i < res_mgr->GetNumGpuResource(); ++i) {
auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
PushTaskToResource(event->task_table_item_, dest_resource);
break;
}
}
}
}
if (not moved) {
PushTaskToNeighbourRandomly(task_item, resource);
}
}
}
void
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {

View File

@ -23,11 +23,14 @@
namespace milvus {
namespace scheduler {
BuildIndexPass::BuildIndexPass(std::vector<int32_t>& build_gpu_ids) : build_gpu_ids_(build_gpu_ids) {
}
void
BuildIndexPass::Init() {
server::Config& config = server::Config::GetInstance();
std::vector<int32_t> build_resources;
Status s = config.GetGpuResourceConfigBuildIndexResources(build_resources);
if (!s.ok()) {
throw;
}
}
bool

View File

@ -34,7 +34,7 @@ namespace scheduler {
class BuildIndexPass : public Pass {
public:
explicit BuildIndexPass(std::vector<int32_t>& build_gpu_id);
BuildIndexPass() = default;
public:
void

View File

@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/FaissFlatPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
void
FaissFlatPass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
throw;
}
}
bool
FaissFlatPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IDMAP) {
return false;
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
return true;
}
} // namespace scheduler
} // namespace milvus

View File

@ -33,9 +33,9 @@
namespace milvus {
namespace scheduler {
class LargeSQ8HPass : public Pass {
class FaissFlatPass : public Pass {
public:
LargeSQ8HPass() = default;
FaissFlatPass() = default;
public:
void
@ -50,7 +50,7 @@ class LargeSQ8HPass : public Pass {
std::vector<int32_t> gpus;
};
using LargeSQ8HPassPtr = std::shared_ptr<LargeSQ8HPass>;
using FaissFlatPassPtr = std::shared_ptr<FaissFlatPass>;
} // namespace scheduler
} // namespace milvus

View File

@ -15,31 +15,52 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/OnlyCPUPass.h"
#include "scheduler/optimizer/FaissIVFFlatPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
void
OnlyCPUPass::Init() {
FaissIVFFlatPass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
throw;
}
}
bool
OnlyCPUPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask)
return false;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8 &&
search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFFLAT) {
FaissIVFFlatPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFFLAT) {
return false;
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
return true;
}

View File

@ -18,6 +18,7 @@
#include <condition_variable>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
@ -32,9 +33,9 @@
namespace milvus {
namespace scheduler {
class HybridPass : public Pass {
class FaissIVFFlatPass : public Pass {
public:
HybridPass() = default;
FaissIVFFlatPass() = default;
public:
void
@ -42,9 +43,14 @@ class HybridPass : public Pass {
bool
Run(const TaskPtr& task) override;
private:
int32_t threshold_ = std::numeric_limits<int32_t>::max();
int64_t count_ = 0;
std::vector<int32_t> gpus;
};
using HybridPassPtr = std::shared_ptr<HybridPass>;
using FaissIVFFlatPassPtr = std::shared_ptr<FaissIVFFlatPass>;
} // namespace scheduler
} // namespace milvus

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/LargeSQ8HPass.h"
#include "scheduler/optimizer/FaissIVFSQ8HPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
@ -28,7 +28,7 @@ namespace milvus {
namespace scheduler {
void
LargeSQ8HPass::Init() {
FaissIVFSQ8HPass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
@ -38,7 +38,7 @@ LargeSQ8HPass::Init() {
}
bool
LargeSQ8HPass::Run(const TaskPtr& task) {
FaissIVFSQ8HPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
@ -49,36 +49,16 @@ LargeSQ8HPass::Run(const TaskPtr& task) {
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
return false;
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
}
// std::vector<int64_t> all_free_mem;
// for (auto& gpu : gpus) {
// auto cache = cache::GpuCacheMgr::GetInstance(gpu);
// auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
// all_free_mem.push_back(free_mem);
// }
//
// auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
// auto best_index = std::distance(all_free_mem.begin(), max_e);
// auto best_device_id = gpus[best_index];
auto best_device_id = count_ % gpus.size();
count_++;
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
if (not res_ptr) {
SERVER_LOG_ERROR << "GpuResource " << best_device_id << " invalid.";
// TODO: throw critical error and exit
return false;
}
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
return true;
}

View File

@ -14,23 +14,43 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "TaskLabel.h"
#include <condition_variable>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "Pass.h"
namespace milvus {
namespace scheduler {
class DefaultLabel : public TaskLabel {
class FaissIVFSQ8HPass : public Pass {
public:
DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {
}
FaissIVFSQ8HPass() = default;
public:
void
Init() override;
bool
Run(const TaskPtr& task) override;
private:
int32_t threshold_ = std::numeric_limits<int32_t>::max();
int64_t count_ = 0;
std::vector<int32_t> gpus;
};
using DefaultLabelPtr = std::shared_ptr<DefaultLabel>;
using FaissIVFSQ8HPassPtr = std::shared_ptr<FaissIVFSQ8HPass>;
} // namespace scheduler
} // namespace milvus

View File

@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/FaissIVFSQ8Pass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
void
FaissIVFSQ8Pass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
throw;
}
}
bool
FaissIVFSQ8Pass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8) {
return false;
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
return true;
}
} // namespace scheduler
} // namespace milvus

View File

@ -18,6 +18,7 @@
#include <condition_variable>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
@ -32,9 +33,9 @@
namespace milvus {
namespace scheduler {
class OnlyCPUPass : public Pass {
class FaissIVFSQ8Pass : public Pass {
public:
OnlyCPUPass() = default;
FaissIVFSQ8Pass() = default;
public:
void
@ -42,9 +43,14 @@ class OnlyCPUPass : public Pass {
bool
Run(const TaskPtr& task) override;
private:
int32_t threshold_ = std::numeric_limits<int32_t>::max();
int64_t count_ = 0;
std::vector<int32_t> gpus;
};
using OnlyCPUPassPtr = std::shared_ptr<OnlyCPUPass>;
using FaissIVFSQ8PassPtr = std::shared_ptr<FaissIVFSQ8Pass>;
} // namespace scheduler
} // namespace milvus

View File

@ -1,47 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/HybridPass.h"
#include "scheduler/SchedInst.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace milvus {
namespace scheduler {
void
HybridPass::Init() {
}
bool
HybridPass::Run(const TaskPtr& task) {
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
if (task->Type() != TaskType::SearchTask)
return false;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ == (int)engine::EngineType::FAISS_IVFSQ8H) {
// TODO: remove "cpu" hardcode
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
return true;
}
return false;
}
} // namespace scheduler
} // namespace milvus

View File

@ -23,7 +23,6 @@ namespace milvus {
namespace scheduler {
enum class TaskLabelType {
DEFAULT, // means can be executed in any resource
SPECIFIED_RESOURCE, // means must executing in special resource
BROADCAST, // means all enable-executor resource must execute task
};

View File

@ -21,7 +21,6 @@ set(test_files
${CMAKE_CURRENT_SOURCE_DIR}/test_algorithm.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_event.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_node.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_normal.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_resource.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_resource_factory.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_resource_mgr.cpp

View File

@ -1,72 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include "scheduler/ResourceFactory.h"
#include "scheduler/ResourceMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Scheduler.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "utils/Log.h"
namespace {
namespace ms = milvus::scheduler;
} // namespace
TEST(NormalTest, INST_TEST) {
// ResourceMgr only compose resources, provide unified event
auto res_mgr = ms::ResMgrInst::GetInstance();
res_mgr->Add(ms::ResourceFactory::Create("disk", "DISK", 0, true, false));
res_mgr->Add(ms::ResourceFactory::Create("cpu", "CPU", 0, true, true));
auto IO = ms::Connection("IO", 500.0);
res_mgr->Connect("disk", "cpu", IO);
auto scheduler = ms::SchedInst::GetInstance();
res_mgr->Start();
scheduler->Start();
const uint64_t NUM_TASK = 2;
std::vector<std::shared_ptr<ms::TestTask>> tasks;
ms::TableFileSchemaPtr dummy = nullptr;
auto disks = res_mgr->GetDiskResources();
ASSERT_FALSE(disks.empty());
if (auto observe = disks[0].lock()) {
for (uint64_t i = 0; i < NUM_TASK; ++i) {
auto label = std::make_shared<ms::DefaultLabel>();
auto task = std::make_shared<ms::TestTask>(dummy, label);
task->label() = std::make_shared<ms::DefaultLabel>();
tasks.push_back(task);
observe->task_table().Put(task);
}
}
for (auto& task : tasks) {
task->Wait();
ASSERT_EQ(task->load_count_, 1);
ASSERT_EQ(task->exec_count_, 1);
}
scheduler->Stop();
res_mgr->Stop();
}

View File

@ -24,7 +24,7 @@
#include "scheduler/resource/TestResource.h"
#include "scheduler/task/Task.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace milvus {
namespace scheduler {
@ -182,8 +182,10 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto label = std::make_shared<SpecResLabel>(disk_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
std::vector<std::string> path{disk_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
disk_resource_->task_table().Put(task);
}
@ -208,8 +210,10 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto label = std::make_shared<SpecResLabel>(cpu_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
std::vector<std::string> path{cpu_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
cpu_resource_->task_table().Put(task);
}
@ -234,8 +238,10 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto label = std::make_shared<SpecResLabel>(gpu_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
std::vector<std::string> path{gpu_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
gpu_resource_->task_table().Put(task);
}
@ -260,8 +266,10 @@ TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto label = std::make_shared<SpecResLabel>(test_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
std::vector<std::string> path{test_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
test_resource_->task_table().Put(task);
}

View File

@ -22,7 +22,6 @@
#include "scheduler/resource/GpuResource.h"
#include "scheduler/resource/TestResource.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
namespace milvus {
namespace scheduler {
@ -187,8 +186,7 @@ TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) {
auto callback = [&](EventPtr event) { flag = true; };
mgr1_->RegisterSubscriber(callback);
TableFileSchemaPtr dummy = nullptr;
auto label = std::make_shared<DefaultLabel>();
disk_res->task_table().Put(std::make_shared<TestTask>(dummy, label));
disk_res->task_table().Put(std::make_shared<TestTask>(dummy, nullptr));
sleep(1);
ASSERT_TRUE(flag);
}

View File

@ -23,7 +23,6 @@
#include "scheduler/Scheduler.h"
#include "scheduler/resource/Resource.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "utils/Error.h"
#include "wrapper/VecIndex.h"
@ -150,46 +149,6 @@ insert_dummy_index_into_gpu_cache(uint64_t device_id) {
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj);
}
TEST_F(SchedulerTest, ON_LOAD_COMPLETED) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
dummy->location_ = "location";
insert_dummy_index_into_gpu_cache(1);
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<TestTask>(dummy, label);
task->label() = std::make_shared<DefaultLabel>();
tasks.push_back(task);
cpu_resource_.lock()->task_table().Put(task);
}
sleep(3);
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().size(), NUM);
}
TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy1 = std::make_shared<TableFileSchema>();
dummy1->location_ = "location";
tasks.clear();
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<TestTask>(dummy1, label);
task->label() = std::make_shared<DefaultLabel>();
tasks.push_back(task);
cpu_resource_.lock()->task_table().Put(task);
}
sleep(3);
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
class SchedulerTest2 : public testing::Test {
protected:
void

View File

@ -18,7 +18,6 @@
#include <gtest/gtest.h>
#include "scheduler/TaskTable.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
/************ TaskTableBaseTest ************/
@ -162,9 +161,8 @@ class TaskTableBaseTest : public ::testing::Test {
SetUp() override {
milvus::scheduler::TableFileSchemaPtr dummy = nullptr;
invalid_task_ = nullptr;
auto label = std::make_shared<milvus::scheduler::DefaultLabel>();
task1_ = std::make_shared<milvus::scheduler::TestTask>(dummy, label);
task2_ = std::make_shared<milvus::scheduler::TestTask>(dummy, label);
task1_ = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
task2_ = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
}
milvus::scheduler::TaskPtr invalid_task_;
@ -320,8 +318,7 @@ class TaskTableAdvanceTest : public ::testing::Test {
SetUp() override {
milvus::scheduler::TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < 8; ++i) {
auto label = std::make_shared<milvus::scheduler::DefaultLabel>();
auto task = std::make_shared<milvus::scheduler::TestTask>(dummy, label);
auto task = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
table1_.Put(task);
}