mirror of https://github.com/milvus-io/milvus.git
Merge pull request #91 from scsven/dev
Improvement dump function in scheduler Former-commit-id: 2b7853f866db6aa3c7000c3629dfc28327a94762pull/191/head
commit
8b64aa7cda
|
@ -49,6 +49,15 @@ JobMgr::Stop() {
|
|||
}
|
||||
}
|
||||
|
||||
json
|
||||
JobMgr::Dump() const {
|
||||
json ret{
|
||||
{"running", running_},
|
||||
{"event_queue_length", queue_.size()},
|
||||
};
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
JobMgr::Put(const JobPtr& job) {
|
||||
{
|
||||
|
|
|
@ -28,13 +28,14 @@
|
|||
#include <vector>
|
||||
|
||||
#include "ResourceMgr.h"
|
||||
#include "interface/interfaces.h"
|
||||
#include "job/Job.h"
|
||||
#include "task/Task.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
class JobMgr {
|
||||
class JobMgr : public interface::dumpable {
|
||||
public:
|
||||
explicit JobMgr(ResourceMgrPtr res_mgr);
|
||||
|
||||
|
@ -44,6 +45,9 @@ class JobMgr {
|
|||
void
|
||||
Stop();
|
||||
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
public:
|
||||
void
|
||||
Put(const JobPtr& job);
|
||||
|
|
|
@ -170,16 +170,20 @@ ResourceMgr::GetNumGpuResource() const {
|
|||
return num;
|
||||
}
|
||||
|
||||
std::string
|
||||
ResourceMgr::Dump() {
|
||||
std::stringstream ss;
|
||||
ss << "ResourceMgr contains " << resources_.size() << " resources." << std::endl;
|
||||
|
||||
json
|
||||
ResourceMgr::Dump() const {
|
||||
json resources{};
|
||||
for (auto& res : resources_) {
|
||||
ss << res->Dump();
|
||||
resources.push_back(res->Dump());
|
||||
}
|
||||
|
||||
return ss.str();
|
||||
json ret{
|
||||
{"number_of_resource", resources_.size()},
|
||||
{"number_of_disk_resource", disk_resources_.size()},
|
||||
{"number_of_cpu_resource", cpu_resources_.size()},
|
||||
{"number_of_gpu_resource", gpu_resources_.size()},
|
||||
{"resources", resources},
|
||||
};
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::string
|
||||
|
@ -187,9 +191,9 @@ ResourceMgr::DumpTaskTables() {
|
|||
std::stringstream ss;
|
||||
ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
|
||||
for (auto& resource : resources_) {
|
||||
ss << resource->Dump() << std::endl;
|
||||
ss << resource->task_table().Dump();
|
||||
ss << resource->Dump() << std::endl << std::endl;
|
||||
ss << resource->name() << std::endl;
|
||||
ss << resource->task_table().Dump().dump();
|
||||
ss << resource->name() << std::endl << std::endl;
|
||||
}
|
||||
return ss.str();
|
||||
}
|
||||
|
|
|
@ -25,13 +25,14 @@
|
|||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "interface/interfaces.h"
|
||||
#include "resource/Resource.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
class ResourceMgr {
|
||||
class ResourceMgr : public interface::dumpable {
|
||||
public:
|
||||
ResourceMgr() = default;
|
||||
|
||||
|
@ -103,8 +104,8 @@ class ResourceMgr {
|
|||
|
||||
public:
|
||||
/******** Utility Functions ********/
|
||||
std::string
|
||||
Dump();
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
std::string
|
||||
DumpTaskTables();
|
||||
|
|
|
@ -66,9 +66,13 @@ Scheduler::PostEvent(const EventPtr& event) {
|
|||
event_cv_.notify_one();
|
||||
}
|
||||
|
||||
std::string
|
||||
Scheduler::Dump() {
|
||||
return std::string();
|
||||
json
|
||||
Scheduler::Dump() const {
|
||||
json ret{
|
||||
{"running", running_},
|
||||
{"event_queue_length", event_queue_.size()},
|
||||
};
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -25,14 +25,14 @@
|
|||
#include <unordered_map>
|
||||
|
||||
#include "ResourceMgr.h"
|
||||
#include "interface/interfaces.h"
|
||||
#include "resource/Resource.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
// TODO(wxyu): refactor, not friendly to unittest, logical in framework code
|
||||
class Scheduler {
|
||||
class Scheduler : public interface::dumpable {
|
||||
public:
|
||||
explicit Scheduler(ResourceMgrWPtr res_mgr);
|
||||
|
||||
|
@ -57,11 +57,8 @@ class Scheduler {
|
|||
void
|
||||
PostEvent(const EventPtr& event);
|
||||
|
||||
/*
|
||||
* Dump as string;
|
||||
*/
|
||||
std::string
|
||||
Dump();
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
private:
|
||||
/******** Events ********/
|
||||
|
|
|
@ -53,7 +53,7 @@ ToString(TaskTableItemState state) {
|
|||
}
|
||||
|
||||
json
|
||||
TaskTimestamp::Dump() {
|
||||
TaskTimestamp::Dump() const {
|
||||
json ret{
|
||||
{"start", start}, {"load", load}, {"loaded", loaded}, {"execute", execute},
|
||||
{"executed", executed}, {"move", move}, {"moved", moved}, {"finish", finish},
|
||||
|
@ -141,7 +141,7 @@ TaskTableItem::Moved() {
|
|||
}
|
||||
|
||||
json
|
||||
TaskTableItem::Dump() {
|
||||
TaskTableItem::Dump() const {
|
||||
json ret{
|
||||
{"id", id},
|
||||
{"task", (int64_t)task.get()},
|
||||
|
@ -263,7 +263,7 @@ TaskTable::Get(uint64_t index) {
|
|||
//}
|
||||
|
||||
json
|
||||
TaskTable::Dump() {
|
||||
TaskTable::Dump() const {
|
||||
json ret;
|
||||
for (auto& item : table_) {
|
||||
ret.push_back(item->Dump());
|
||||
|
|
|
@ -54,7 +54,7 @@ struct TaskTimestamp : public interface::dumpable {
|
|||
uint64_t finish = 0;
|
||||
|
||||
json
|
||||
Dump() override;
|
||||
Dump() const override;
|
||||
};
|
||||
|
||||
struct TaskTableItem : public interface::dumpable {
|
||||
|
@ -92,7 +92,7 @@ struct TaskTableItem : public interface::dumpable {
|
|||
Moved();
|
||||
|
||||
json
|
||||
Dump() override;
|
||||
Dump() const override;
|
||||
};
|
||||
|
||||
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
|
||||
|
@ -245,7 +245,7 @@ class TaskTable : public interface::dumpable {
|
|||
* Dump;
|
||||
*/
|
||||
json
|
||||
Dump() override;
|
||||
Dump() const override;
|
||||
|
||||
private:
|
||||
std::uint64_t id_ = 0;
|
||||
|
|
|
@ -37,7 +37,7 @@ struct dumpable {
|
|||
}
|
||||
|
||||
virtual json
|
||||
Dump() = 0;
|
||||
Dump() const = 0;
|
||||
};
|
||||
|
||||
} // namespace interface
|
||||
|
|
|
@ -54,5 +54,13 @@ BuildIndexJob::BuildIndexDone(size_t to_index_id) {
|
|||
SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " finish index file: " << to_index_id;
|
||||
}
|
||||
|
||||
json
|
||||
BuildIndexJob::Dump() const {
|
||||
json ret{
|
||||
{"number_of_to_index_file", to_index_files_.size()},
|
||||
};
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
|
|
|
@ -53,6 +53,9 @@ class BuildIndexJob : public Job {
|
|||
void
|
||||
BuildIndexDone(size_t to_index_id);
|
||||
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
public:
|
||||
Status&
|
||||
GetStatus() {
|
||||
|
|
|
@ -45,5 +45,15 @@ DeleteJob::ResourceDone() {
|
|||
cv_.notify_one();
|
||||
}
|
||||
|
||||
json
|
||||
DeleteJob::Dump() const {
|
||||
json ret{
|
||||
{"table_id", table_id_},
|
||||
{"number_of_resource", num_resource_},
|
||||
{"number_of_done", done_resource},
|
||||
};
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
|
|
|
@ -44,6 +44,9 @@ class DeleteJob : public Job {
|
|||
void
|
||||
ResourceDone();
|
||||
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
public:
|
||||
std::string
|
||||
table_id() const {
|
||||
|
|
|
@ -27,6 +27,8 @@
|
|||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "scheduler/interface/interfaces.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
|
@ -39,7 +41,7 @@ enum class JobType {
|
|||
|
||||
using JobId = std::uint64_t;
|
||||
|
||||
class Job {
|
||||
class Job : public interface::dumpable {
|
||||
public:
|
||||
inline JobId
|
||||
id() const {
|
||||
|
|
|
@ -63,5 +63,15 @@ SearchJob::GetStatus() {
|
|||
return status_;
|
||||
}
|
||||
|
||||
json
|
||||
SearchJob::Dump() const {
|
||||
json ret{
|
||||
{"topk", topk_},
|
||||
{"nq", nq_},
|
||||
{"nprobe", nprobe_},
|
||||
};
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
|
|
|
@ -61,6 +61,9 @@ class SearchJob : public Job {
|
|||
Status&
|
||||
GetStatus();
|
||||
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
public:
|
||||
uint64_t
|
||||
topk() const {
|
||||
|
|
|
@ -24,7 +24,7 @@ namespace scheduler {
|
|||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& out, const CpuResource& resource) {
|
||||
out << resource.Dump();
|
||||
out << resource.Dump().dump();
|
||||
return out;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,11 +28,6 @@ class CpuResource : public Resource {
|
|||
public:
|
||||
explicit CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
|
||||
|
||||
inline std::string
|
||||
Dump() const override {
|
||||
return "<CpuResource, name=" + name_ + ">";
|
||||
}
|
||||
|
||||
friend std::ostream&
|
||||
operator<<(std::ostream& out, const CpuResource& resource);
|
||||
|
||||
|
|
|
@ -28,11 +28,6 @@ class DiskResource : public Resource {
|
|||
public:
|
||||
explicit DiskResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
|
||||
|
||||
inline std::string
|
||||
Dump() const override {
|
||||
return "<DiskResource, name=" + name_ + ">";
|
||||
}
|
||||
|
||||
friend std::ostream&
|
||||
operator<<(std::ostream& out, const DiskResource& resource);
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ namespace scheduler {
|
|||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& out, const GpuResource& resource) {
|
||||
out << resource.Dump();
|
||||
out << resource.Dump().dump();
|
||||
return out;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,11 +29,6 @@ class GpuResource : public Resource {
|
|||
public:
|
||||
explicit GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
|
||||
|
||||
inline std::string
|
||||
Dump() const override {
|
||||
return "<GpuResource, name=" + name_ + ">";
|
||||
}
|
||||
|
||||
friend std::ostream&
|
||||
operator<<(std::ostream& out, const GpuResource& resource);
|
||||
|
||||
|
|
|
@ -38,15 +38,21 @@ Node::GetNeighbours() {
|
|||
return ret;
|
||||
}
|
||||
|
||||
std::string
|
||||
Node::Dump() {
|
||||
std::stringstream ss;
|
||||
ss << "<Node, id=" << std::to_string(id_) << ">::neighbours:" << std::endl;
|
||||
json
|
||||
Node::Dump() const {
|
||||
json neighbours;
|
||||
for (auto& neighbour : neighbours_) {
|
||||
ss << "\t<Neighbour, id=" << std::to_string(neighbour.first);
|
||||
ss << ", connection: " << neighbour.second.connection.Dump() << ">" << std::endl;
|
||||
json n;
|
||||
n["id"] = neighbour.first;
|
||||
n["connection"] = neighbour.second.connection.Dump();
|
||||
neighbours.push_back(n);
|
||||
}
|
||||
return ss.str();
|
||||
|
||||
json ret{
|
||||
{"id", id_},
|
||||
{"neighbours", neighbours},
|
||||
};
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
|
||||
#include "Connection.h"
|
||||
#include "scheduler/TaskTable.h"
|
||||
#include "scheduler/interface/interfaces.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
@ -41,7 +42,7 @@ struct Neighbour {
|
|||
};
|
||||
|
||||
// TODO(lxj): return type void -> Status
|
||||
class Node {
|
||||
class Node : public interface::dumpable {
|
||||
public:
|
||||
Node();
|
||||
|
||||
|
@ -52,8 +53,8 @@ class Node {
|
|||
GetNeighbours();
|
||||
|
||||
public:
|
||||
std::string
|
||||
Dump();
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
private:
|
||||
std::mutex mutex_;
|
||||
|
|
|
@ -32,6 +32,22 @@ operator<<(std::ostream& out, const Resource& resource) {
|
|||
return out;
|
||||
}
|
||||
|
||||
std::string
|
||||
ToString(ResourceType type) {
|
||||
switch (type) {
|
||||
case ResourceType::DISK: {
|
||||
return "DISK";
|
||||
}
|
||||
case ResourceType::CPU: {
|
||||
return "CPU";
|
||||
}
|
||||
case ResourceType::GPU: {
|
||||
return "GPU";
|
||||
}
|
||||
default: { return "UNKNOWN"; }
|
||||
}
|
||||
}
|
||||
|
||||
Resource::Resource(std::string name, ResourceType type, uint64_t device_id, bool enable_loader, bool enable_executor)
|
||||
: name_(std::move(name)),
|
||||
type_(type),
|
||||
|
@ -89,6 +105,22 @@ Resource::WakeupExecutor() {
|
|||
exec_cv_.notify_one();
|
||||
}
|
||||
|
||||
json
|
||||
Resource::Dump() const {
|
||||
json ret{
|
||||
{"device_id", device_id_},
|
||||
{"name", name_},
|
||||
{"type", ToString(type_)},
|
||||
{"task_average_cost", TaskAvgCost()},
|
||||
{"task_total_cost", total_cost_},
|
||||
{"total_tasks", total_task_},
|
||||
{"running", running_},
|
||||
{"enable_loader", enable_loader_},
|
||||
{"enable_executor", enable_executor_},
|
||||
};
|
||||
return ret;
|
||||
}
|
||||
|
||||
uint64_t
|
||||
Resource::NumOfTaskToExec() {
|
||||
uint64_t count = 0;
|
||||
|
|
|
@ -77,10 +77,8 @@ class Resource : public Node, public std::enable_shared_from_this<Resource> {
|
|||
subscriber_ = std::move(subscriber);
|
||||
}
|
||||
|
||||
inline virtual std::string
|
||||
Dump() const {
|
||||
return "<Resource>";
|
||||
}
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
public:
|
||||
inline std::string
|
||||
|
|
|
@ -29,11 +29,6 @@ class TestResource : public Resource {
|
|||
public:
|
||||
explicit TestResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
|
||||
|
||||
inline std::string
|
||||
Dump() const override {
|
||||
return "<TestResource, name=" + name_ + ">";
|
||||
}
|
||||
|
||||
friend std::ostream&
|
||||
operator<<(std::ostream& out, const TestResource& resource);
|
||||
|
||||
|
|
Loading…
Reference in New Issue