mirror of https://github.com/milvus-io/milvus.git
Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-528 Hide some config used future

See merge request megasearch/milvus!518

Former-commit-id: 5196fe024eacdea6aad9cef2a1f3e0d1f508b16b

commit 88a625e629
@@ -104,6 +104,9 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-519 - Add event_test in scheduler
- MS-520 - Update resource_test in scheduler
- MS-524 - Add some unittest in event_test and resource_test
- MS-525 - Disable parallel reduce in SearchTask
- MS-527 - Update scheduler_test and enable it
- MS-528 - Hide some config used future

## New Feature

- MS-343 - Implement ResourceMgr
@@ -48,51 +48,38 @@ resource_config:
  # example:
  # resource_name:               # resource name, just using in connections below
  #   type: DISK                 # resource type, optional: DISK/CPU/GPU
  #   memory: 256                # memory size, unit: GB
  #   device_id: 0
  #   enable_loader: true        # if is enable loader, optional: true, false
  #   enable_executor: false     # if is enable executor, optional: true, false

  resources:
    ssda:
      type: DISK
      memory: 2048
      device_id: 0
      enable_loader: true
      enable_executor: false

    cpu:
      type: CPU
      memory: 64
      device_id: 0
      enable_loader: true
      enable_executor: false

    gpu0:
      type: GPU
      memory: 6
      device_id: 0
      enable_loader: true
      enable_executor: true
      gpu_resource_num: 2
      pinned_memory: 300
      temp_memory: 300

    # gtx1660:
    #   type: GPU
    #   memory: 6
    #   device_id: 1
    #   enable_loader: true
    #   enable_executor: true

  # connection list, length: 0~N
  # format: -${resource_name}===${resource_name}
  # example:
  # connection_name:
  #   speed: 100                 # unit: MS/s
  #   endpoint: ${resource_name}===${resource_name}
  connections:
    io:
      speed: 500
      endpoint: ssda===cpu
    pcie:
    pcie0:
      speed: 11000
      endpoint: cpu===gpu0
      # - cpu===gtx1660
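Note (illustration only, not part of this commit): one way such a connection entry could be consumed is sketched below, reusing the Connection(name, speed) and ResourceMgr::Connect(src, dst, connection) calls that appear in the scheduler_test changes later in this diff. The "==="-splitting helper and the res_mgr handle are assumptions made for the sketch.

// Hypothetical helper: turn one config connection entry into a ResourceMgr edge.
// Connection(...) and Connect(...) follow their usage in the scheduler test below.
void AddConnectionFromConfig(ResourceMgrPtr res_mgr,
                             const std::string &conn_name,   // e.g. "pcie0"
                             double speed,                   // e.g. 11000
                             const std::string &endpoint) {  // e.g. "cpu===gpu0"
    auto pos = endpoint.find("===");
    std::string src = endpoint.substr(0, pos);        // "cpu"
    std::string dst = endpoint.substr(pos + 3);       // "gpu0"
    res_mgr->Connect(src, dst, Connection(conn_name, speed));
}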
@@ -36,7 +36,8 @@ StartSchedulerService() {
        auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
//        auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
        auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
        auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
//        auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
        auto enable_loader = true;
        auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
        auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY);
        auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY);
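Note (illustration only, not part of this commit): a minimal sketch of how the values read above could feed resource construction, assuming the ResourceFactory::Create(name, type, device_id, enable_loader, enable_executor) call shape and ResourceMgr::Add seen in the scheduler_test changes below; the "gpu0" literal and the res_mgr handle are placeholders. The hard-coded enable_loader = true, with its config read commented out, is presumably part of the configuration being hidden by MS-528.

// Illustrative sketch only: wiring the parsed values into a scheduler resource.
auto resource = ResourceFactory::Create("gpu0",           // resource name from the config
                                        type,             // e.g. "GPU"
                                        device_id,
                                        enable_loader,    // currently forced to true
                                        enable_executor);
res_mgr->Add(std::move(resource));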
@@ -20,47 +20,47 @@ namespace engine {

static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000;
static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;

bool
NeedParallelReduce(uint64_t nq, uint64_t topk) {
    server::ServerConfig &config = server::ServerConfig::GetInstance();
    server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB);
    bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
    if (!need_parallel) {
        return false;
    }

    return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
}

void
ParallelReduce(std::function<void(size_t, size_t)> &reduce_function, size_t max_index) {
    size_t reduce_batch = PARALLEL_REDUCE_BATCH;

    auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
    if (thread_count > 0) {
        reduce_batch = max_index / thread_count + 1;
    }
    ENGINE_LOG_DEBUG << "use " << thread_count <<
        " thread parallelly do reduce, each thread process " << reduce_batch << " vectors";

    std::vector<std::shared_ptr<std::thread> > thread_array;
    size_t from_index = 0;
    while (from_index < max_index) {
        size_t to_index = from_index + reduce_batch;
        if (to_index > max_index) {
            to_index = max_index;
        }

        auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
        thread_array.push_back(reduce_thread);

        from_index = to_index;
    }

    for (auto &thread_ptr : thread_array) {
        thread_ptr->join();
    }
}
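Note (illustration only, not part of this commit): a small usage sketch of the two helpers above, with made-up buffers. The worker receives a half-open range [from, to); with, say, max_index = 10000 and 7 worker threads, reduce_batch becomes 10000 / 7 + 1 = 1429 indices per thread.

// Illustrative sketch: reducing per-query partial results with the helpers above.
// `partial` and `merged` are hypothetical buffers that exist only in this example.
uint64_t nq = 100, topk = 100;                  // nq * topk == 10000 reaches the threshold
std::vector<float> partial(nq, 1.0f), merged(nq, 0.0f);
std::function<void(size_t, size_t)> reduce_worker = [&](size_t from, size_t to) {
    for (size_t i = from; i < to; ++i) {
        merged[i] += partial[i];                // each thread owns its own [from, to) slice
    }
};

if (NeedParallelReduce(nq, topk)) {
    ParallelReduce(reduce_worker, nq);          // splits [0, nq) into per-thread batches
} else {
    reduce_worker(0, nq);                       // parallel reduce disabled or batch too small
}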
//bool
//NeedParallelReduce(uint64_t nq, uint64_t topk) {
//    server::ServerConfig &config = server::ServerConfig::GetInstance();
//    server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB);
//    bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
//    if (!need_parallel) {
//        return false;
//    }
//
//    return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
//}
//
//void
//ParallelReduce(std::function<void(size_t, size_t)> &reduce_function, size_t max_index) {
//    size_t reduce_batch = PARALLEL_REDUCE_BATCH;
//
//    auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
//    if (thread_count > 0) {
//        reduce_batch = max_index / thread_count + 1;
//    }
//    ENGINE_LOG_DEBUG << "use " << thread_count <<
//        " thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
//
//    std::vector<std::shared_ptr<std::thread> > thread_array;
//    size_t from_index = 0;
//    while (from_index < max_index) {
//        size_t to_index = from_index + reduce_batch;
//        if (to_index > max_index) {
//            to_index = max_index;
//        }
//
//        auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
//        thread_array.push_back(reduce_thread);
//
//        from_index = to_index;
//    }
//
//    for (auto &thread_ptr : thread_array) {
//        thread_ptr->join();
//    }
//}

void
CollectFileMetrics(int file_type, size_t file_size) {
@@ -238,11 +238,11 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
        }
    };

    if (NeedParallelReduce(nq, topk)) {
        ParallelReduce(reduce_worker, nq);
    } else {
//    if (NeedParallelReduce(nq, topk)) {
//        ParallelReduce(reduce_worker, nq);
//    } else {
        reduce_worker(0, nq);
    }
//    }

    return Status::OK();
}
@@ -343,11 +343,11 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
        }
    };

    if (NeedParallelReduce(result_src.size(), topk)) {
        ParallelReduce(ReduceWorker, result_src.size());
    } else {
//    if (NeedParallelReduce(result_src.size(), topk)) {
//        ParallelReduce(ReduceWorker, result_src.size());
//    } else {
        ReduceWorker(0, result_src.size());
    }
//    }

    return Status::OK();
}
@@ -6,6 +6,7 @@
#include "scheduler/Scheduler.h"
#include <gtest/gtest.h>
#include <src/scheduler/tasklabel/DefaultLabel.h>
#include <src/server/ServerConfig.h>
#include "cache/DataObj.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/task/TestTask.h"
@@ -15,233 +16,238 @@
#include "wrapper/knowhere/vec_index.h"
#include "scheduler/tasklabel/SpecResLabel.h"


namespace zilliz {
namespace milvus {
namespace engine {

//class MockVecIndex : public engine::VecIndex {
//public:
//    virtual server::KnowhereError BuildAll(const long &nb,
//                                           const float *xb,
//                                           const long *ids,
//                                           const engine::Config &cfg,
//                                           const long &nt = 0,
//                                           const float *xt = nullptr) {
//
//    }
//
//    engine::VecIndexPtr Clone() override {
//        return zilliz::milvus::engine::VecIndexPtr();
//    }
//
//    int64_t GetDeviceId() override {
//        return 0;
//    }
//
//    engine::IndexType GetType() override {
//        return engine::IndexType::INVALID;
//    }
//
//    virtual server::KnowhereError Add(const long &nb,
//                                      const float *xb,
//                                      const long *ids,
//                                      const engine::Config &cfg = engine::Config()) {
//
//    }
//
//    virtual server::KnowhereError Search(const long &nq,
//                                         const float *xq,
//                                         float *dist,
//                                         long *ids,
//                                         const engine::Config &cfg = engine::Config()) {
//
//    }
//
//    engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {
//
//    }
//
//    engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {
//
//    }
//
//    virtual int64_t Dimension() {
//        return dimension_;
//    }
//
//    virtual int64_t Count() {
//        return ntotal_;
//    }
//
//    virtual zilliz::knowhere::BinarySet Serialize() {
//        zilliz::knowhere::BinarySet binset;
//        return binset;
//    }
//
//    virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) {
//
//    }
//
//public:
//    int64_t dimension_ = 512;
//    int64_t ntotal_ = 0;
//};
//
//
//class SchedulerTest : public testing::Test {
//protected:
//    void
//    SetUp() override {
//        ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
//        ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
//        ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);
//
//        res_mgr_ = std::make_shared<ResourceMgr>();
//        cpu_resource_ = res_mgr_->Add(std::move(cpu));
//        gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
//        gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));
//
//        auto PCIE = Connection("IO", 11000.0);
//        res_mgr_->Connect("cpu", "gpu0", PCIE);
//        res_mgr_->Connect("cpu", "gpu1", PCIE);
//
//        scheduler_ = std::make_shared<Scheduler>(res_mgr_);
//
//        res_mgr_->Start();
//        scheduler_->Start();
//    }
//
//    void
//    TearDown() override {
//        scheduler_->Stop();
//        res_mgr_->Stop();
//    }
//
//    ResourceWPtr cpu_resource_;
//    ResourceWPtr gpu_resource_0_;
//    ResourceWPtr gpu_resource_1_;
//
//    ResourceMgrPtr res_mgr_;
//    std::shared_ptr<Scheduler> scheduler_;
//};
//
//void
//insert_dummy_index_into_gpu_cache(uint64_t device_id) {
//    MockVecIndex* mock_index = new MockVecIndex();
//    mock_index->ntotal_ = 1000;
//    engine::VecIndexPtr index(mock_index);
//
//    cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
//
//    cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location",obj);
//}
//
//TEST_F(SchedulerTest, OnCopyCompleted) {
//    const uint64_t NUM = 10;
//    std::vector<std::shared_ptr<TestTask>> tasks;
//    TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
//    dummy->location_ = "location";
//
//    insert_dummy_index_into_gpu_cache(1);
//
//    for (uint64_t i = 0; i < NUM; ++i) {
//        auto task = std::make_shared<TestTask>(dummy);
//        task->label() = std::make_shared<DefaultLabel>();
//        tasks.push_back(task);
//        cpu_resource_.lock()->task_table().Put(task);
//    }
//
//    sleep(3);
class MockVecIndex : public engine::VecIndex {
public:
    virtual ErrorCode BuildAll(const long &nb,
                               const float *xb,
                               const long *ids,
                               const engine::Config &cfg,
                               const long &nt = 0,
                               const float *xt = nullptr) {

    }

    engine::VecIndexPtr Clone() override {
        return zilliz::milvus::engine::VecIndexPtr();
    }

    int64_t GetDeviceId() override {
        return 0;
    }

    engine::IndexType GetType() override {
        return engine::IndexType::INVALID;
    }

    virtual ErrorCode Add(const long &nb,
                          const float *xb,
                          const long *ids,
                          const engine::Config &cfg = engine::Config()) {

    }

    virtual ErrorCode Search(const long &nq,
                             const float *xq,
                             float *dist,
                             long *ids,
                             const engine::Config &cfg = engine::Config()) {

    }

    engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {

    }

    engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {

    }

    virtual int64_t Dimension() {
        return dimension_;
    }

    virtual int64_t Count() {
        return ntotal_;
    }

    virtual zilliz::knowhere::BinarySet Serialize() {
        zilliz::knowhere::BinarySet binset;
        return binset;
    }

    virtual ErrorCode Load(const zilliz::knowhere::BinarySet &index_binary) {

    }

public:
    int64_t dimension_ = 512;
    int64_t ntotal_ = 0;
};


class SchedulerTest : public testing::Test {
protected:
    void
    SetUp() override {
        server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
        config.AddSequenceItem(server::CONFIG_GPU_IDS, "0");
        config.AddSequenceItem(server::CONFIG_GPU_IDS, "1");

        ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
        ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
        ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);

        res_mgr_ = std::make_shared<ResourceMgr>();
        cpu_resource_ = res_mgr_->Add(std::move(cpu));
        gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
        gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));

        auto PCIE = Connection("IO", 11000.0);
        res_mgr_->Connect("cpu", "gpu0", PCIE);
        res_mgr_->Connect("cpu", "gpu1", PCIE);

        scheduler_ = std::make_shared<Scheduler>(res_mgr_);

        res_mgr_->Start();
        scheduler_->Start();
    }

    void
    TearDown() override {
        scheduler_->Stop();
        res_mgr_->Stop();
    }

    ResourceWPtr cpu_resource_;
    ResourceWPtr gpu_resource_0_;
    ResourceWPtr gpu_resource_1_;

    ResourceMgrPtr res_mgr_;
    std::shared_ptr<Scheduler> scheduler_;
};

void
insert_dummy_index_into_gpu_cache(uint64_t device_id) {
    MockVecIndex *mock_index = new MockVecIndex();
    mock_index->ntotal_ = 1000;
    engine::VecIndexPtr index(mock_index);

    cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);

    cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj);
}

TEST_F(SchedulerTest, OnLoadCompleted) {
    const uint64_t NUM = 10;
    std::vector<std::shared_ptr<TestTask>> tasks;
    TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
    dummy->location_ = "location";

    insert_dummy_index_into_gpu_cache(1);

    for (uint64_t i = 0; i < NUM; ++i) {
        auto task = std::make_shared<TestTask>(dummy);
        task->label() = std::make_shared<DefaultLabel>();
        tasks.push_back(task);
        cpu_resource_.lock()->task_table().Put(task);
    }

    sleep(3);
    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);

}
TEST_F(SchedulerTest, PushTaskToNeighbourRandomlyTest) {
    const uint64_t NUM = 10;
    std::vector<std::shared_ptr<TestTask>> tasks;
    TableFileSchemaPtr dummy1 = std::make_shared<meta::TableFileSchema>();
    dummy1->location_ = "location";

    tasks.clear();

    for (uint64_t i = 0; i < NUM; ++i) {
        auto task = std::make_shared<TestTask>(dummy1);
        task->label() = std::make_shared<DefaultLabel>();
        tasks.push_back(task);
        cpu_resource_.lock()->task_table().Put(task);
    }

    sleep(3);
//    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
//
//}
//
//TEST_F(SchedulerTest, PushTaskToNeighbourRandomlyTest) {
//    const uint64_t NUM = 10;
//    std::vector<std::shared_ptr<TestTask>> tasks;
//    TableFileSchemaPtr dummy1 = std::make_shared<meta::TableFileSchema>();
//    dummy1->location_ = "location";
//
//    tasks.clear();
//
//    for (uint64_t i = 0; i < NUM; ++i) {
//        auto task = std::make_shared<TestTask>(dummy1);
//        task->label() = std::make_shared<DefaultLabel>();
//        tasks.push_back(task);
//        cpu_resource_.lock()->task_table().Put(task);
//    }
//
//    sleep(3);
////    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
//}
//
//class SchedulerTest2 : public testing::Test {
// protected:
//    void
//    SetUp() override {
//        ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
//        ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false);
//        ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false);
//        ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false);
//        ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true);
//        ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true);
//
//        res_mgr_ = std::make_shared<ResourceMgr>();
//        disk_ = res_mgr_->Add(std::move(disk));
//        cpu_0_ = res_mgr_->Add(std::move(cpu0));
//        cpu_1_ = res_mgr_->Add(std::move(cpu1));
//        cpu_2_ = res_mgr_->Add(std::move(cpu2));
//        gpu_0_ = res_mgr_->Add(std::move(gpu0));
//        gpu_1_ = res_mgr_->Add(std::move(gpu1));
//        auto IO = Connection("IO", 5.0);
//        auto PCIE1 = Connection("PCIE", 11.0);
//        auto PCIE2 = Connection("PCIE", 20.0);
//        res_mgr_->Connect("disk", "cpu0", IO);
//        res_mgr_->Connect("cpu0", "cpu1", IO);
//        res_mgr_->Connect("cpu1", "cpu2", IO);
//        res_mgr_->Connect("cpu0", "cpu2", IO);
//        res_mgr_->Connect("cpu1", "gpu0", PCIE1);
//        res_mgr_->Connect("cpu2", "gpu1", PCIE2);
//
//        scheduler_ = std::make_shared<Scheduler>(res_mgr_);
//
//        res_mgr_->Start();
//        scheduler_->Start();
//    }
//
//    void
//    TearDown() override {
//        scheduler_->Stop();
//        res_mgr_->Stop();
//    }
//
//    ResourceWPtr disk_;
//    ResourceWPtr cpu_0_;
//    ResourceWPtr cpu_1_;
//    ResourceWPtr cpu_2_;
//    ResourceWPtr gpu_0_;
//    ResourceWPtr gpu_1_;
//    ResourceMgrPtr res_mgr_;
//
//    std::shared_ptr<Scheduler> scheduler_;
//};
//
//
//TEST_F(SchedulerTest2, SpecifiedResourceTest) {
//    const uint64_t NUM = 10;
//    std::vector<std::shared_ptr<TestTask>> tasks;
//    TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
//    dummy->location_ = "location";
//
//    for (uint64_t i = 0; i < NUM; ++i) {
//        std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy);
//        task->label() = std::make_shared<SpecResLabel>(disk_);
//        tasks.push_back(task);
//        disk_.lock()->task_table().Put(task);
//    }
//
////    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
//}
}
class SchedulerTest2 : public testing::Test {
protected:
    void
    SetUp() override {
        ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
        ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false);
        ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false);
        ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false);
        ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true);
        ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true);

        res_mgr_ = std::make_shared<ResourceMgr>();
        disk_ = res_mgr_->Add(std::move(disk));
        cpu_0_ = res_mgr_->Add(std::move(cpu0));
        cpu_1_ = res_mgr_->Add(std::move(cpu1));
        cpu_2_ = res_mgr_->Add(std::move(cpu2));
        gpu_0_ = res_mgr_->Add(std::move(gpu0));
        gpu_1_ = res_mgr_->Add(std::move(gpu1));
        auto IO = Connection("IO", 5.0);
        auto PCIE1 = Connection("PCIE", 11.0);
        auto PCIE2 = Connection("PCIE", 20.0);
        res_mgr_->Connect("disk", "cpu0", IO);
        res_mgr_->Connect("cpu0", "cpu1", IO);
        res_mgr_->Connect("cpu1", "cpu2", IO);
        res_mgr_->Connect("cpu0", "cpu2", IO);
        res_mgr_->Connect("cpu1", "gpu0", PCIE1);
        res_mgr_->Connect("cpu2", "gpu1", PCIE2);

        scheduler_ = std::make_shared<Scheduler>(res_mgr_);

        res_mgr_->Start();
        scheduler_->Start();
    }

    void
    TearDown() override {
        scheduler_->Stop();
        res_mgr_->Stop();
    }

    ResourceWPtr disk_;
    ResourceWPtr cpu_0_;
    ResourceWPtr cpu_1_;
    ResourceWPtr cpu_2_;
    ResourceWPtr gpu_0_;
    ResourceWPtr gpu_1_;
    ResourceMgrPtr res_mgr_;

    std::shared_ptr<Scheduler> scheduler_;
};


TEST_F(SchedulerTest2, SpecifiedResourceTest) {
    const uint64_t NUM = 10;
    std::vector<std::shared_ptr<TestTask>> tasks;
    TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
    dummy->location_ = "location";

    for (uint64_t i = 0; i < NUM; ++i) {
        std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy);
        task->label() = std::make_shared<SpecResLabel>(disk_);
        tasks.push_back(task);
        disk_.lock()->task_table().Put(task);
    }

//    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}

}
}