mirror of https://github.com/milvus-io/milvus.git

Merge pull request #119 from scsven/dev

Using shared_ptr instead of weak_ptr to avoid performance loss

Former-commit-id: 1e0c9f5a1ce757f3768c5efe44b383e93f677bbe

commit 2f762900e0
@@ -13,6 +13,9 @@ Please mark all change in change log and use the ticket from JIRA.
 - \#82 - Move easyloggingpp into "external" directory
 - \#92 - Speed up CMake build process
 - \#96 - Remove .a file in milvus/lib for docker-version
+- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss
+- \#122 - Add unique id for Job

 ## Feature
+- \#115 - Using new structure for tasktable
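The heart of this PR is the \#118 entry: scheduler events and graph neighbours now hold a shared_ptr directly instead of a weak_ptr that every handler had to lock(). The sketch below is illustrative only (it is not code from this commit; Resource here is a stand-in type) and shows where the cost difference comes from: each weak_ptr::lock() materializes a fresh shared_ptr, paying atomic reference-count traffic on every event.

```cpp
#include <memory>

struct Resource {
    void WakeupLoader() { /* ... */ }
};

// Before: every handler call pays for weak_ptr::lock(), which atomically
// bumps and later drops the reference count to materialize a shared_ptr.
void OnEventWeak(const std::weak_ptr<Resource>& res) {
    if (auto r = res.lock()) {   // atomic ref-count traffic on each call
        r->WakeupLoader();
    }
}

// After: the event already owns a shared_ptr, so the handler just
// dereferences it; no per-call lock() and no null check needed here.
void OnEventShared(const std::shared_ptr<Resource>& res) {
    res->WakeupLoader();
}

int main() {
    auto res = std::make_shared<Resource>();
    OnEventWeak(res);     // implicit conversion to weak_ptr
    OnEventShared(res);
    return 0;
}
```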
@@ -136,7 +136,7 @@ DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
         // scheduler will determine when to delete table files
         auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
-        scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(0, table_id, meta_ptr_, nres);
+        scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(table_id, meta_ptr_, nres);
         scheduler::JobMgrInst::GetInstance()->Put(job);
         job->WaitAndDelete();
     } else {
@@ -439,7 +439,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
     // step 1: get files to search
     ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
-    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
+    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors);
     for (auto& file : files) {
         scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
         job->AddIndexFile(file_ptr);
@@ -754,7 +754,7 @@ DBImpl::BackgroundBuildIndex() {
     Status status;

     if (!to_index_files.empty()) {
-        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
+        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);

         // step 2: put build index task to scheduler
         for (auto& file : to_index_files) {
@@ -54,7 +54,7 @@ ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrP
         auto cur_neighbours = cur_node->GetNeighbours();

         for (auto& neighbour : cur_neighbours) {
-            auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node.lock());
+            auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node);
             dis_matrix[name_id_map.at(res->name())][name_id_map.at(neighbour_res->name())] =
                 neighbour.connection.transport_cost();
         }
@@ -75,7 +75,6 @@ class ResourceMgr : public interface::dumpable {
         return gpu_resources_;
     }

-    // TODO(wxyu): why return shared pointer
     inline std::vector<ResourcePtr>
     GetAllResources() {
         return resources_;
@@ -82,79 +82,9 @@ load_simple_config() {
     }
 }

-void
-load_advance_config() {
-    // try {
-    //     server::ConfigNode &config = server::Config::GetInstance().GetConfig(server::CONFIG_RESOURCE);
-    //
-    //     if (config.GetChildren().empty()) throw "resource_config null exception";
-    //
-    //     auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
-    //
-    //     if (resources.empty()) throw "Children of resource_config null exception";
-    //
-    //     for (auto &resource : resources) {
-    //         auto &resname = resource.first;
-    //         auto &resconf = resource.second;
-    //         auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
-    ////         auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
-    //         auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
-    ////         auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
-    //         auto enable_loader = true;
-    //         auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
-    //         auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY);
-    //         auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY);
-    //         auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM);
-    //
-    //         auto res = ResMgrInst::GetInstance()->Add(ResourceFactory::Create(resname,
-    //                                                                           type,
-    //                                                                           device_id,
-    //                                                                           enable_loader,
-    //                                                                           enable_executor));
-    //
-    //         if (res.lock()->type() == ResourceType::GPU) {
-    //             auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY, 300);
-    //             auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY, 300);
-    //             auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM, 2);
-    //             pinned_memory = 1024 * 1024 * pinned_memory;
-    //             temp_memory = 1024 * 1024 * temp_memory;
-    //             knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id,
-    //                                                                     pinned_memory,
-    //                                                                     temp_memory,
-    //                                                                     resource_num);
-    //         }
-    //     }
-    //
-    //     knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
-    //
-    //     auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren();
-    //     if (connections.empty()) throw "connections config null exception";
-    //     for (auto &conn : connections) {
-    //         auto &connect_name = conn.first;
-    //         auto &connect_conf = conn.second;
-    //         auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS);
-    //         auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS);
-    //
-    //         std::string delimiter = "===";
-    //         std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter));
-    //         std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3,
-    //                                                     connect_endpoint.length());
-    //
-    //         auto connection = Connection(connect_name, connect_speed);
-    //         ResMgrInst::GetInstance()->Connect(left, right, connection);
-    //     }
-    // } catch (const char *msg) {
-    //     SERVER_LOG_ERROR << msg;
-    //     // TODO(wxyu): throw exception instead
-    //     exit(-1);
-    ////     throw std::exception();
-    // }
-}
-
 void
 StartSchedulerService() {
     load_simple_config();
-    // load_advance_config();
     ResMgrInst::GetInstance()->Start();
     SchedInst::GetInstance()->Start();
     JobMgrInst::GetInstance()->Start();
@@ -26,10 +26,8 @@
 namespace milvus {
 namespace scheduler {

-Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) {
-    if (auto mgr = res_mgr_.lock()) {
-        mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
-    }
+Scheduler::Scheduler(ResourceMgrPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) {
+    res_mgr_->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
     event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::START_UP),
                                           std::bind(&Scheduler::OnStartUp, this, std::placeholders::_1)));
     event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::LOAD_COMPLETED),
@@ -40,6 +38,10 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::m
                                           std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1)));
 }

+Scheduler::~Scheduler() {
+    res_mgr_ = nullptr;
+}
+
 void
 Scheduler::Start() {
     running_ = true;
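The new destructor makes the ownership shift explicit: Scheduler now keeps the resource manager alive through a shared_ptr, so dropping it in ~Scheduler() releases the manager deterministically and guards against accidental ownership cycles. A reduced illustration of why such cycles matter (again, not code from this commit):

```cpp
#include <memory>

// Two objects that own each other via shared_ptr keep each other alive
// forever; that is the usual reason to prefer weak_ptr. Holding a shared_ptr
// is safe as long as the other side does not point back, or the link is
// broken explicitly, as the new ~Scheduler() does with res_mgr_ = nullptr.
struct B;
struct A { std::shared_ptr<B> b; };
struct B { std::shared_ptr<A> a; };   // A <-> B would form a cycle

int main() {
    auto a = std::make_shared<A>();
    auto b = std::make_shared<B>();
    a->b = b;
    b->a = a;        // cycle: neither object would be destroyed on scope exit
    b->a.reset();    // breaking it by hand, analogous to res_mgr_ = nullptr
    return 0;
}
```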
@@ -100,51 +102,45 @@ Scheduler::Process(const EventPtr& event)
 void
 Scheduler::OnLoadCompleted(const EventPtr& event) {
     auto load_completed_event = std::static_pointer_cast<LoadCompletedEvent>(event);
-    if (auto resource = event->resource_.lock()) {
-        resource->WakeupExecutor();
-
-        auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
-        switch (task_table_type) {
-            case TaskLabelType::DEFAULT: {
-                Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
-                break;
-            }
-            case TaskLabelType::SPECIFIED_RESOURCE: {
-                Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
-                break;
-            }
-            case TaskLabelType::BROADCAST: {
-                if (resource->HasExecutor() == false) {
-                    load_completed_event->task_table_item_->Move();
-                }
-                Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
-                break;
-            }
-            default: { break; }
-        }
-        resource->WakeupLoader();
-    }
+    auto resource = event->resource_;
+    resource->WakeupExecutor();
+
+    auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
+    switch (task_table_type) {
+        case TaskLabelType::DEFAULT: {
+            Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
+            break;
+        }
+        case TaskLabelType::SPECIFIED_RESOURCE: {
+            Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
+            break;
+        }
+        case TaskLabelType::BROADCAST: {
+            if (resource->HasExecutor() == false) {
+                load_completed_event->task_table_item_->Move();
+            }
+            Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
+            break;
+        }
+        default: { break; }
+    }
+    resource->WakeupLoader();
 }

 void
 Scheduler::OnStartUp(const EventPtr& event) {
-    if (auto resource = event->resource_.lock()) {
-        resource->WakeupLoader();
-    }
+    event->resource_->WakeupLoader();
 }

 void
 Scheduler::OnFinishTask(const EventPtr& event) {
-    if (auto resource = event->resource_.lock()) {
-        resource->WakeupLoader();
-    }
+    event->resource_->WakeupLoader();
 }

 void
 Scheduler::OnTaskTableUpdated(const EventPtr& event) {
-    if (auto resource = event->resource_.lock()) {
-        resource->WakeupLoader();
-    }
+    event->resource_->WakeupLoader();
 }

 }  // namespace scheduler
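For context on the handlers above: event_register_ maps an EventType to a std::function handler, and Process (only partially visible in this hunk) presumably looks the handler up by type and invokes it. A minimal reconstruction of that dispatch pattern, with assumed names, is:

```cpp
#include <cstdint>
#include <functional>
#include <memory>
#include <unordered_map>

enum class EventType : uint64_t { START_UP, LOAD_COMPLETED, FINISH_TASK, TASK_TABLE_UPDATED };

struct Event { EventType type_; };
using EventPtr = std::shared_ptr<Event>;

class Dispatcher {
 public:
    Dispatcher() {
        // Register one handler per event type, as Scheduler's constructor does.
        handlers_[static_cast<uint64_t>(EventType::START_UP)] =
            [](const EventPtr&) { /* wake up the loader, etc. */ };
    }

    void
    Process(const EventPtr& event) {
        // Look up the handler for this event type and invoke it.
        auto it = handlers_.find(static_cast<uint64_t>(event->type_));
        if (it != handlers_.end()) {
            it->second(event);
        }
    }

 private:
    std::unordered_map<uint64_t, std::function<void(EventPtr)>> handlers_;
};

int main() {
    Dispatcher d;
    d.Process(std::make_shared<Event>(Event{EventType::START_UP}));
    return 0;
}
```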
@@ -34,7 +34,9 @@ namespace scheduler {

 class Scheduler : public interface::dumpable {
  public:
-    explicit Scheduler(ResourceMgrWPtr res_mgr);
+    explicit Scheduler(ResourceMgrPtr res_mgr);
+
+    ~Scheduler();

     Scheduler(const Scheduler&) = delete;
     Scheduler(Scheduler&&) = delete;
@@ -118,7 +120,7 @@ class Scheduler : public interface::dumpable {

     std::unordered_map<uint64_t, std::function<void(EventPtr)>> event_register_;

-    ResourceMgrWPtr res_mgr_;
+    ResourceMgrPtr res_mgr_;
     std::queue<EventPtr> event_queue_;
     std::thread worker_thread_;
     std::mutex event_mutex_;
@@ -291,11 +291,6 @@ TaskTable::Put(std::vector<TaskPtr>& tasks) {
     }
 }

-TaskTableItemPtr
-TaskTable::Get(uint64_t index) {
-    return table_[index];
-}
-
 size_t
 TaskTable::TaskToExecute() {
     size_t count = 0;
@@ -106,6 +106,11 @@ class TaskTable : public interface::dumpable {
     TaskTable(const TaskTable&) = delete;
     TaskTable(TaskTable&&) = delete;

+ public:
+    json
+    Dump() const override;
+
  public:
     inline void
     RegisterSubscriber(std::function<void(void)> subscriber) {
         subscriber_ = std::move(subscriber);
@@ -124,40 +129,35 @@ class TaskTable : public interface::dumpable {
     void
     Put(std::vector<TaskPtr>& tasks);

-    /*
-     * Return task table item reference;
-     */
-    TaskTableItemPtr
-    Get(uint64_t index);
-
-    inline size_t
-    Capacity() {
-        return table_.capacity();
-    }
-
-    /*
-     * Return size of task table;
-     */
-    inline size_t
-    Size() {
-        return table_.size();
-    }
-
     size_t
     TaskToExecute();

- public:
-    const TaskTableItemPtr& operator[](uint64_t index) {
-        return table_[index];
-    }
-
- public:
     std::vector<uint64_t>
     PickToLoad(uint64_t limit);

     std::vector<uint64_t>
     PickToExecute(uint64_t limit);

+ public:
+    inline const TaskTableItemPtr& operator[](uint64_t index) {
+        return table_[index];
+    }
+
+    inline const TaskTableItemPtr&
+    at(uint64_t index) {
+        return table_[index];
+    }
+
+    inline size_t
+    capacity() {
+        return table_.capacity();
+    }
+
+    inline size_t
+    size() {
+        return table_.size();
+    }
+
  public:
     /******** Action ********/
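The lowercase at()/size()/capacity() accessors follow standard-container naming and sit on top of the new CircleQueue storage noted in changelog entry \#115 (see the table_ member in the next hunk). A rough sketch of the idea behind a fixed-capacity ring, using illustrative names rather than the actual CircleQueue API, also shows why the unit tests later compare picked indexes modulo capacity():

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <utility>

// Minimal fixed-capacity ring buffer: logical positions keep growing while
// physical slots wrap around, so callers reduce an index modulo capacity().
template <typename T, std::size_t N>
class Ring {
 public:
    void put(T value) {
        slots_[size_ % N] = std::move(value);  // wrap into a physical slot
        ++size_;
    }

    const T& operator[](std::size_t index) const { return slots_[index % N]; }
    const T& at(std::size_t index) const { return slots_[index % N]; }

    std::size_t size() const { return size_; }
    std::size_t capacity() const { return N; }

 private:
    std::array<T, N> slots_{};
    std::size_t size_ = 0;
};

int main() {
    Ring<int, 4> ring;
    for (int i = 0; i < 6; ++i) ring.put(i);
    assert(ring.size() == 6);
    assert(ring.capacity() == 4);
    assert(ring[5] == 5);   // logical index 5 lives in slot 5 % 4 == 1
    return 0;
}
```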
@@ -223,13 +223,6 @@ class TaskTable : public interface::dumpable {
         return table_[index]->Moved();
     }

- public:
-    /*
-     * Dump;
-     */
-    json
-    Dump() const override;
-
  private:
     std::uint64_t id_ = 0;
     CircleQueue<TaskTableItemPtr> table_;
@@ -37,10 +37,11 @@ class Action {
     PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest);

     static void
-    DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, std::shared_ptr<LoadCompletedEvent> event);
+    DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
+                              std::shared_ptr<LoadCompletedEvent> event);

     static void
-    SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
+    SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
                                         std::shared_ptr<LoadCompletedEvent> event);
 };
@@ -30,7 +30,7 @@ std::vector<ResourcePtr>
 get_neighbours(const ResourcePtr& self) {
     std::vector<ResourcePtr> neighbours;
     for (auto& neighbour_node : self->GetNeighbours()) {
-        auto node = neighbour_node.neighbour_node.lock();
+        auto node = neighbour_node.neighbour_node;
         if (not node)
             continue;
@@ -46,7 +46,7 @@ std::vector<std::pair<ResourcePtr, Connection>>
 get_neighbours_with_connetion(const ResourcePtr& self) {
     std::vector<std::pair<ResourcePtr, Connection>> neighbours;
     for (auto& neighbour_node : self->GetNeighbours()) {
-        auto node = neighbour_node.neighbour_node.lock();
+        auto node = neighbour_node.neighbour_node;
         if (not node)
             continue;
@@ -102,7 +102,7 @@ Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
 }

 void
-Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
+Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
                                   std::shared_ptr<LoadCompletedEvent> event) {
     if (not resource->HasExecutor() && event->task_table_item_->Move()) {
         auto task = event->task_table_item_->task;
@@ -114,11 +114,11 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
         if (auto index_engine = search_task->index_engine_) {
             auto location = index_engine->GetLocation();

-            for (auto i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) {
+            for (auto i = 0; i < res_mgr->GetNumGpuResource(); ++i) {
                 auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
                 if (index != nullptr) {
                     moved = true;
-                    auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i);
+                    auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
                     PushTaskToResource(event->task_table_item_->task, dest_resource);
                     break;
                 }
@@ -133,17 +133,17 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
 }

 void
-Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
+Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
                                             std::shared_ptr<LoadCompletedEvent> event) {
     auto task = event->task_table_item_->task;
     if (resource->type() == ResourceType::DISK) {
         // step 1: calculate shortest path per resource, from disk to compute resource
-        auto compute_resources = res_mgr.lock()->GetComputeResources();
+        auto compute_resources = res_mgr->GetComputeResources();
         std::vector<std::vector<std::string>> paths;
         std::vector<uint64_t> transport_costs;
         for (auto& res : compute_resources) {
             std::vector<std::string> path;
-            uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path);
+            uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
             transport_costs.push_back(transport_cost);
             paths.emplace_back(path);
         }
@@ -187,10 +187,10 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
             Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);

             bool find_gpu_res = false;
-            if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
+            if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
                 for (uint64_t i = 0; i < compute_resources.size(); ++i) {
                     if (compute_resources[i]->name() ==
-                        res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
+                        res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
                         find_gpu_res = true;
                         Path task_path(paths[i], paths[i].size() - 1);
                         task->path() = task_path;
@@ -208,7 +208,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
         resource->WakeupExecutor();
     } else {
         auto next_res_name = task->path().Next();
-        auto next_res = res_mgr.lock()->GetResource(next_res_name);
+        auto next_res = res_mgr->GetResource(next_res_name);
         // if (event->task_table_item_->Move()) {
         //     next_res->task_table().Put(task);
         // }
@@ -30,7 +30,7 @@ class Resource;

 class Event {
  public:
-    explicit Event(EventType type, std::weak_ptr<Resource> resource) : type_(type), resource_(std::move(resource)) {
+    explicit Event(EventType type, std::shared_ptr<Resource> resource) : type_(type), resource_(std::move(resource)) {
     }

     inline EventType
@@ -46,7 +46,7 @@ class Event {

  public:
     EventType type_;
-    std::weak_ptr<Resource> resource_;
+    std::shared_ptr<Resource> resource_;
 };

 using EventPtr = std::shared_ptr<Event>;
@@ -29,7 +29,7 @@ namespace scheduler {

 class FinishTaskEvent : public Event {
  public:
-    FinishTaskEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
+    FinishTaskEvent(std::shared_ptr<Resource> resource, TaskTableItemPtr task_table_item)
         : Event(EventType::FINISH_TASK, std::move(resource)), task_table_item_(std::move(task_table_item)) {
     }
@@ -29,7 +29,7 @@ namespace scheduler {

 class LoadCompletedEvent : public Event {
  public:
-    LoadCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
+    LoadCompletedEvent(std::shared_ptr<Resource> resource, TaskTableItemPtr task_table_item)
         : Event(EventType::LOAD_COMPLETED, std::move(resource)), task_table_item_(std::move(task_table_item)) {
     }
@@ -28,7 +28,7 @@ namespace scheduler {

 class StartUpEvent : public Event {
  public:
-    explicit StartUpEvent(std::weak_ptr<Resource> resource) : Event(EventType::START_UP, std::move(resource)) {
+    explicit StartUpEvent(std::shared_ptr<Resource> resource) : Event(EventType::START_UP, std::move(resource)) {
     }

     inline std::string
@@ -28,7 +28,7 @@ namespace scheduler {

 class TaskTableUpdatedEvent : public Event {
  public:
-    explicit TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
+    explicit TaskTableUpdatedEvent(std::shared_ptr<Resource> resource)
         : Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {
     }
@@ -23,8 +23,8 @@
 namespace milvus {
 namespace scheduler {

-BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
-    : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
+BuildIndexJob::BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
+    : Job(JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
 }

 bool
@@ -59,6 +59,8 @@ BuildIndexJob::Dump() const {
     json ret{
         {"number_of_to_index_file", to_index_files_.size()},
     };
+    auto base = Job::Dump();
+    ret.insert(base.begin(), base.end());
     return ret;
 }
@@ -41,7 +41,7 @@ using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;

 class BuildIndexJob : public Job {
  public:
-    explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options);
+    explicit BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options);

  public:
     bool
@@ -22,8 +22,8 @@
 namespace milvus {
 namespace scheduler {

-DeleteJob::DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource)
-    : Job(id, JobType::DELETE),
+DeleteJob::DeleteJob(std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource)
+    : Job(JobType::DELETE),
       table_id_(std::move(table_id)),
       meta_ptr_(std::move(meta_ptr)),
       num_resource_(num_resource) {
@@ -52,6 +52,8 @@ DeleteJob::Dump() const {
         {"number_of_resource", num_resource_},
         {"number_of_done", done_resource},
     };
+    auto base = Job::Dump();
+    ret.insert(base.begin(), base.end());
     return ret;
 }
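DBImpl::DeleteTable blocks on job->WaitAndDelete() until every compute resource has handled the delete job, which is what the number_of_resource / number_of_done pair in this Dump() tracks. The following is a guess at that synchronization pattern using standard primitives; it is not the actual DeleteJob implementation:

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical sketch: a job that waits until `num_resource` workers have
// each reported completion. Names mirror the Dump() fields above, but the
// implementation details are assumed, not taken from the diff.
class CountdownJob {
 public:
    explicit CountdownJob(uint64_t num_resource) : num_resource_(num_resource) {
    }

    // Called once by each resource when it has finished its share of the work.
    void
    ResourceDone() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++done_resource_;
        }
        cv_.notify_all();
    }

    // Called by the issuing thread; returns when all resources reported done.
    void
    WaitAndDelete() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return done_resource_ >= num_resource_; });
    }

 private:
    uint64_t num_resource_;
    uint64_t done_resource_ = 0;
    std::mutex mutex_;
    std::condition_variable cv_;
};
```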
@@ -35,7 +35,7 @@ namespace scheduler {

 class DeleteJob : public Job {
  public:
-    DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource);
+    DeleteJob(std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource);

  public:
     void
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "Job.h"
+
+namespace milvus {
+namespace scheduler {
+
+namespace {
+std::mutex unique_job_mutex;
+uint64_t unique_job_id = 0;
+}  // namespace
+
+Job::Job(JobType type) : type_(type) {
+    std::lock_guard<std::mutex> lock(unique_job_mutex);
+    id_ = unique_job_id++;
+}
+
+json
+Job::Dump() const {
+    json ret{
+        {"id", id_},
+        {"type", type_},
+    };
+    return ret;
+}
+
+}  // namespace scheduler
+}  // namespace milvus
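This new Job base class is what closes \#122: every job receives a process-wide unique id from a mutex-guarded counter, so call sites such as DBImpl no longer pass a hard-coded 0. A small standalone sketch of the same idea follows (illustrative types, not the Milvus headers); the std::atomic counter used here would be an equally valid design choice if the lock ever mattered:

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>

// Stand-in for the scheduler's Job base class: the id is assigned internally,
// so derived jobs never receive or forge one.
class Job {
 public:
    uint64_t id() const { return id_; }

 protected:
    Job() : id_(next_id_.fetch_add(1)) {  // lock-free variant of the counter
    }

 private:
    static std::atomic<uint64_t> next_id_;
    uint64_t id_;
};

std::atomic<uint64_t> Job::next_id_{0};

class SearchLikeJob : public Job {};   // hypothetical derived job

int main() {
    SearchLikeJob a, b;
    std::cout << a.id() << " " << b.id() << "\n";  // prints 0 1: ids are unique
    return 0;
}
```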
@@ -53,12 +53,14 @@ class Job : public interface::dumpable {
         return type_;
     }

+    json
+    Dump() const override;
+
  protected:
-    Job(JobId id, JobType type) : id_(id), type_(type) {
-    }
+    explicit Job(JobType type);

  private:
-    JobId id_;
+    JobId id_ = 0;
     JobType type_;
 };
@@ -21,8 +21,8 @@
 namespace milvus {
 namespace scheduler {

-SearchJob::SearchJob(milvus::scheduler::JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors)
-    : Job(id, JobType::SEARCH), topk_(topk), nq_(nq), nprobe_(nprobe), vectors_(vectors) {
+SearchJob::SearchJob(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors)
+    : Job(JobType::SEARCH), topk_(topk), nq_(nq), nprobe_(nprobe), vectors_(vectors) {
 }

 bool
@@ -70,6 +70,8 @@ SearchJob::Dump() const {
         {"nq", nq_},
         {"nprobe", nprobe_},
     };
+    auto base = Job::Dump();
+    ret.insert(base.begin(), base.end());
     return ret;
 }
@@ -43,7 +43,7 @@ using ResultSet = std::vector<Id2DistVec>;

 class SearchJob : public Job {
  public:
-    SearchJob(JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors);
+    SearchJob(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors);

  public:
     bool
@@ -58,9 +58,7 @@ Node::Dump() const {
 void
 Node::AddNeighbour(const NeighbourNodePtr& neighbour_node, Connection& connection) {
     std::lock_guard<std::mutex> lk(mutex_);
-    if (auto s = neighbour_node.lock()) {
-        neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection)));
-    }
+    neighbours_.emplace(std::make_pair(neighbour_node->id_, Neighbour(neighbour_node, connection)));
     // else do nothing, consider it..
 }
@@ -20,6 +20,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>

 #include "Connection.h"
@@ -31,10 +32,14 @@ namespace scheduler {

 class Node;

-using NeighbourNodePtr = std::weak_ptr<Node>;
+using NeighbourNodePtr = std::shared_ptr<Node>;

 struct Neighbour {
-    Neighbour(NeighbourNodePtr nei, Connection conn) : neighbour_node(nei), connection(conn) {
+    Neighbour(NeighbourNodePtr nei, Connection conn) : neighbour_node(std::move(nei)), connection(std::move(conn)) {
     }

+    ~Neighbour() {
+        neighbour_node = nullptr;
+    }
+
     NeighbourNodePtr neighbour_node;
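A side note on the new Neighbour constructor: taking the shared_ptr by value and moving it into the member turns a second reference-count increment/decrement pair into a plain pointer move. A standalone illustration of that pattern, with a hypothetical Edge type:

```cpp
#include <memory>
#include <utility>

struct Node { int id = 0; };

// Taking the shared_ptr by value and moving it into the member means the
// copy into the parameter is the only ref-count bump; the member is then
// filled by a cheap pointer steal instead of another increment/decrement.
struct Edge {
    explicit Edge(std::shared_ptr<Node> n) : node(std::move(n)) {
    }
    std::shared_ptr<Node> node;
};

int main() {
    auto n = std::make_shared<Node>();
    Edge e1(n);             // one copy into the parameter, then moved into the member
    Edge e2(std::move(n));  // no ref-count traffic at all; n becomes empty
    return 0;
}
```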
@@ -132,7 +132,7 @@ Resource::pick_task_load() {
     for (auto index : indexes) {
         // try to set one task loading, then return
         if (task_table_.Load(index))
-            return task_table_.Get(index);
+            return task_table_.at(index);
         // else try next
     }
     return nullptr;
@@ -150,7 +150,7 @@ Resource::pick_task_execute() {
         }

         if (task_table_.Execute(index)) {
-            return task_table_.Get(index);
+            return task_table_.at(index);
         }
         // if (task_table_[index]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE) {
         //     if (task_table_.Get(index)->task->path().Current() == task_table_.Get(index)->task->path().Last()
@@ -28,7 +28,7 @@ namespace milvus {
 namespace scheduler {

 TEST(EventTest, START_UP_EVENT) {
-    ResourceWPtr res(ResourcePtr(nullptr));
+    ResourcePtr res(nullptr);
     auto event = std::make_shared<StartUpEvent>(res);
     ASSERT_FALSE(event->Dump().empty());
     std::cout << *event;
@@ -36,7 +36,7 @@ TEST(EventTest, START_UP_EVENT) {
 }

 TEST(EventTest, LOAD_COMPLETED_EVENT) {
-    ResourceWPtr res(ResourcePtr(nullptr));
+    ResourcePtr res(nullptr);
     auto event = std::make_shared<LoadCompletedEvent>(res, nullptr);
     ASSERT_FALSE(event->Dump().empty());
     std::cout << *event;
@@ -44,7 +44,7 @@ TEST(EventTest, LOAD_COMPLETED_EVENT) {
 }

 TEST(EventTest, FINISH_TASK_EVENT) {
-    ResourceWPtr res(ResourcePtr(nullptr));
+    ResourcePtr res(nullptr);
     auto event = std::make_shared<FinishTaskEvent>(res, nullptr);
     ASSERT_FALSE(event->Dump().empty());
     std::cout << *event;
@@ -53,7 +53,7 @@ TEST(EventTest, FINISH_TASK_EVENT) {

 TEST(EventTest, TASKTABLE_UPDATED_EVENT) {
-    ResourceWPtr res(ResourcePtr(nullptr));
+    ResourcePtr res(nullptr);
     auto event = std::make_shared<TaskTableUpdatedEvent>(res);
     ASSERT_FALSE(event->Dump().empty());
     std::cout << *event;
@@ -15,15 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.

-
-#include "scheduler/resource/Node.h"
 #include <gtest/gtest.h>
+#include "scheduler/resource/Node.h"

 namespace {

 namespace ms = milvus::scheduler;

-} // namespace
+}  // namespace

 class NodeTest : public ::testing::Test {
  protected:
@@ -73,9 +72,11 @@ TEST_F(NodeTest, GET_NEIGHBOURS) {
         bool n2 = false, n3 = false;
         auto node1_neighbours = node1_->GetNeighbours();
         ASSERT_EQ(node1_neighbours.size(), 2);
-        for (auto &n : node1_neighbours) {
-            if (n.neighbour_node.lock() == node2_) n2 = true;
-            if (n.neighbour_node.lock() == node3_) n3 = true;
+        for (auto& n : node1_neighbours) {
+            if (n.neighbour_node == node2_)
+                n2 = true;
+            if (n.neighbour_node == node3_)
+                n3 = true;
         }
         ASSERT_TRUE(n2);
         ASSERT_TRUE(n3);
@@ -84,7 +85,7 @@ TEST_F(NodeTest, GET_NEIGHBOURS) {
     {
         auto node2_neighbours = node2_->GetNeighbours();
         ASSERT_EQ(node2_neighbours.size(), 1);
-        ASSERT_EQ(node2_neighbours[0].neighbour_node.lock(), node1_);
+        ASSERT_EQ(node2_neighbours[0].neighbour_node, node1_);
     }

     {
@@ -100,4 +101,3 @@ TEST_F(NodeTest, DUMP) {
     std::cout << node2_->Dump();
     ASSERT_FALSE(node2_->Dump().empty());
 }
-
@@ -165,7 +165,7 @@ TEST_F(SchedulerTest, ON_LOAD_COMPLETED) {
     }

     sleep(3);
-    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
+    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().size(), NUM);
 }

 TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) {
@@ -183,19 +183,19 @@ TEST_F(TaskTableBaseTest, SUBSCRIBER) {

 TEST_F(TaskTableBaseTest, PUT_TASK) {
     empty_table_.Put(task1_);
-    ASSERT_EQ(empty_table_.Get(0)->task, task1_);
+    ASSERT_EQ(empty_table_.at(0)->task, task1_);
 }

 TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) {
     empty_table_.Put(invalid_task_);
-    ASSERT_EQ(empty_table_.Get(0)->task, invalid_task_);
+    ASSERT_EQ(empty_table_.at(0)->task, invalid_task_);
 }

 TEST_F(TaskTableBaseTest, PUT_BATCH) {
     std::vector<milvus::scheduler::TaskPtr> tasks{task1_, task2_};
     empty_table_.Put(tasks);
-    ASSERT_EQ(empty_table_.Get(0)->task, task1_);
-    ASSERT_EQ(empty_table_.Get(1)->task, task2_);
+    ASSERT_EQ(empty_table_.at(0)->task, task1_);
+    ASSERT_EQ(empty_table_.at(1)->task, task2_);
 }

 TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
@@ -204,14 +204,14 @@ TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
 }

 TEST_F(TaskTableBaseTest, SIZE) {
-    ASSERT_EQ(empty_table_.Size(), 0);
+    ASSERT_EQ(empty_table_.size(), 0);
     empty_table_.Put(task1_);
-    ASSERT_EQ(empty_table_.Size(), 1);
+    ASSERT_EQ(empty_table_.size(), 1);
 }

 TEST_F(TaskTableBaseTest, OPERATOR) {
     empty_table_.Put(task1_);
-    ASSERT_EQ(empty_table_.Get(0), empty_table_[0]);
+    ASSERT_EQ(empty_table_.at(0), empty_table_[0]);
 }

 TEST_F(TaskTableBaseTest, PICK_TO_LOAD) {
@@ -224,7 +224,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD) {

     auto indexes = empty_table_.PickToLoad(1);
     ASSERT_EQ(indexes.size(), 1);
-    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
+    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
 }

 TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) {
@@ -237,9 +237,9 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) {

     auto indexes = empty_table_.PickToLoad(3);
     ASSERT_EQ(indexes.size(), 3);
-    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
-    ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3);
-    ASSERT_EQ(indexes[2] % empty_table_.Capacity(), 4);
+    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
+    ASSERT_EQ(indexes[1] % empty_table_.capacity(), 3);
+    ASSERT_EQ(indexes[2] % empty_table_.capacity(), 4);
 }

 TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) {
@@ -253,14 +253,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) {
     // first pick, non-cache
     auto indexes = empty_table_.PickToLoad(1);
     ASSERT_EQ(indexes.size(), 1);
-    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
+    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);

     // second pick, iterate from 2
     // invalid state change
     empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START;
     indexes = empty_table_.PickToLoad(1);
     ASSERT_EQ(indexes.size(), 1);
-    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
+    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
 }

 TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) {
@@ -274,7 +274,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) {

     auto indexes = empty_table_.PickToExecute(1);
     ASSERT_EQ(indexes.size(), 1);
-    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
+    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
 }

 TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) {
@@ -289,8 +289,8 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) {

     auto indexes = empty_table_.PickToExecute(3);
     ASSERT_EQ(indexes.size(), 2);
-    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
-    ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3);
+    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
+    ASSERT_EQ(indexes[1] % empty_table_.capacity(), 3);
 }

 TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) {
@@ -305,14 +305,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) {
     // first pick, non-cache
     auto indexes = empty_table_.PickToExecute(1);
     ASSERT_EQ(indexes.size(), 1);
-    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
+    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);

     // second pick, iterate from 2
     // invalid state change
     empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START;
     indexes = empty_table_.PickToExecute(1);
     ASSERT_EQ(indexes.size(), 1);
-    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
+    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
 }

 /************ TaskTableAdvanceTest ************/
@@ -328,14 +328,14 @@ class TaskTableAdvanceTest : public ::testing::Test {
             table1_.Put(task);
         }

-        table1_.Get(0)->state = milvus::scheduler::TaskTableItemState::INVALID;
-        table1_.Get(1)->state = milvus::scheduler::TaskTableItemState::START;
-        table1_.Get(2)->state = milvus::scheduler::TaskTableItemState::LOADING;
-        table1_.Get(3)->state = milvus::scheduler::TaskTableItemState::LOADED;
-        table1_.Get(4)->state = milvus::scheduler::TaskTableItemState::EXECUTING;
-        table1_.Get(5)->state = milvus::scheduler::TaskTableItemState::EXECUTED;
-        table1_.Get(6)->state = milvus::scheduler::TaskTableItemState::MOVING;
-        table1_.Get(7)->state = milvus::scheduler::TaskTableItemState::MOVED;
+        table1_.at(0)->state = milvus::scheduler::TaskTableItemState::INVALID;
+        table1_.at(1)->state = milvus::scheduler::TaskTableItemState::START;
+        table1_.at(2)->state = milvus::scheduler::TaskTableItemState::LOADING;
+        table1_.at(3)->state = milvus::scheduler::TaskTableItemState::LOADED;
+        table1_.at(4)->state = milvus::scheduler::TaskTableItemState::EXECUTING;
+        table1_.at(5)->state = milvus::scheduler::TaskTableItemState::EXECUTED;
+        table1_.at(6)->state = milvus::scheduler::TaskTableItemState::MOVING;
+        table1_.at(7)->state = milvus::scheduler::TaskTableItemState::MOVED;
     }

     milvus::scheduler::TaskTable table1_;
@@ -343,114 +343,114 @@ class TaskTableAdvanceTest : public ::testing::Test {

 TEST_F(TaskTableAdvanceTest, LOAD) {
     std::vector<milvus::scheduler::TaskTableItemState> before_state;
-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         before_state.push_back(table1_[i]->state);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         table1_.Load(i);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         if (before_state[i] == milvus::scheduler::TaskTableItemState::START) {
-            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::LOADING);
+            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::LOADING);
         } else {
-            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
+            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
         }
     }
 }

 TEST_F(TaskTableAdvanceTest, LOADED) {
     std::vector<milvus::scheduler::TaskTableItemState> before_state;
-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         before_state.push_back(table1_[i]->state);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         table1_.Loaded(i);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADING) {
-            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::LOADED);
+            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::LOADED);
         } else {
-            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
+            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
         }
     }
 }

 TEST_F(TaskTableAdvanceTest, EXECUTE) {
     std::vector<milvus::scheduler::TaskTableItemState> before_state;
-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         before_state.push_back(table1_[i]->state);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         table1_.Execute(i);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADED) {
-            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::EXECUTING);
+            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::EXECUTING);
         } else {
-            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
+            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
         }
     }
 }

 TEST_F(TaskTableAdvanceTest, EXECUTED) {
     std::vector<milvus::scheduler::TaskTableItemState> before_state;
-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         before_state.push_back(table1_[i]->state);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         table1_.Executed(i);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         if (before_state[i] == milvus::scheduler::TaskTableItemState::EXECUTING) {
-            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::EXECUTED);
+            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::EXECUTED);
         } else {
-            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
+            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
         }
     }
 }

 TEST_F(TaskTableAdvanceTest, MOVE) {
     std::vector<milvus::scheduler::TaskTableItemState> before_state;
-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         before_state.push_back(table1_[i]->state);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         table1_.Move(i);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADED) {
-            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::MOVING);
+            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::MOVING);
         } else {
-            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
+            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
         }
     }
 }

 TEST_F(TaskTableAdvanceTest, MOVED) {
     std::vector<milvus::scheduler::TaskTableItemState> before_state;
-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         before_state.push_back(table1_[i]->state);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         table1_.Moved(i);
     }

-    for (size_t i = 0; i < table1_.Size(); ++i) {
+    for (size_t i = 0; i < table1_.size(); ++i) {
         if (before_state[i] == milvus::scheduler::TaskTableItemState::MOVING) {
-            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::MOVED);
+            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::MOVED);
         } else {
-            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
+            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
         }
     }
 }
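Taken together, these tests pin down the task state machine behind Load/Loaded/Execute/Executed/Move/Moved: only the expected source state advances, and every other state is left untouched, exactly as the else branches assert. A compact restatement of the asserted transitions (a sketch, not the TaskTableItem code):

```cpp
#include <cstdint>

// Transitions exercised by the tests above; any state not matching the
// expected source is left unchanged, as the else-branches assert.
enum class State : uint8_t { INVALID, START, LOADING, LOADED, EXECUTING, EXECUTED, MOVING, MOVED };

constexpr State Load(State s)     { return s == State::START     ? State::LOADING   : s; }
constexpr State Loaded(State s)   { return s == State::LOADING   ? State::LOADED    : s; }
constexpr State Execute(State s)  { return s == State::LOADED    ? State::EXECUTING : s; }
constexpr State Executed(State s) { return s == State::EXECUTING ? State::EXECUTED  : s; }
constexpr State Move(State s)     { return s == State::LOADED    ? State::MOVING    : s; }
constexpr State Moved(State s)    { return s == State::MOVING    ? State::MOVED     : s; }

static_assert(Load(State::START) == State::LOADING, "START advances to LOADING");
static_assert(Load(State::MOVED) == State::MOVED, "other states are untouched");

int main() {
    return 0;
}
```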