Add unique id for Job

Former-commit-id: 1865dbd859f345a3febc3ad76682f928678e59f5
pull/191/head
wxyu 2019-10-28 20:34:26 +08:00
parent 7510f1f7a2
commit de2bb68daa
11 changed files with 49 additions and 19 deletions

View File

@ -14,6 +14,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#92 - Speed up CMake build process
- \#96 - Remove .a file in milvus/lib for docker-version
- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss
- \#122 - Add unique id for Job
## Feature
- \#115 - Using new structure for tasktable

View File

@ -136,7 +136,7 @@ DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
// scheduler will determine when to delete table files
auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(0, table_id, meta_ptr_, nres);
scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(table_id, meta_ptr_, nres);
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitAndDelete();
} else {
@ -439,7 +439,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
// step 1: get files to search
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors);
for (auto& file : files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddIndexFile(file_ptr);
@ -754,7 +754,7 @@ DBImpl::BackgroundBuildIndex() {
Status status;
if (!to_index_files.empty()) {
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
// step 2: put build index task to scheduler
for (auto& file : to_index_files) {

View File

@ -75,7 +75,6 @@ class ResourceMgr : public interface::dumpable {
return gpu_resources_;
}
// TODO(wxyu): why return shared pointer
inline std::vector<ResourcePtr>
GetAllResources() {
return resources_;

View File

@ -23,8 +23,8 @@
namespace milvus {
namespace scheduler {
BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
: Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
BuildIndexJob::BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
: Job(JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
}
bool
@ -59,6 +59,8 @@ BuildIndexJob::Dump() const {
json ret{
{"number_of_to_index_file", to_index_files_.size()},
};
auto base = Job::Dump();
ret.insert(base.begin(), base.end());
return ret;
}

View File

@ -41,7 +41,7 @@ using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;
class BuildIndexJob : public Job {
public:
explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options);
explicit BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options);
public:
bool

View File

@ -22,8 +22,8 @@
namespace milvus {
namespace scheduler {
DeleteJob::DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource)
: Job(id, JobType::DELETE),
DeleteJob::DeleteJob(std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource)
: Job(JobType::DELETE),
table_id_(std::move(table_id)),
meta_ptr_(std::move(meta_ptr)),
num_resource_(num_resource) {
@ -52,6 +52,8 @@ DeleteJob::Dump() const {
{"number_of_resource", num_resource_},
{"number_of_done", done_resource},
};
auto base = Job::Dump();
ret.insert(base.begin(), base.end());
return ret;
}

View File

@ -35,7 +35,7 @@ namespace scheduler {
class DeleteJob : public Job {
public:
DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource);
DeleteJob(std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource);
public:
void

View File

@ -15,7 +15,29 @@
// specific language governing permissions and limitations
// under the License.
//
// Created by wxyu on 2019/10/28.
//
#include "Job.h"
namespace milvus {
namespace scheduler {
namespace {
std::mutex unique_job_mutex;
uint64_t unique_job_id = 0;
} // namespace
Job::Job(JobType type) : type_(type) {
std::lock_guard<std::mutex> lock(unique_job_mutex);
id_ = unique_job_id++;
}
json
Job::Dump() const {
json ret{
{"id", id_},
{"type", type_},
};
return ret;
}
} // namespace scheduler
} // namespace milvus

View File

@ -53,12 +53,14 @@ class Job : public interface::dumpable {
return type_;
}
json
Dump() const override;
protected:
Job(JobId id, JobType type) : id_(id), type_(type) {
}
explicit Job(JobType type);
private:
JobId id_;
JobId id_ = 0;
JobType type_;
};

View File

@ -21,8 +21,8 @@
namespace milvus {
namespace scheduler {
SearchJob::SearchJob(milvus::scheduler::JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors)
: Job(id, JobType::SEARCH), topk_(topk), nq_(nq), nprobe_(nprobe), vectors_(vectors) {
SearchJob::SearchJob(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors)
: Job(JobType::SEARCH), topk_(topk), nq_(nq), nprobe_(nprobe), vectors_(vectors) {
}
bool
@ -70,6 +70,8 @@ SearchJob::Dump() const {
{"nq", nq_},
{"nprobe", nprobe_},
};
auto base = Job::Dump();
ret.insert(base.begin(), base.end());
return ret;
}

View File

@ -43,7 +43,7 @@ using ResultSet = std::vector<Id2DistVec>;
class SearchJob : public Job {
public:
SearchJob(JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors);
SearchJob(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors);
public:
bool