From bc10937bcf953eede23d63e5887112903804e5b8 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Mon, 30 Sep 2019 19:16:56 +0800 Subject: [PATCH] buildindex to scheduler run ok Former-commit-id: 5d596c2cdc543d1093992d8a09594a1ca7d5d17a --- cpp/src/db/DBImpl.cpp | 64 ++++------- cpp/src/db/engine/ExecutionEngineImpl.cpp | 2 + cpp/src/scheduler/TaskCreator.cpp | 2 +- .../scheduler/action/PushTaskToNeighbour.cpp | 3 +- cpp/src/scheduler/job/BuildIndexJob.cpp | 12 +- cpp/src/scheduler/job/BuildIndexJob.h | 19 +++- cpp/src/scheduler/task/BuildIndexTask.cpp | 104 ++++++++++++++++-- cpp/src/scheduler/task/BuildIndexTask.h | 5 +- 8 files changed, 141 insertions(+), 70 deletions(-) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index ba43f945e8..22904bed2d 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -242,11 +242,7 @@ DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) { } Status -<<<<<<< HEAD -DBImpl::InsertVectors(const std::string &table_id_, uint64_t n, const float *vectors, IDNumbers &vector_ids_) { -======= DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vectors, IDNumbers& vector_ids) { ->>>>>>> upstream/branch-0.5.0 // ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache"; if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); @@ -299,17 +295,6 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { // for IDMAP type, only wait all NEW file converted to RAW file // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files std::vector file_types; -<<<<<<< HEAD - if (index.engine_type_ == (int) EngineType::FAISS_IDMAP) { - file_types = { - (int) meta::TableFileSchema::NEW, (int) meta::TableFileSchema::NEW_MERGE, - }; - } else { - file_types = { - (int) meta::TableFileSchema::RAW, (int) meta::TableFileSchema::NEW, - (int) meta::TableFileSchema::NEW_MERGE, (int) 
meta::TableFileSchema::NEW_INDEX, - (int) meta::TableFileSchema::TO_INDEX, -======= if (index.engine_type_ == static_cast(EngineType::FAISS_IDMAP)) { file_types = { static_cast(meta::TableFileSchema::NEW), static_cast(meta::TableFileSchema::NEW_MERGE), @@ -321,7 +306,6 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { static_cast(meta::TableFileSchema::NEW_MERGE), static_cast(meta::TableFileSchema::NEW_INDEX), static_cast(meta::TableFileSchema::TO_INDEX), ->>>>>>> upstream/branch-0.5.0 }; } @@ -915,38 +899,36 @@ DBImpl::BackgroundBuildIndex() { Status status; scheduler::BuildIndexJobPtr - job = std::make_shared(0); + job = std::make_shared(0, meta_ptr_); // step 2: put build index task to scheduler - scheduler::JobMgrInst::GetInstance()->Put(job); - for (auto &file : to_index_files) { - std::cout << "get to index file" << std::endl; - meta::TableFileSchema table_file; - table_file.table_id_ = file.table_id_; - table_file.date_ = file.date_; - table_file.file_type_ = - meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index file averagely to each path - status = meta_ptr_->CreateTableFile(table_file); - if (!status.ok()) { - ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); - } - - scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); - job->AddToIndexFiles(file_ptr, table_file); - } - job->WaitBuildIndexFinish(); - // for (auto &file : to_index_files) { -// status = BuildIndex(file); -// if (!status.ok()) { +// std::cout << "get to index file" << std::endl; +// +// scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); +// job->AddToIndexFiles(file_ptr); +// +// if (!job->GetStatus().ok()) { +// Status status = job->GetStatus(); // ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); // } // -// if (shutting_down_.load(std::memory_order_acquire)) { -// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; -// break; -// } 
// } +// scheduler::JobMgrInst::GetInstance()->Put(job); +// job->WaitBuildIndexFinish(); + + for (auto &file : to_index_files) { + std::cout << "get to index file" << std::endl; + status = BuildIndex(file); + if (!status.ok()) { + ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); + } + + if (shutting_down_.load(std::memory_order_acquire)) { + ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; + break; + } + } ENGINE_LOG_TRACE << "Background build index thread exit"; } diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index 52365d0915..f9366e439a 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -133,6 +133,7 @@ ExecutionEngineImpl::Serialize() { Status ExecutionEngineImpl::Load(bool to_cache) { + std::cout << "load" << std::endl; index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool already_in_cache = (index_ != nullptr); if (!already_in_cache) { @@ -161,6 +162,7 @@ ExecutionEngineImpl::Load(bool to_cache) { Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { + std::cout << "copy2gpu" << std::endl; auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); bool already_in_cache = (index != nullptr); if (already_in_cache) { diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp index 3acb28591b..0a7b3f9cbb 100644 --- a/cpp/src/scheduler/TaskCreator.cpp +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -69,7 +69,7 @@ TaskCreator::Create(const DeleteJobPtr &job) { } std::vector -TaskCreator::Create(const zilliz::milvus::scheduler::BuildIndexJobPtr &job) { +TaskCreator::Create(const BuildIndexJobPtr &job) { std::vector tasks; //TODO(yukun): remove "disk" hardcode here ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk"); diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp 
b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 4ee6570012..127e01232c 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -172,7 +172,8 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu); for (uint64_t i = 0; i < compute_resources.size(); ++i) { - if (compute_resources[i]->device_id() == build_index_gpu) { + if (compute_resources[i]->name() + == res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { Path task_path(paths[i], paths[i].size() - 1); task->path() = task_path; break; diff --git a/cpp/src/scheduler/job/BuildIndexJob.cpp b/cpp/src/scheduler/job/BuildIndexJob.cpp index 4c91afcca0..9ab3650dba 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.cpp +++ b/cpp/src/scheduler/job/BuildIndexJob.cpp @@ -19,27 +19,24 @@ #include "utils/Log.h" -namespace zilliz { namespace milvus { namespace scheduler { -BuildIndexJob::BuildIndexJob(zilliz::milvus::scheduler::JobId id) - : Job(id, JobType::BUILD){ +BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr) + : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)) { } bool -BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file, - const TableFileSchema table_file) { +BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file) { std::unique_lock lock(mutex_); - if (to_index_file == nullptr) { + if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) { return false; } SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_; to_index_files_[to_index_file->id_] = to_index_file; - table_files_[table_file.id_] = table_file; } Status& @@ -58,6 +55,5 @@ BuildIndexJob::BuildIndexDone(size_t to_index_id) { } -} } } \ No newline at end of file diff --git a/cpp/src/scheduler/job/BuildIndexJob.h 
b/cpp/src/scheduler/job/BuildIndexJob.h index 4d52461348..e536012985 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.h +++ b/cpp/src/scheduler/job/BuildIndexJob.h @@ -32,7 +32,6 @@ #include "scheduler/Definition.h" -namespace zilliz { namespace milvus { namespace scheduler { @@ -43,11 +42,11 @@ using Id2ToTableFileMap = std::unordered_map; class BuildIndexJob : public Job { public: - explicit BuildIndexJob(JobId id); + explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr); public: bool - AddToIndexFiles(const TableFileSchemaPtr &to_index_file, const TableFileSchema table_file); + AddToIndexFiles(const TableFileSchemaPtr &to_index_file); Status & WaitBuildIndexFinish(); @@ -66,15 +65,26 @@ class BuildIndexJob : public Job { // return engine_type_; // } + Status & + GetStatus() { + return status_; + } + Id2ToIndexMap & to_index_files() { return to_index_files_; } + engine::meta::MetaPtr + meta() const { + return meta_ptr_; + } + private: Id2ToIndexMap to_index_files_; - Id2ToTableFileMap table_files_; + engine::meta::MetaPtr meta_ptr_; + Status status_; std::mutex mutex_; std::condition_variable cv_; }; @@ -83,4 +93,3 @@ using BuildIndexJobPtr = std::shared_ptr; } } -} \ No newline at end of file diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp index d6b063d713..ea775982ee 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.cpp +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -26,7 +26,6 @@ #include #include -namespace zilliz { namespace milvus { namespace scheduler { @@ -39,7 +38,7 @@ XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file) } void -XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_id) { +XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { TimeRecorder rc(""); Status stat = Status::OK(); std::string error_msg; @@ -50,7 +49,7 @@ XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_i stat = 
to_index_engine_->Load(); type_str = "DISK2CPU"; } else if (type == LoadType::CPU2GPU) { - stat = to_index_engine_->CopyToGpu(device_id); +// stat = to_index_engine_->CopyToGpu(device_id); type_str = "CPU2GPU"; } else if (type == LoadType::GPU2CPU) { stat = to_index_engine_->CopyToCpu(); @@ -90,8 +89,8 @@ XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_i " bytes from location: " + file_->location_ + " totally cost"; double span = rc.ElapseFromBegin(info); -// to_index_id_ = file_->id_; -// to_index_type_ = file_->file_type_; + to_index_id_ = file_->id_; + to_index_type_ = file_->file_type_; } void @@ -103,22 +102,106 @@ XBuildIndexTask::Execute() { TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_)); if (auto job = job_.lock()) { - auto build_job = std::static_pointer_cast(job); + auto build_index_job = std::static_pointer_cast(job); std::string location = file_->location_; EngineType engine_type = (EngineType)file_->engine_type_; std::shared_ptr index; + // step 2: create table file + engine::meta::TableFileSchema table_file; + table_file.table_id_ = file_->table_id_; + table_file.date_ = file_->date_; + table_file.file_type_ = + engine::meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index file averagely to each path + + engine::meta::MetaPtr meta_ptr = build_index_job->meta(); + Status status = build_index_job->meta()->CreateTableFile(table_file); + if (!status.ok()) { + ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); + build_index_job->BuildIndexDone(to_index_id_); + //TODO: return status + } + + // step 3: build index try { index = to_index_engine_->BuildIndex(location, engine_type); if (index == nullptr) { - table_file_.file_type_ = engine::meta::TableFileSchema::TO_DELETE; - //TODO: updatetablefile + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to update file to 
index, mark file: " << table_file.file_id_ + << " to to_delete"; + + return; } } catch (std::exception &ex) { - ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what(); + std::string msg = "BuildIndex encounter exception: " + std::string(ex.what()); + ENGINE_LOG_ERROR << msg; + + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; + + std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" + << std::endl; + + build_index_job->GetStatus() = Status(DB_ERROR, msg); + return; } - build_job->BuildIndexDone(to_index_id_); + // step 4: if table has been deleted, don't save index file + bool has_table = false; + meta_ptr->HasTable(file_->table_id_, has_table); + if (!has_table) { + meta_ptr->DeleteTableFiles(file_->table_id_); +// return Status::OK(); + } + + // step 5: save index file + try { + index->Serialize(); + } catch (std::exception &ex) { + // typical error: out of disk space or permission denied + std::string msg = "Serialize index encounter exception: " + std::string(ex.what()); + ENGINE_LOG_ERROR << msg; + + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; + + std::cout << "ERROR: failed to persist index file: " << table_file.location_ + << ", possible out of disk space" << std::endl; + +// return Status(DB_ERROR, msg); + } + + // step 6: update meta + table_file.file_type_ = engine::meta::TableFileSchema::INDEX; + table_file.file_size_ = index->PhysicalSize(); + table_file.row_count_ = index->Count(); + + auto origin_file = *file_; + origin_file.file_type_ = engine::meta::TableFileSchema::BACKUP; + + engine::meta::TableFilesSchema update_files = {table_file, 
origin_file}; + status = meta_ptr->UpdateTableFiles(update_files); + if (status.ok()) { + ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize() + << " bytes" + << " from file " << origin_file.file_id_; + +// index->Cache(); + } else { + // failed to update meta, mark the new file as to_delete, don't delete old file + origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX; + status = meta_ptr->UpdateTableFile(origin_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index"; + + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to up date file to index, mark file: " << table_file.file_id_ << " to to_delete"; + } + + build_index_job->BuildIndexDone(to_index_id_); } rc.ElapseFromBegin("totally cost"); @@ -128,4 +211,3 @@ XBuildIndexTask::Execute() { } // namespace scheduler } // namespace milvus -} // namespace zilliz diff --git a/cpp/src/scheduler/task/BuildIndexTask.h b/cpp/src/scheduler/task/BuildIndexTask.h index 42606e15a7..cd751270a0 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.h +++ b/cpp/src/scheduler/task/BuildIndexTask.h @@ -22,7 +22,6 @@ #include "scheduler/job/BuildIndexJob.h" -namespace zilliz { namespace milvus { namespace scheduler { @@ -42,8 +41,8 @@ class XBuildIndexTask : public Task { size_t to_index_id_ = 0; int to_index_type_ = 0; ExecutionEnginePtr to_index_engine_ = nullptr; + }; } // namespace scheduler -} // namespace milvus -} // namespace zilliz +} // namespace milvus \ No newline at end of file