mirror of https://github.com/milvus-io/milvus.git
buildindex to scheduler run ok
Former-commit-id: 5d596c2cdc543d1093992d8a09594a1ca7d5d17apull/191/head
parent
a211263a2b
commit
bc10937bcf
|
@ -242,11 +242,7 @@ DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) {
|
|||
}
|
||||
|
||||
Status
|
||||
<<<<<<< HEAD
|
||||
DBImpl::InsertVectors(const std::string &table_id_, uint64_t n, const float *vectors, IDNumbers &vector_ids_) {
|
||||
=======
|
||||
DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vectors, IDNumbers& vector_ids) {
|
||||
>>>>>>> upstream/branch-0.5.0
|
||||
// ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
return Status(DB_ERROR, "Milsvus server is shutdown!");
|
||||
|
@ -299,17 +295,6 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
|
|||
// for IDMAP type, only wait all NEW file converted to RAW file
|
||||
// for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
|
||||
std::vector<int> file_types;
|
||||
<<<<<<< HEAD
|
||||
if (index.engine_type_ == (int) EngineType::FAISS_IDMAP) {
|
||||
file_types = {
|
||||
(int) meta::TableFileSchema::NEW, (int) meta::TableFileSchema::NEW_MERGE,
|
||||
};
|
||||
} else {
|
||||
file_types = {
|
||||
(int) meta::TableFileSchema::RAW, (int) meta::TableFileSchema::NEW,
|
||||
(int) meta::TableFileSchema::NEW_MERGE, (int) meta::TableFileSchema::NEW_INDEX,
|
||||
(int) meta::TableFileSchema::TO_INDEX,
|
||||
=======
|
||||
if (index.engine_type_ == static_cast<int32_t>(EngineType::FAISS_IDMAP)) {
|
||||
file_types = {
|
||||
static_cast<int32_t>(meta::TableFileSchema::NEW), static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
|
||||
|
@ -321,7 +306,6 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
|
|||
static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
|
||||
static_cast<int32_t>(meta::TableFileSchema::NEW_INDEX),
|
||||
static_cast<int32_t>(meta::TableFileSchema::TO_INDEX),
|
||||
>>>>>>> upstream/branch-0.5.0
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -915,38 +899,36 @@ DBImpl::BackgroundBuildIndex() {
|
|||
Status status;
|
||||
|
||||
scheduler::BuildIndexJobPtr
|
||||
job = std::make_shared<scheduler::BuildIndexJob>(0);
|
||||
job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_);
|
||||
|
||||
// step 2: put build index task to scheduler
|
||||
scheduler::JobMgrInst::GetInstance()->Put(job);
|
||||
for (auto &file : to_index_files) {
|
||||
std::cout << "get to index file" << std::endl;
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = file.table_id_;
|
||||
table_file.date_ = file.date_;
|
||||
table_file.file_type_ =
|
||||
meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index file averagely to each path
|
||||
status = meta_ptr_->CreateTableFile(table_file);
|
||||
if (!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
|
||||
}
|
||||
|
||||
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
|
||||
job->AddToIndexFiles(file_ptr, table_file);
|
||||
}
|
||||
job->WaitBuildIndexFinish();
|
||||
|
||||
// for (auto &file : to_index_files) {
|
||||
// status = BuildIndex(file);
|
||||
// if (!status.ok()) {
|
||||
// std::cout << "get to index file" << std::endl;
|
||||
//
|
||||
// scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
|
||||
// job->AddToIndexFiles(file_ptr);
|
||||
//
|
||||
// if (!job->GetStatus().ok()) {
|
||||
// Status status = job->GetStatus();
|
||||
// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
|
||||
// }
|
||||
//
|
||||
// if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// scheduler::JobMgrInst::GetInstance()->Put(job);
|
||||
// job->WaitBuildIndexFinish();
|
||||
|
||||
for (auto &file : to_index_files) {
|
||||
std::cout << "get to index file" << std::endl;
|
||||
status = BuildIndex(file);
|
||||
if (!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
|
||||
}
|
||||
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ENGINE_LOG_TRACE << "Background build index thread exit";
|
||||
}
|
||||
|
|
|
@ -133,6 +133,7 @@ ExecutionEngineImpl::Serialize() {
|
|||
|
||||
Status
|
||||
ExecutionEngineImpl::Load(bool to_cache) {
|
||||
std::cout << "load" << std::endl;
|
||||
index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
|
||||
bool already_in_cache = (index_ != nullptr);
|
||||
if (!already_in_cache) {
|
||||
|
@ -161,6 +162,7 @@ ExecutionEngineImpl::Load(bool to_cache) {
|
|||
|
||||
Status
|
||||
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
|
||||
std::cout << "copy2gpu" << std::endl;
|
||||
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
|
||||
bool already_in_cache = (index != nullptr);
|
||||
if (already_in_cache) {
|
||||
|
|
|
@ -69,7 +69,7 @@ TaskCreator::Create(const DeleteJobPtr &job) {
|
|||
}
|
||||
|
||||
std::vector<TaskPtr>
|
||||
TaskCreator::Create(const zilliz::milvus::scheduler::BuildIndexJobPtr &job) {
|
||||
TaskCreator::Create(const BuildIndexJobPtr &job) {
|
||||
std::vector<TaskPtr> tasks;
|
||||
//TODO(yukun): remove "disk" hardcode here
|
||||
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
|
||||
|
|
|
@ -172,7 +172,8 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
|
|||
Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);
|
||||
|
||||
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
if (compute_resources[i]->device_id() == build_index_gpu) {
|
||||
if (compute_resources[i]->name()
|
||||
== res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
|
||||
Path task_path(paths[i], paths[i].size() - 1);
|
||||
task->path() = task_path;
|
||||
break;
|
||||
|
|
|
@ -19,27 +19,24 @@
|
|||
#include "utils/Log.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
BuildIndexJob::BuildIndexJob(zilliz::milvus::scheduler::JobId id)
|
||||
: Job(id, JobType::BUILD){
|
||||
BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr)
|
||||
: Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)) {
|
||||
|
||||
}
|
||||
|
||||
bool
|
||||
BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file,
|
||||
const TableFileSchema table_file) {
|
||||
BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
if (to_index_file == nullptr) {
|
||||
if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_;
|
||||
|
||||
to_index_files_[to_index_file->id_] = to_index_file;
|
||||
table_files_[table_file.id_] = table_file;
|
||||
}
|
||||
|
||||
Status&
|
||||
|
@ -58,6 +55,5 @@ BuildIndexJob::BuildIndexDone(size_t to_index_id) {
|
|||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -32,7 +32,6 @@
|
|||
#include "scheduler/Definition.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
|
@ -43,11 +42,11 @@ using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;
|
|||
|
||||
class BuildIndexJob : public Job {
|
||||
public:
|
||||
explicit BuildIndexJob(JobId id);
|
||||
explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr);
|
||||
|
||||
public:
|
||||
bool
|
||||
AddToIndexFiles(const TableFileSchemaPtr &to_index_file, const TableFileSchema table_file);
|
||||
AddToIndexFiles(const TableFileSchemaPtr &to_index_file);
|
||||
|
||||
Status &
|
||||
WaitBuildIndexFinish();
|
||||
|
@ -66,15 +65,26 @@ class BuildIndexJob : public Job {
|
|||
// return engine_type_;
|
||||
// }
|
||||
|
||||
Status &
|
||||
GetStatus() {
|
||||
return status_;
|
||||
}
|
||||
|
||||
Id2ToIndexMap &
|
||||
to_index_files() {
|
||||
return to_index_files_;
|
||||
}
|
||||
|
||||
engine::meta::MetaPtr
|
||||
meta() const {
|
||||
return meta_ptr_;
|
||||
}
|
||||
|
||||
private:
|
||||
Id2ToIndexMap to_index_files_;
|
||||
Id2ToTableFileMap table_files_;
|
||||
engine::meta::MetaPtr meta_ptr_;
|
||||
|
||||
Status status_;
|
||||
std::mutex mutex_;
|
||||
std::condition_variable cv_;
|
||||
};
|
||||
|
@ -83,4 +93,3 @@ using BuildIndexJobPtr = std::shared_ptr<BuildIndexJob>;
|
|||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -26,7 +26,6 @@
|
|||
#include <thread>
|
||||
#include <utility>
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
|
@ -39,7 +38,7 @@ XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file)
|
|||
}
|
||||
|
||||
void
|
||||
XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_id) {
|
||||
XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
|
||||
TimeRecorder rc("");
|
||||
Status stat = Status::OK();
|
||||
std::string error_msg;
|
||||
|
@ -50,7 +49,7 @@ XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_i
|
|||
stat = to_index_engine_->Load();
|
||||
type_str = "DISK2CPU";
|
||||
} else if (type == LoadType::CPU2GPU) {
|
||||
stat = to_index_engine_->CopyToGpu(device_id);
|
||||
// stat = to_index_engine_->CopyToGpu(device_id);
|
||||
type_str = "CPU2GPU";
|
||||
} else if (type == LoadType::GPU2CPU) {
|
||||
stat = to_index_engine_->CopyToCpu();
|
||||
|
@ -90,8 +89,8 @@ XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_i
|
|||
" bytes from location: " + file_->location_ + " totally cost";
|
||||
double span = rc.ElapseFromBegin(info);
|
||||
|
||||
// to_index_id_ = file_->id_;
|
||||
// to_index_type_ = file_->file_type_;
|
||||
to_index_id_ = file_->id_;
|
||||
to_index_type_ = file_->file_type_;
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -103,22 +102,106 @@ XBuildIndexTask::Execute() {
|
|||
TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_));
|
||||
|
||||
if (auto job = job_.lock()) {
|
||||
auto build_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
|
||||
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
|
||||
std::string location = file_->location_;
|
||||
EngineType engine_type = (EngineType)file_->engine_type_;
|
||||
std::shared_ptr<engine::ExecutionEngine> index;
|
||||
|
||||
// step 2: create table file
|
||||
engine::meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = file_->table_id_;
|
||||
table_file.date_ = file_->date_;
|
||||
table_file.file_type_ =
|
||||
engine::meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index file averagely to each path
|
||||
|
||||
engine::meta::MetaPtr meta_ptr = build_index_job->meta();
|
||||
Status status = build_index_job->meta()->CreateTableFile(table_file);
|
||||
if (!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
|
||||
build_index_job->BuildIndexDone(to_index_id_);
|
||||
//TODO: return status
|
||||
}
|
||||
|
||||
// step 3: build index
|
||||
try {
|
||||
index = to_index_engine_->BuildIndex(location, engine_type);
|
||||
if (index == nullptr) {
|
||||
table_file_.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
|
||||
//TODO: updatetablefile
|
||||
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
|
||||
status = meta_ptr->UpdateTableFile(table_file);
|
||||
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
|
||||
<< " to to_delete";
|
||||
|
||||
return;
|
||||
}
|
||||
} catch (std::exception &ex) {
|
||||
ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
|
||||
std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
|
||||
ENGINE_LOG_ERROR << msg;
|
||||
|
||||
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
|
||||
status = meta_ptr->UpdateTableFile(table_file);
|
||||
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
|
||||
|
||||
std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
|
||||
<< std::endl;
|
||||
|
||||
build_index_job->GetStatus() = Status(DB_ERROR, msg);
|
||||
return;
|
||||
}
|
||||
|
||||
build_job->BuildIndexDone(to_index_id_);
|
||||
// step 4: if table has been deleted, dont save index file
|
||||
bool has_table = false;
|
||||
meta_ptr->HasTable(file_->table_id_, has_table);
|
||||
if (!has_table) {
|
||||
meta_ptr->DeleteTableFiles(file_->table_id_);
|
||||
// return Status::OK();
|
||||
}
|
||||
|
||||
// step 5: save index file
|
||||
try {
|
||||
index->Serialize();
|
||||
} catch (std::exception &ex) {
|
||||
// typical error: out of disk space or permition denied
|
||||
std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
|
||||
ENGINE_LOG_ERROR << msg;
|
||||
|
||||
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
|
||||
status = meta_ptr->UpdateTableFile(table_file);
|
||||
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
|
||||
|
||||
std::cout << "ERROR: failed to persist index file: " << table_file.location_
|
||||
<< ", possible out of disk space" << std::endl;
|
||||
|
||||
// return Status(DB_ERROR, msg);
|
||||
}
|
||||
|
||||
// step 6: update meta
|
||||
table_file.file_type_ = engine::meta::TableFileSchema::INDEX;
|
||||
table_file.file_size_ = index->PhysicalSize();
|
||||
table_file.row_count_ = index->Count();
|
||||
|
||||
auto origin_file = *file_;
|
||||
origin_file.file_type_ = engine::meta::TableFileSchema::BACKUP;
|
||||
|
||||
engine::meta::TableFilesSchema update_files = {table_file, origin_file};
|
||||
status = meta_ptr->UpdateTableFiles(update_files);
|
||||
if (status.ok()) {
|
||||
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
|
||||
<< " bytes"
|
||||
<< " from file " << origin_file.file_id_;
|
||||
|
||||
// index->Cache();
|
||||
} else {
|
||||
// failed to update meta, mark the new file as to_delete, don't delete old file
|
||||
origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX;
|
||||
status = meta_ptr->UpdateTableFile(origin_file);
|
||||
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";
|
||||
|
||||
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
|
||||
status = meta_ptr->UpdateTableFile(table_file);
|
||||
ENGINE_LOG_DEBUG << "Failed to up date file to index, mark file: " << table_file.file_id_ << " to to_delete";
|
||||
}
|
||||
|
||||
build_index_job->BuildIndexDone(to_index_id_);
|
||||
}
|
||||
|
||||
rc.ElapseFromBegin("totally cost");
|
||||
|
@ -128,4 +211,3 @@ XBuildIndexTask::Execute() {
|
|||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
} // namespace zilliz
|
||||
|
|
|
@ -22,7 +22,6 @@
|
|||
#include "scheduler/job/BuildIndexJob.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
|
@ -42,8 +41,8 @@ class XBuildIndexTask : public Task {
|
|||
size_t to_index_id_ = 0;
|
||||
int to_index_type_ = 0;
|
||||
ExecutionEnginePtr to_index_engine_ = nullptr;
|
||||
|
||||
};
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
} // namespace zilliz
|
||||
} // namespace milvus
|
Loading…
Reference in New Issue