mirror of https://github.com/milvus-io/milvus.git
Merge branch 'dev_resource' into 'branch-0.4.0'
MS-337 dev basic Resource See merge request megasearch/milvus!365 Former-commit-id: adbf8a4fef8994475a0f8153fa05d30df802fc94pull/191/head
commit
3537a35ff0
|
@ -0,0 +1,38 @@
|
|||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "CpuResource.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
|
||||
CpuResource::CpuResource(std::string name)
|
||||
: Resource(std::move(name), ResourceType::CPU) {}
|
||||
|
||||
void CpuResource::LoadFile(TaskPtr task) {
|
||||
//if (src.type == DISK) {
|
||||
// fd = open(filename);
|
||||
// content = fd.read();
|
||||
// close(fd);
|
||||
//} else if (src.type == CPU) {
|
||||
// memcpy(src, dest, len);
|
||||
//} else if (src.type == GPU) {
|
||||
// cudaMemcpyD2H(src, dest);
|
||||
//} else {
|
||||
// // unknown type, exception
|
||||
//}
|
||||
}
|
||||
|
||||
void CpuResource::Process(TaskPtr task) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,29 +17,14 @@ namespace engine {
|
|||
class CpuResource : public Resource {
|
||||
public:
|
||||
explicit
|
||||
CpuResource(std::string name)
|
||||
: Resource(std::move(name), ResourceType::CPU) {}
|
||||
CpuResource(std::string name);
|
||||
|
||||
protected:
|
||||
void
|
||||
LoadFile(TaskPtr task) override {
|
||||
// if (src.type == DISK) {
|
||||
// fd = open(filename);
|
||||
// content = fd.read();
|
||||
// close(fd);
|
||||
// } else if (src.type == CPU) {
|
||||
// memcpy(src, dest, len);
|
||||
// } else if (src.type == GPU) {
|
||||
// cudaMemcpyD2H(src, dest);
|
||||
// } else {
|
||||
// // unknown type, exception
|
||||
// }
|
||||
}
|
||||
LoadFile(TaskPtr task) override;
|
||||
|
||||
void
|
||||
Process(TaskPtr task) override {
|
||||
task->Execute();
|
||||
}
|
||||
Process(TaskPtr task) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#include "DiskResource.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
|
||||
DiskResource::DiskResource(std::string name)
|
||||
: Resource(std::move(name), ResourceType::DISK) {}
|
||||
|
||||
void DiskResource::LoadFile(TaskPtr task) {
|
||||
|
||||
}
|
||||
|
||||
void DiskResource::Process(TaskPtr task) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -16,15 +16,14 @@ namespace engine {
|
|||
class DiskResource : public Resource {
|
||||
public:
|
||||
explicit
|
||||
DiskResource(std::string name)
|
||||
: Resource(std::move(name), ResourceType::DISK) {}
|
||||
DiskResource(std::string name);
|
||||
|
||||
protected:
|
||||
void
|
||||
LoadFile(TaskPtr task) override {}
|
||||
LoadFile(TaskPtr task) override;
|
||||
|
||||
void
|
||||
Process(TaskPtr task) override {}
|
||||
Process(TaskPtr task) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "GpuResource.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
|
||||
GpuResource::GpuResource(std::string name)
|
||||
: Resource(std::move(name), ResourceType::GPU) {}
|
||||
|
||||
void GpuResource::LoadFile(TaskPtr task) {
|
||||
|
||||
}
|
||||
|
||||
void GpuResource::Process(TaskPtr task) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -16,15 +16,14 @@ namespace engine {
|
|||
class GpuResource : public Resource {
|
||||
public:
|
||||
explicit
|
||||
GpuResource(std::string name)
|
||||
: Resource(std::move(name), ResourceType::GPU) {}
|
||||
GpuResource(std::string name);
|
||||
|
||||
protected:
|
||||
void
|
||||
LoadFile(TaskPtr task) override {}
|
||||
LoadFile(TaskPtr task) override;
|
||||
|
||||
void
|
||||
Process(TaskPtr task) override {}
|
||||
Process(TaskPtr task) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include <atomic>
|
||||
#include "Node.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
Node::Node() {
|
||||
static std::atomic_uint_fast8_t counter(0);
|
||||
id_ = counter++;
|
||||
}
|
||||
|
||||
void Node::DelNeighbour(const NeighbourNodePtr &neighbour_ptr) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
if (auto s = neighbour_ptr.lock()) {
|
||||
auto search = neighbours_.find(s->id_);
|
||||
if (search != neighbours_.end()) {
|
||||
neighbours_.erase(search);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool Node::IsNeighbour(const NeighbourNodePtr &neighbour_ptr) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
if (auto s = neighbour_ptr.lock()) {
|
||||
auto search = neighbours_.find(s->id_);
|
||||
if (search != neighbours_.end()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<Neighbour> Node::GetNeighbours() {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
std::vector<Neighbour> ret;
|
||||
for (auto &e : neighbours_) {
|
||||
ret.push_back(e.second);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::string Node::Dump() {
|
||||
// TODO(linxj): what's that?
|
||||
return std::__cxx11::string();
|
||||
}
|
||||
|
||||
void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
if (auto s = neighbour_node.lock()) {
|
||||
Neighbour neighbour(neighbour_node, connection);
|
||||
neighbours_[s->id_] = neighbour;
|
||||
}
|
||||
// else do nothing, consider it..
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
#include <map>
|
||||
|
||||
#include "../TaskTable.h"
|
||||
#include "Connection.h"
|
||||
|
@ -28,29 +29,31 @@ struct Neighbour {
|
|||
Connection connection;
|
||||
};
|
||||
|
||||
// TODO(linxj): return type void -> Status
|
||||
class Node {
|
||||
public:
|
||||
void
|
||||
AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
|
||||
Neighbour neighbour(neighbour_node, connection);
|
||||
neighbours_.emplace_back(neighbour);
|
||||
}
|
||||
Node();
|
||||
|
||||
void
|
||||
DelNeighbour(NeighbourNodePtr neighbour_ptr) {}
|
||||
AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection);
|
||||
|
||||
void
|
||||
DelNeighbour(const NeighbourNodePtr &neighbour_ptr);
|
||||
|
||||
bool
|
||||
IsNeighbour(NeighbourNodePtr neighbour_ptr) {}
|
||||
IsNeighbour(const NeighbourNodePtr& neighbour_ptr);
|
||||
|
||||
const std::vector<Neighbour> &
|
||||
GetNeighbours() {}
|
||||
std::vector<Neighbour>
|
||||
GetNeighbours();
|
||||
|
||||
public:
|
||||
std::string
|
||||
Dump();
|
||||
|
||||
private:
|
||||
std::vector<Neighbour> neighbours_;
|
||||
std::mutex mutex_;
|
||||
uint8_t id_;
|
||||
std::map<uint8_t, Neighbour> neighbours_;
|
||||
};
|
||||
|
||||
using NodePtr = std::shared_ptr<Node>;
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class RegisterHandler {
|
||||
public:
|
||||
virtual void Exec() = 0;
|
||||
};
|
||||
|
||||
using RegisterHandlerPtr = std::shared_ptr<RegisterHandler>;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#include "Resource.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
Resource::Resource(std::string name, ResourceType type)
|
||||
: name_(std::move(name)),
|
||||
type_(type),
|
||||
running_(false),
|
||||
load_flag_(false),
|
||||
exec_flag_(false) {
|
||||
}
|
||||
|
||||
void Resource::Start() {
|
||||
loader_thread_ = std::thread(&Resource::loader_function, this);
|
||||
executor_thread_ = std::thread(&Resource::executor_function, this);
|
||||
}
|
||||
|
||||
void Resource::Stop() {
|
||||
running_ = false;
|
||||
WakeupLoader();
|
||||
WakeupExecutor();
|
||||
}
|
||||
|
||||
TaskTable &Resource::task_table() {
|
||||
return task_table_;
|
||||
}
|
||||
|
||||
void Resource::WakeupExecutor() {
|
||||
exec_cv_.notify_one();
|
||||
}
|
||||
|
||||
void Resource::WakeupLoader() {
|
||||
load_cv_.notify_one();
|
||||
}
|
||||
|
||||
TaskPtr Resource::pick_task_load() {
|
||||
auto indexes = PickToLoad(task_table_, 3);
|
||||
for (auto index : indexes) {
|
||||
// try to set one task loading, then return
|
||||
if (task_table_.Load(index))
|
||||
return task_table_.Get(index).task;
|
||||
// else try next
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
TaskPtr Resource::pick_task_execute() {
|
||||
auto indexes = PickToExecute(task_table_, 3);
|
||||
for (auto index : indexes) {
|
||||
// try to set one task executing, then return
|
||||
if (task_table_.Execute(index))
|
||||
return task_table_.Get(index).task;
|
||||
// else try next
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void Resource::loader_function() {
|
||||
while (running_) {
|
||||
std::unique_lock<std::mutex> lock(load_mutex_);
|
||||
load_cv_.wait(lock, [&] { return load_flag_; });
|
||||
auto task = pick_task_load();
|
||||
if (task) {
|
||||
LoadFile(task);
|
||||
GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Resource::executor_function() {
|
||||
GetRegisterFunc(RegisterType::START_UP)->Exec();
|
||||
while (running_) {
|
||||
std::unique_lock<std::mutex> lock(exec_mutex_);
|
||||
exec_cv_.wait(lock, [&] { return exec_flag_; });
|
||||
auto task = pick_task_execute();
|
||||
if (task) {
|
||||
Process(task);
|
||||
GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RegisterHandlerPtr Resource::GetRegisterFunc(const RegisterType &type) {
|
||||
// construct object each time.
|
||||
return register_table_[type]();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,8 +15,9 @@
|
|||
#include "../TaskTable.h"
|
||||
#include "../task/Task.h"
|
||||
#include "../Cost.h"
|
||||
#include "Node.h"
|
||||
#include "Connection.h"
|
||||
#include "Node.h"
|
||||
#include "RegisterHandler.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
|
@ -29,92 +30,50 @@ enum class ResourceType {
|
|||
GPU = 2
|
||||
};
|
||||
|
||||
enum class RegisterType {
|
||||
START_UP,
|
||||
ON_FINISH_TASK,
|
||||
ON_COPY_COMPLETED,
|
||||
ON_TASK_TABLE_UPDATED,
|
||||
};
|
||||
|
||||
class Resource : public Node {
|
||||
public:
|
||||
void
|
||||
Start() {
|
||||
loader_thread_ = std::thread(&Resource::loader_function, this);
|
||||
executor_thread_ = std::thread(&Resource::executor_function, this);
|
||||
/*
|
||||
* Event function MUST be a short function, never blocking;
|
||||
*/
|
||||
template <typename T>
|
||||
void Register_T(const RegisterType& type) {
|
||||
register_table_.emplace(type, [] { return std::make_shared<T>(); });
|
||||
}
|
||||
|
||||
RegisterHandlerPtr
|
||||
GetRegisterFunc(const RegisterType& type);
|
||||
|
||||
void
|
||||
Stop() {
|
||||
running_ = false;
|
||||
WakeupLoader();
|
||||
WakeupExecutor();
|
||||
}
|
||||
Start();
|
||||
|
||||
void
|
||||
Stop();
|
||||
|
||||
TaskTable &
|
||||
task_table() {
|
||||
return task_table_;
|
||||
}
|
||||
task_table();
|
||||
|
||||
public:
|
||||
/*
|
||||
* wake up executor;
|
||||
*/
|
||||
void
|
||||
WakeupExecutor() {
|
||||
exec_cv_.notify_one();
|
||||
}
|
||||
WakeupExecutor();
|
||||
|
||||
/*
|
||||
* wake up loader;
|
||||
*/
|
||||
void
|
||||
WakeupLoader() {
|
||||
load_cv_.notify_one();
|
||||
}
|
||||
|
||||
public:
|
||||
/*
|
||||
* Event function MUST be a short function, never blocking;
|
||||
*/
|
||||
|
||||
/*
|
||||
* Register on start up event;
|
||||
*/
|
||||
void
|
||||
RegisterOnStartUp(std::function<void(void)> func) {
|
||||
on_start_up_ = func;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register on finish one task event;
|
||||
*/
|
||||
void
|
||||
RegisterOnFinishTask(std::function<void(void)> func) {
|
||||
on_finish_task_ = func;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register on copy task data completed event;
|
||||
*/
|
||||
void
|
||||
RegisterOnCopyCompleted(std::function<void(void)> func) {
|
||||
on_copy_completed_ = func;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register on task table updated event;
|
||||
*/
|
||||
void
|
||||
RegisterOnTaskTableUpdated(std::function<void(void)> func) {
|
||||
on_task_table_updated_ = func;
|
||||
}
|
||||
WakeupLoader();
|
||||
|
||||
protected:
|
||||
Resource(std::string name, ResourceType type)
|
||||
: name_(std::move(name)),
|
||||
type_(type),
|
||||
on_start_up_(nullptr),
|
||||
on_finish_task_(nullptr),
|
||||
on_copy_completed_(nullptr),
|
||||
on_task_table_updated_(nullptr),
|
||||
running_(false),
|
||||
load_flag_(false),
|
||||
exec_flag_(false) {
|
||||
}
|
||||
Resource(std::string name, ResourceType type);
|
||||
|
||||
// TODO: SearchContextPtr to TaskPtr
|
||||
/*
|
||||
|
@ -142,67 +101,27 @@ private:
|
|||
* Order by start time;
|
||||
*/
|
||||
TaskPtr
|
||||
pick_task_load() {
|
||||
auto indexes = PickToLoad(task_table_, 3);
|
||||
for (auto index : indexes) {
|
||||
// try to set one task loading, then return
|
||||
if (task_table_.Load(index))
|
||||
return task_table_.Get(index).task;
|
||||
// else try next
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
pick_task_load();
|
||||
|
||||
/*
|
||||
* Pick one task to execute;
|
||||
* Pick by start time and priority;
|
||||
*/
|
||||
TaskPtr
|
||||
pick_task_execute() {
|
||||
auto indexes = PickToExecute(task_table_, 3);
|
||||
for (auto index : indexes) {
|
||||
// try to set one task executing, then return
|
||||
if (task_table_.Execute(index))
|
||||
return task_table_.Get(index).task;
|
||||
// else try next
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
pick_task_execute();
|
||||
|
||||
private:
|
||||
/*
|
||||
* Only called by load thread;
|
||||
*/
|
||||
void
|
||||
loader_function() {
|
||||
while (running_) {
|
||||
std::unique_lock<std::mutex> lock(load_mutex_);
|
||||
load_cv_.wait(lock, [&] { return load_flag_; });
|
||||
auto task = pick_task_load();
|
||||
if (task) {
|
||||
LoadFile(task);
|
||||
on_copy_completed_();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
loader_function();
|
||||
|
||||
/*
|
||||
* Only called by worker thread;
|
||||
*/
|
||||
void
|
||||
executor_function() {
|
||||
on_start_up_();
|
||||
while (running_) {
|
||||
std::unique_lock<std::mutex> lock(exec_mutex_);
|
||||
exec_cv_.wait(lock, [&] { return exec_flag_; });
|
||||
auto task = pick_task_execute();
|
||||
if (task) {
|
||||
Process(task);
|
||||
on_finish_task_();
|
||||
}
|
||||
}
|
||||
}
|
||||
executor_function();
|
||||
|
||||
|
||||
private:
|
||||
|
@ -211,10 +130,7 @@ private:
|
|||
|
||||
TaskTable task_table_;
|
||||
|
||||
std::function<void(void)> on_start_up_;
|
||||
std::function<void(void)> on_finish_task_;
|
||||
std::function<void(void)> on_copy_completed_;
|
||||
std::function<void(void)> on_task_table_updated_;
|
||||
std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
|
||||
|
||||
bool running_;
|
||||
std::thread loader_thread_;
|
||||
|
|
Loading…
Reference in New Issue