From b3608e1ae736cbe2e5935d4a52ac973e369a5007 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Wed, 22 Jul 2020 14:11:28 +0800 Subject: [PATCH] snapshot scheduler (#2961) * update interface Signed-off-by: yudong.cai * add test_ss_job Signed-off-by: yudong.cai * update Query Signed-off-by: yudong.cai * add dir_root for SSSearchJob and SSBuildIndexJob Signed-off-by: yudong.cai --- core/src/db/SSDBImpl.cpp | 120 ++++------ core/src/db/SSDBImpl.h | 10 +- core/src/db/Types.h | 2 + core/src/scheduler/TaskCreator.cpp | 4 +- core/src/scheduler/job/SSBuildIndexJob.cpp | 12 +- core/src/scheduler/job/SSBuildIndexJob.h | 52 ++--- core/src/scheduler/job/SSSearchJob.cpp | 55 +---- core/src/scheduler/job/SSSearchJob.h | 113 +++------- core/src/scheduler/task/SSBuildIndexTask.cpp | 177 ++------------- core/src/scheduler/task/SSBuildIndexTask.h | 12 +- core/src/scheduler/task/SSSearchTask.cpp | 218 +++++-------------- core/src/scheduler/task/SSSearchTask.h | 22 +- core/src/scheduler/task/SSTestTask.cpp | 2 +- core/unittest/ssdb/test_db.cpp | 52 ----- core/unittest/ssdb/test_ss_job.cpp | 111 ++++++++-- core/unittest/ssdb/test_ss_task.cpp | 30 +-- core/unittest/ssdb/utils.cpp | 63 ++++-- core/unittest/ssdb/utils.h | 11 + 18 files changed, 359 insertions(+), 707 deletions(-) diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index a727aabb5e..0658d003e0 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -496,8 +496,7 @@ SSDBImpl::Flush() { } Status -SSDBImpl::Compact(const std::shared_ptr& context, const std::string& collection_name, - double threshold) { +SSDBImpl::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) { if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -589,7 +588,7 @@ SSDBImpl::GetEntityIDs(const std::string& collection_id, int64_t segment_id, IDN } Status -SSDBImpl::CreateIndex(const std::shared_ptr& context, const std::string& collection_id, +SSDBImpl::CreateIndex(const server::ContextPtr& context, const std::string& collection_id, const std::string& field_name, const CollectionIndex& index) { return Status::OK(); } @@ -627,95 +626,50 @@ SSDBImpl::DropIndex(const std::string& collection_id) { } Status -SSDBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResult& result) { +SSDBImpl::Query(const server::ContextPtr& context, const std::string& collection_name, const query::QueryPtr& query_ptr, + engine::QueryResultPtr& result) { CHECK_INITIALIZED; - milvus::server::ContextChild tracer(context, "Query"); - TimeRecorder rc("SSDBImpl::Query"); - // snapshot::ScopedSnapshotT ss; - // STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); - // - // /* collect all valid segment */ - // std::vector segment_visitors; - // auto exec = [&] (const snapshot::Segment::Ptr& segment, snapshot::SegmentIterator* handler) -> Status { - // auto p_id = segment->GetPartitionId(); - // auto p_ptr = ss->GetResource(p_id); - // auto& p_name = p_ptr->GetName(); - // - // /* check partition match pattern */ - // bool match = false; - // if (partition_patterns.empty()) { - // match = true; - // } else { - // for (auto &pattern : partition_patterns) { - // if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) { - // match = true; - // break; - // } - // } - // } - // - // if (match) { - // auto visitor = SegmentVisitor::Build(ss, segment->GetID()); - // if (!visitor) { - // return Status(milvus::SS_ERROR, "Cannot build segment visitor"); - // } - // segment_visitors.push_back(visitor); - // } - // return Status::OK(); - // }; - // - // auto segment_iter = std::make_shared(ss, exec); - // segment_iter->Iterate(); - // STATUS_CHECK(segment_iter->GetStatus()); - // - // LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size()); - // - // VectorsData vectors; - // scheduler::SSSearchJobPtr job = - // std::make_shared(tracer.Context(), general_query, query_ptr, attr_type, vectors); - // for (auto& sv : segment_visitors) { - // job->AddSegmentVisitor(sv); - // } - // - // // step 2: put search job to scheduler and wait result - // scheduler::JobMgrInst::GetInstance()->Put(job); - // job->WaitResult(); - // - // if (!job->GetStatus().ok()) { - // return job->GetStatus(); - // } - // - // // step 3: construct results - // result.row_num_ = job->vector_count(); - // result.result_ids_ = job->GetResultIds(); - // result.result_distances_ = job->GetResultDistances(); + snapshot::ScopedSnapshotT ss; + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); - // step 4: get entities by result ids - // STATUS_CHECK(GetEntityByID(collection_name, result.result_ids_, field_names, result.vectors_, result.attrs_)); + /* collect all segment visitors */ + std::vector segment_visitors; + auto exec = [&](const snapshot::SegmentPtr& segment, snapshot::SegmentIterator* handler) -> Status { + auto visitor = SegmentVisitor::Build(ss, segment->GetID()); + if (!visitor) { + return Status(milvus::SS_ERROR, "Cannot build segment visitor"); + } + segment_visitors.push_back(visitor); + return Status::OK(); + }; - // step 5: filter entities by field names - // std::vector filter_attrs; - // for (auto attr : result.attrs_) { - // AttrsData attrs_data; - // attrs_data.attr_type_ = attr.attr_type_; - // attrs_data.attr_count_ = attr.attr_count_; - // attrs_data.id_array_ = attr.id_array_; - // for (auto& name : field_names) { - // if (attr.attr_data_.find(name) != attr.attr_data_.end()) { - // attrs_data.attr_data_.insert(std::make_pair(name, attr.attr_data_.at(name))); - // } - // } - // filter_attrs.emplace_back(attrs_data); - // } + auto segment_iter = std::make_shared(ss, exec); + segment_iter->Iterate(); + STATUS_CHECK(segment_iter->GetStatus()); + + LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size()); + + scheduler::SSSearchJobPtr job = std::make_shared(nullptr, options_.meta_.path_, query_ptr); + for (auto& sv : segment_visitors) { + job->AddSegmentVisitor(sv); + } + + /* put search job to scheduler and wait job finish */ + scheduler::JobMgrInst::GetInstance()->Put(job); + job->WaitFinish(); + + if (!job->status().ok()) { + return job->status(); + } + + result = job->query_result(); rc.ElapseFromBegin("Engine query totally cost"); - // tracer.Context()->GetTraceContext()->GetSpan()->Finish(); - - return Status::OK(); + return job->status(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/core/src/db/SSDBImpl.h b/core/src/db/SSDBImpl.h index 2b348f4fbc..e73ab7b20d 100644 --- a/core/src/db/SSDBImpl.h +++ b/core/src/db/SSDBImpl.h @@ -94,8 +94,7 @@ class SSDBImpl { Flush(); Status - Compact(const std::shared_ptr& context, const std::string& collection_name, - double threshold = 0.0); + Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold = 0.0); Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, @@ -105,8 +104,8 @@ class SSDBImpl { GetEntityIDs(const std::string& collection_id, int64_t segment_id, IDNumbers& entity_ids); Status - CreateIndex(const std::shared_ptr& context, const std::string& collection_id, - const std::string& field_name, const CollectionIndex& index); + CreateIndex(const server::ContextPtr& context, const std::string& collection_id, const std::string& field_name, + const CollectionIndex& index); Status DescribeIndex(const std::string& collection_id, const std::string& field_name, CollectionIndex& index); @@ -118,7 +117,8 @@ class SSDBImpl { DropIndex(const std::string& collection_id); Status - Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResult& result); + Query(const server::ContextPtr& context, const std::string& collection_name, const query::QueryPtr& query_ptr, + engine::QueryResultPtr& result); private: void diff --git a/core/src/db/Types.h b/core/src/db/Types.h index e30563e2fd..ffe14ef592 100644 --- a/core/src/db/Types.h +++ b/core/src/db/Types.h @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -73,6 +74,7 @@ struct QueryResult { std::vector vectors_; std::vector attrs_; }; +using QueryResultPtr = std::shared_ptr; using File2ErrArray = std::map>; using Table2FileErr = std::map; diff --git a/core/src/scheduler/TaskCreator.cpp b/core/src/scheduler/TaskCreator.cpp index 5349056706..ef0c273854 100644 --- a/core/src/scheduler/TaskCreator.cpp +++ b/core/src/scheduler/TaskCreator.cpp @@ -85,7 +85,7 @@ std::vector TaskCreator::Create(const SSSearchJobPtr& job) { std::vector tasks; for (auto& sv : job->segment_visitor_map()) { - auto task = std::make_shared(job->GetContext(), sv.second, nullptr); + auto task = std::make_shared(job->GetContext(), job->dir_root(), sv.second, nullptr); task->job_ = job; tasks.emplace_back(task); } @@ -96,7 +96,7 @@ std::vector TaskCreator::Create(const SSBuildIndexJobPtr& job) { std::vector tasks; for (auto& sv : job->segment_visitor_map()) { - auto task = std::make_shared(sv.second, nullptr); + auto task = std::make_shared(job->dir_root(), sv.second, nullptr); task->job_ = job; tasks.emplace_back(task); } diff --git a/core/src/scheduler/job/SSBuildIndexJob.cpp b/core/src/scheduler/job/SSBuildIndexJob.cpp index aed18fd9bc..ef24d7880d 100644 --- a/core/src/scheduler/job/SSBuildIndexJob.cpp +++ b/core/src/scheduler/job/SSBuildIndexJob.cpp @@ -18,7 +18,7 @@ namespace milvus { namespace scheduler { -SSBuildIndexJob::SSBuildIndexJob(engine::DBOptions options) : Job(JobType::SS_BUILD), options_(std::move(options)) { +SSBuildIndexJob::SSBuildIndexJob(const std::string& dir_root) : Job(JobType::SS_BUILD), dir_root_(dir_root) { SetIdentity("SSBuildIndexJob"); AddCacheInsertDataListener(); } @@ -31,7 +31,7 @@ SSBuildIndexJob::AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor) { } void -SSBuildIndexJob::WaitBuildIndexFinish() { +SSBuildIndexJob::WaitFinish() { std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return segment_visitor_map_.empty(); }); LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] BuildIndexJob %ld all done", "build index", 0, id()); @@ -55,10 +55,10 @@ SSBuildIndexJob::Dump() const { return ret; } -void -SSBuildIndexJob::OnCacheInsertDataChanged(bool value) { - options_.insert_cache_immediately_ = value; -} +// void +// SSBuildIndexJob::OnCacheInsertDataChanged(bool value) { +// options_.insert_cache_immediately_ = value; +//} } // namespace scheduler } // namespace milvus diff --git a/core/src/scheduler/job/SSBuildIndexJob.h b/core/src/scheduler/job/SSBuildIndexJob.h index 6c84d0f750..9beffdcc51 100644 --- a/core/src/scheduler/job/SSBuildIndexJob.h +++ b/core/src/scheduler/job/SSBuildIndexJob.h @@ -30,26 +30,18 @@ namespace milvus { namespace scheduler { -// using engine::meta::SegmentSchemaPtr; - -// using Id2ToIndexMap = std::unordered_map; -// using Id2ToTableFileMap = std::unordered_map; - class SSBuildIndexJob : public Job, public server::CacheConfigHandler { public: - explicit SSBuildIndexJob(engine::DBOptions options); + explicit SSBuildIndexJob(const std::string& dir_root); ~SSBuildIndexJob() = default; public: - // bool - // AddToIndexFiles(const SegmentSchemaPtr& to_index_file); - void AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor); void - WaitBuildIndexFinish(); + WaitFinish(); void BuildIndexDone(const engine::snapshot::ID_TYPE seg_id); @@ -58,39 +50,33 @@ class SSBuildIndexJob : public Job, public server::CacheConfigHandler { Dump() const override; public: - Status& - GetStatus() { - return status_; + const std::string& + dir_root() const { + return dir_root_; } - // Id2ToIndexMap& - // to_index_files() { - // return to_index_files_; - // } - - // engine::meta::MetaPtr - // meta() const { - // return meta_ptr_; - // } - const SegmentVisitorMap& - segment_visitor_map() { + segment_visitor_map() const { return segment_visitor_map_; } - engine::DBOptions - options() const { - return options_; + Status& + status() { + return status_; } - protected: - void - OnCacheInsertDataChanged(bool value) override; + // engine::DBOptions + // options() const { + // return options_; + // } + + // protected: + // void + // OnCacheInsertDataChanged(bool value) override; private: - // Id2ToIndexMap to_index_files_; - // engine::meta::MetaPtr meta_ptr_; - engine::DBOptions options_; + // engine::DBOptions options_; + std::string dir_root_; SegmentVisitorMap segment_visitor_map_; Status status_; diff --git a/core/src/scheduler/job/SSSearchJob.cpp b/core/src/scheduler/job/SSSearchJob.cpp index 2f841bc14b..7755fb7b25 100644 --- a/core/src/scheduler/job/SSSearchJob.cpp +++ b/core/src/scheduler/job/SSSearchJob.cpp @@ -10,27 +10,14 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "scheduler/job/SSSearchJob.h" - #include "utils/Log.h" namespace milvus { namespace scheduler { -SSSearchJob::SSSearchJob(const server::ContextPtr& context, int64_t topk, const milvus::json& extra_params, - engine::VectorsData& vectors) - : Job(JobType::SS_SEARCH), context_(context), topk_(topk), extra_params_(extra_params), vectors_(vectors) { -} - -SSSearchJob::SSSearchJob(const server::ContextPtr& context, milvus::query::GeneralQueryPtr general_query, - query::QueryPtr query_ptr, - std::unordered_map& attr_type, - engine::VectorsData& vectors) - : Job(JobType::SS_SEARCH), - context_(context), - general_query_(general_query), - query_ptr_(query_ptr), - attr_type_(attr_type), - vectors_(vectors) { +SSSearchJob::SSSearchJob(const server::ContextPtr& context, const std::string& dir_root, + const query::QueryPtr& query_ptr) + : Job(JobType::SS_SEARCH), context_(context), dir_root_(dir_root), query_ptr_(query_ptr) { } void @@ -41,13 +28,13 @@ SSSearchJob::AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor) { } void -SSSearchJob::WaitResult() { +SSSearchJob::WaitFinish() { std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return segment_visitor_map_.empty(); }); - // LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld: query_time %f, map_uids_time %f, reduce_time %f", - // "search", 0, - // id(), this->time_stat().query_time, this->time_stat().map_uids_time, - // this->time_stat().reduce_time); + // LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld: query_time %f, map_uids_time %f, reduce_time %f", + // "search", 0, + // id(), this->time_stat().query_time, this->time_stat().map_uids_time, + // this->time_stat().reduce_time); LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld all done", "search", 0, id()); } @@ -61,37 +48,13 @@ SSSearchJob::SearchDone(const engine::snapshot::ID_TYPE seg_id) { LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld finish segment: %ld", "search", 0, id(), seg_id); } -ResultIds& -SSSearchJob::GetResultIds() { - return result_ids_; -} - -ResultDistances& -SSSearchJob::GetResultDistances() { - return result_distances_; -} - -Status& -SSSearchJob::GetStatus() { - return status_; -} - json SSSearchJob::Dump() const { - json ret{ - {"topk", topk_}, - {"nq", vectors_.vector_count_}, - {"extra_params", extra_params_.dump()}, - }; + json ret{{"extra_params", extra_params_.dump()}}; auto base = Job::Dump(); ret.insert(base.begin(), base.end()); return ret; } -const std::shared_ptr& -SSSearchJob::GetContext() const { - return context_; -} - } // namespace scheduler } // namespace milvus diff --git a/core/src/scheduler/job/SSSearchJob.h b/core/src/scheduler/job/SSSearchJob.h index 9889b1b32d..764a9dd35f 100644 --- a/core/src/scheduler/job/SSSearchJob.h +++ b/core/src/scheduler/job/SSSearchJob.h @@ -25,21 +25,13 @@ #include "Job.h" #include "db/SnapshotVisitor.h" #include "db/Types.h" -#include "db/meta/MetaTypes.h" -#include "query/GeneralQuery.h" +//#include "db/meta/MetaTypes.h" #include "server/context/Context.h" namespace milvus { namespace scheduler { -using engine::meta::SegmentSchemaPtr; - -using Id2IndexMap = std::unordered_map; - -using ResultIds = engine::ResultIds; -using ResultDistances = engine::ResultDistances; - // struct SearchTimeStat { // double query_time = 0.0; // double map_uids_time = 0.0; @@ -48,52 +40,25 @@ using ResultDistances = engine::ResultDistances; class SSSearchJob : public Job { public: - SSSearchJob(const server::ContextPtr& context, int64_t topk, const milvus::json& extra_params, - engine::VectorsData& vectors); - - SSSearchJob(const server::ContextPtr& context, query::GeneralQueryPtr general_query, query::QueryPtr query_ptr, - std::unordered_map& attr_type, - engine::VectorsData& vectorsData); + SSSearchJob(const server::ContextPtr& context, const std::string& dir_root, const query::QueryPtr& query_ptr); public: void AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor); void - WaitResult(); + WaitFinish(); void SearchDone(const engine::snapshot::ID_TYPE seg_id); - ResultIds& - GetResultIds(); - - ResultDistances& - GetResultDistances(); - - void - SetVectors(engine::VectorsData& vectors) { - vectors_ = vectors; - } - - Status& - GetStatus(); - json Dump() const override; public: const server::ContextPtr& - GetContext() const; - - int64_t - topk() { - return topk_; - } - - int64_t - nq() const { - return vectors_.vector_count_; + GetContext() const { + return context_; } const milvus::json& @@ -101,70 +66,58 @@ class SSSearchJob : public Job { return extra_params_; } - const engine::VectorsData& - vectors() const { - return vectors_; + const std::string& + dir_root() const { + return dir_root_; } const SegmentVisitorMap& - segment_visitor_map() { + segment_visitor_map() const { return segment_visitor_map_; } + const query::QueryPtr + query_ptr() const { + return query_ptr_; + } + + engine::QueryResultPtr& + query_result() { + return query_result_; + } + + Status& + status() { + return status_; + } + std::mutex& mutex() { return mutex_; } - query::GeneralQueryPtr - general_query() { - return general_query_; - } - - query::QueryPtr - query_ptr() { - return query_ptr_; - } - - std::unordered_map& - attr_type() { - return attr_type_; - } - - int64_t - vector_count() { - return vector_count_; - } - - // SearchTimeStat& - // time_stat() { - // return time_stat_; - // } + // SearchTimeStat& + // time_stat() { + // return time_stat_; + // } private: const server::ContextPtr context_; - int64_t topk_ = 0; milvus::json extra_params_; - // TODO: smart pointer - engine::VectorsData& vectors_; + std::string dir_root_; SegmentVisitorMap segment_visitor_map_; - // TODO: column-base better ? - ResultIds result_ids_; - ResultDistances result_distances_; - Status status_; - - query::GeneralQueryPtr general_query_; query::QueryPtr query_ptr_; - std::unordered_map attr_type_; - int64_t vector_count_; + + engine::QueryResultPtr query_result_; + Status status_; std::mutex mutex_; std::condition_variable cv_; - // SearchTimeStat time_stat_; + // SearchTimeStat time_stat_; }; using SSSearchJobPtr = std::shared_ptr; diff --git a/core/src/scheduler/task/SSBuildIndexTask.cpp b/core/src/scheduler/task/SSBuildIndexTask.cpp index 6c0f8ef9ba..46608ed145 100644 --- a/core/src/scheduler/task/SSBuildIndexTask.cpp +++ b/core/src/scheduler/task/SSBuildIndexTask.cpp @@ -9,62 +9,43 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. -#include "scheduler/task/SSBuildIndexTask.h" - #include - #include -#include -#include #include #include "db/Utils.h" -#include "db/engine/EngineFactory.h" -#include "metrics/Metrics.h" +#include "db/engine/SSExecutionEngineImpl.h" #include "scheduler/job/SSBuildIndexJob.h" -#include "utils/CommonUtil.h" -#include "utils/Exception.h" +#include "scheduler/task/SSBuildIndexTask.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" namespace milvus { namespace scheduler { -XSSBuildIndexTask::XSSBuildIndexTask(const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label) +XSSBuildIndexTask::XSSBuildIndexTask(const std::string& dir_root, const engine::SegmentVisitorPtr& visitor, + TaskLabelPtr label) : Task(TaskType::BuildIndexTask, std::move(label)), visitor_(visitor) { - // if (file_) { - // EngineType engine_type; - // if (file->file_type_ == SegmentSchema::FILE_TYPE::RAW || - // file->file_type_ == SegmentSchema::FILE_TYPE::TO_INDEX || - // file->file_type_ == SegmentSchema::FILE_TYPE::BACKUP) { - // engine_type = engine::utils::IsBinaryMetricType(file->metric_type_) ? EngineType::FAISS_BIN_IDMAP - // : EngineType::FAISS_IDMAP; - // } else { - // engine_type = (EngineType)file->engine_type_; - // } - // - // auto json = milvus::json::parse(file_->index_params_); - // to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, engine_type, - // (MetricType)file_->metric_type_, json); - // } + engine_ = std::make_shared(dir_root, visitor); } void XSSBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { TimeRecorder rc("XSSBuildIndexTask::Load"); + auto seg_id = visitor_->GetSegment()->GetID(); Status stat = Status::OK(); std::string error_msg; std::string type_str; if (auto job = job_.lock()) { auto build_index_job = std::static_pointer_cast(job); - auto options = build_index_job->options(); + // auto options = build_index_job->options(); try { if (type == LoadType::DISK2CPU) { - stat = to_index_engine_->Load(options.insert_cache_immediately_); + stat = engine_->Load(nullptr); type_str = "DISK2CPU"; } else if (type == LoadType::CPU2GPU) { - stat = to_index_engine_->CopyToIndexFileToGpu(device_id); + stat = engine_->CopyToGpu(device_id); type_str = "CPU2GPU:" + std::to_string(device_id); } else { error_msg = "Wrong load type"; @@ -92,23 +73,16 @@ XSSBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { if (auto job = job_.lock()) { auto build_index_job = std::static_pointer_cast(job); - build_index_job->GetStatus() = s; - build_index_job->BuildIndexDone(visitor_->GetSegment()->GetID()); + build_index_job->status() = s; + build_index_job->BuildIndexDone(seg_id); } return; } - // size_t file_size = to_index_engine_->Size(); - // - // std::string info = "Build index task load file id:" + std::to_string(file_->id_) + " " + type_str + - // " file type:" + std::to_string(file_->file_type_) + " size:" + - // std::to_string(file_size) + " bytes from location: " + file_->location_ + " totally - // cost"; - // rc.ElapseFromBegin(info); - // - // to_index_id_ = file_->id_; - // to_index_type_ = file_->file_type_; + std::string info = + "Build index task load segment id:" + std::to_string(seg_id) + " " + type_str + " totally cost"; + rc.ElapseFromBegin(info); } } @@ -119,129 +93,18 @@ XSSBuildIndexTask::Execute() { if (auto job = job_.lock()) { auto build_index_job = std::static_pointer_cast(job); - if (to_index_engine_ == nullptr) { + if (engine_ == nullptr) { build_index_job->BuildIndexDone(seg_id); - build_index_job->GetStatus() = Status(DB_ERROR, "source index is null"); + build_index_job->status() = Status(DB_ERROR, "source index is null"); return; } - // std::string location = file_->location_; - // std::shared_ptr index; - // - // // step 1: create collection file - // engine::meta::SegmentSchema table_file; - // table_file.collection_id_ = file_->collection_id_; - // table_file.segment_id_ = file_->file_id_; - // table_file.date_ = file_->date_; - // table_file.file_type_ = engine::meta::SegmentSchema::NEW_INDEX; - // - // engine::meta::MetaPtr meta_ptr = build_index_job->meta(); - // Status status = meta_ptr->CreateCollectionFile(table_file); - // - // fiu_do_on("XSSBuildIndexTask.Execute.create_table_success", status = Status::OK()); - // if (!status.ok()) { - // LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.ToString(); - // build_index_job->BuildIndexDone(to_index_id_); - // build_index_job->GetStatus() = status; - // to_index_engine_ = nullptr; - // return; - // } - // - // auto failed_build_index = [&](std::string log_msg, std::string err_msg) { - // table_file.file_type_ = engine::meta::SegmentSchema::TO_DELETE; - // status = meta_ptr->UpdateCollectionFile(table_file); - // LOG_ENGINE_ERROR_ << log_msg; - // - // build_index_job->BuildIndexDone(to_index_id_); - // build_index_job->GetStatus() = Status(DB_ERROR, err_msg); - // to_index_engine_ = nullptr; - // }; - // - // // step 2: build index - // try { - // LOG_ENGINE_DEBUG_ << "Begin build index for file:" + table_file.location_; - // index = to_index_engine_->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_); - // fiu_do_on("XSSBuildIndexTask.Execute.build_index_fail", index = nullptr); - // if (index == nullptr) { - // std::string log_msg = "Failed to build index " + table_file.file_id_ + ", reason: source index - // is null"; failed_build_index(log_msg, "source index is null"); return; - // } - // } catch (std::exception& ex) { - // std::string msg = "Failed to build index " + table_file.file_id_ + ", reason: " + - // std::string(ex.what()); failed_build_index(msg, ex.what()); return; - // } - // - // // step 3: if collection has been deleted, dont save index file - // bool has_collection = false; - // meta_ptr->HasCollection(file_->collection_id_, has_collection); - // fiu_do_on("XSSBuildIndexTask.Execute.has_collection", has_collection = true); - // - // if (!has_collection) { - // std::string msg = "Failed to build index " + table_file.file_id_ + ", reason: collection has been - // deleted"; failed_build_index(msg, "Collection has been deleted"); return; - // } - // - // // step 4: save index file - // try { - // fiu_do_on("XSSBuildIndexTask.Execute.throw_std_exception", throw std::exception()); - // status = index->Serialize(); - // if (!status.ok()) { - // std::string msg = - // "Failed to persist index file: " + table_file.location_ + ", reason: " + status.message(); - // failed_build_index(msg, status.message()); - // return; - // } - // } catch (std::exception& ex) { - // // if failed to serialize index file to disk - // // typical error: out of disk space, out of memory or permition denied - // std::string msg = - // "Failed to persist index file:" + table_file.location_ + ", exception:" + - // std::string(ex.what()); - // failed_build_index(msg, ex.what()); - // return; - // } - // - // // step 5: update meta - // table_file.file_type_ = engine::meta::SegmentSchema::INDEX; - // table_file.file_size_ = CommonUtil::GetFileSize(table_file.location_); - // table_file.row_count_ = file_->row_count_; // index->Count(); - // - // auto origin_file = *file_; - // origin_file.file_type_ = engine::meta::SegmentSchema::BACKUP; - // - // engine::meta::SegmentsSchema update_files = {table_file, origin_file}; - // - // if (status.ok()) { // makesure index file is sucessfully serialized to disk - // status = meta_ptr->UpdateCollectionFiles(update_files); - // } - // - // fiu_do_on("XSSBuildIndexTask.Execute.update_table_file_fail", status = Status(SERVER_UNEXPECTED_ERROR, - // "")); if (status.ok()) { - // LOG_ENGINE_DEBUG_ << "New index file " << table_file.file_id_ << " of size " << - // table_file.file_size_ - // << " bytes" - // << " from file " << origin_file.file_id_; - // // XXX_Index_NM doesn't support it now. - // // if (build_index_job->options().insert_cache_immediately_) { - // // index->Cache(); - // // } - // } else { - // // failed to update meta, mark the new file as to_delete, don't delete old file - // origin_file.file_type_ = engine::meta::SegmentSchema::TO_INDEX; - // status = meta_ptr->UpdateCollectionFile(origin_file); - // LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << origin_file.file_id_ - // << " to to_index"; - // - // table_file.file_type_ = engine::meta::SegmentSchema::TO_DELETE; - // status = meta_ptr->UpdateCollectionFile(table_file); - // LOG_ENGINE_DEBUG_ << "Failed to up date file to index, mark file: " << table_file.file_id_ - // << " to to_delete"; - // } - // - // build_index_job->BuildIndexDone(to_index_id_); + // SS TODO + + build_index_job->BuildIndexDone(seg_id); } - to_index_engine_ = nullptr; + engine_ = nullptr; } } // namespace scheduler diff --git a/core/src/scheduler/task/SSBuildIndexTask.h b/core/src/scheduler/task/SSBuildIndexTask.h index fbbd9226c2..adba93e69d 100644 --- a/core/src/scheduler/task/SSBuildIndexTask.h +++ b/core/src/scheduler/task/SSBuildIndexTask.h @@ -11,7 +11,10 @@ #pragma once +#include + #include "db/SnapshotVisitor.h" +#include "db/engine/SSExecutionEngine.h" #include "scheduler/Definition.h" #include "scheduler/job/SSBuildIndexJob.h" #include "scheduler/task/Task.h" @@ -21,7 +24,8 @@ namespace scheduler { class XSSBuildIndexTask : public Task { public: - explicit XSSBuildIndexTask(const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label); + explicit XSSBuildIndexTask(const std::string& dir_root, const engine::SegmentVisitorPtr& visitor, + TaskLabelPtr label); void Load(LoadType type, uint8_t device_id) override; @@ -31,11 +35,7 @@ class XSSBuildIndexTask : public Task { public: engine::SegmentVisitorPtr visitor_; - // SegmentSchemaPtr file_; - // SegmentSchema table_file_; - // size_t to_index_id_ = 0; - int to_index_type_ = 0; - ExecutionEnginePtr to_index_engine_ = nullptr; + engine::SSExecutionEnginePtr engine_ = nullptr; }; } // namespace scheduler diff --git a/core/src/scheduler/task/SSSearchTask.cpp b/core/src/scheduler/task/SSSearchTask.cpp index 3675c7b866..08f4f6de54 100644 --- a/core/src/scheduler/task/SSSearchTask.cpp +++ b/core/src/scheduler/task/SSSearchTask.cpp @@ -9,7 +9,7 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. -#include "scheduler/task/SearchTask.h" +#include "scheduler/task/SSSearchTask.h" #include @@ -21,74 +21,26 @@ #include #include "db/Utils.h" -#include "db/engine/EngineFactory.h" -#include "metrics/Metrics.h" +#include "db/engine/SSExecutionEngineImpl.h" #include "scheduler/SchedInst.h" #include "scheduler/job/SSSearchJob.h" -#include "scheduler/task/SSSearchTask.h" #include "segment/SegmentReader.h" -#include "utils/CommonUtil.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" namespace milvus { namespace scheduler { -// void -// CollectFileMetrics(int file_type, size_t file_size) { -// server::MetricsBase& inst = server::Metrics::GetInstance(); -// switch (file_type) { -// case SegmentSchema::RAW: -// case SegmentSchema::TO_INDEX: { -// inst.RawFileSizeHistogramObserve(file_size); -// inst.RawFileSizeTotalIncrement(file_size); -// inst.RawFileSizeGaugeSet(file_size); -// break; -// } -// default: { -// inst.IndexFileSizeHistogramObserve(file_size); -// inst.IndexFileSizeTotalIncrement(file_size); -// inst.IndexFileSizeGaugeSet(file_size); -// break; -// } -// } -//} - -XSSSearchTask::XSSSearchTask(const server::ContextPtr& context, const engine::SegmentVisitorPtr& visitor, - TaskLabelPtr label) +XSSSearchTask::XSSSearchTask(const server::ContextPtr& context, const std::string& dir_root, + const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label) : Task(TaskType::SearchTask, std::move(label)), context_(context), visitor_(visitor) { - // if (file_) { - // // distance -- value 0 means two vectors equal, ascending reduce, L2/HAMMING/JACCARD/TONIMOTO ... - // // similarity -- infinity value means two vectors equal, descending reduce, IP - // if (file_->metric_type_ == static_cast(MetricType::IP) && - // file_->engine_type_ != static_cast(EngineType::FAISS_PQ)) { - // ascending_reduce = false; - // } - // - // EngineType engine_type; - // if (file->file_type_ == SegmentSchema::FILE_TYPE::RAW || - // file->file_type_ == SegmentSchema::FILE_TYPE::TO_INDEX || - // file->file_type_ == SegmentSchema::FILE_TYPE::BACKUP) { - // engine_type = engine::utils::IsBinaryMetricType(file->metric_type_) ? EngineType::FAISS_BIN_IDMAP - // : EngineType::FAISS_IDMAP; - // } else { - // engine_type = (EngineType)file->engine_type_; - // } - // - // milvus::json json_params; - // if (!file_->index_params_.empty()) { - // json_params = milvus::json::parse(file_->index_params_); - // } - // index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, engine_type, - // (MetricType)file_->metric_type_, json_params); - // } + engine_ = std::make_shared(dir_root, visitor); } void XSSSearchTask::Load(LoadType type, uint8_t device_id) { - // milvus::server::ContextFollower tracer(context_, "XSearchTask::Load " + std::to_string(file_->id_)); - - TimeRecorder rc(LogOut("[%s][%ld]", "search", 0)); + auto seg_id = visitor_->GetSegment()->GetID(); + TimeRecorder rc(LogOut("[%s][%ld]", "search", seg_id)); Status stat = Status::OK(); std::string error_msg; std::string type_str; @@ -96,18 +48,14 @@ XSSSearchTask::Load(LoadType type, uint8_t device_id) { try { fiu_do_on("XSearchTask.Load.throw_std_exception", throw std::exception()); if (type == LoadType::DISK2CPU) { - // stat = index_engine_->Load(); - // stat = index_engine_->LoadAttr(); + stat = engine_->Load(nullptr); + // stat = engine_->LoadAttr(); type_str = "DISK2CPU"; } else if (type == LoadType::CPU2GPU) { - // bool hybrid = false; - // if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H) { - // hybrid = true; - // } - // stat = index_engine_->CopyToGpu(device_id, hybrid); + stat = engine_->CopyToGpu(device_id); type_str = "CPU2GPU" + std::to_string(device_id); } else if (type == LoadType::GPU2CPU) { - // stat = index_engine_->CopyToCpu(); + // stat = engine_->CopyToCpu(); type_str = "GPU2CPU"; } else { error_msg = "Wrong load type"; @@ -116,7 +64,7 @@ XSSSearchTask::Load(LoadType type, uint8_t device_id) { } catch (std::exception& ex) { // typical error: out of disk space or permition denied error_msg = "Failed to load index file: " + std::string(ex.what()); - LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Encounter execption: %s", "search", 0, error_msg.c_str()); + LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Encounter exception: %s", "search", 0, error_msg.c_str()); stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); } fiu_do_on("XSearchTask.Load.out_of_memory", stat = Status(SERVER_UNEXPECTED_ERROR, "out of memory")); @@ -133,26 +81,15 @@ XSSSearchTask::Load(LoadType type, uint8_t device_id) { if (auto job = job_.lock()) { auto search_job = std::static_pointer_cast(job); - search_job->SearchDone(visitor_->GetSegment()->GetID()); - search_job->GetStatus() = s; + search_job->SearchDone(seg_id); + search_job->status() = s; } return; } - // size_t file_size = index_engine_->Size(); - - // std::string info = "Search task load file id:" + std::to_string(file_->id_) + " " + type_str + - // " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + - // " bytes from location: " + file_->location_ + " totally cost"; - // rc.ElapseFromBegin(info); - // - // CollectFileMetrics(file_->file_type_, file_size); - // - // // step 2: return search task for later execution - // index_id_ = file_->id_; - // index_type_ = file_->file_type_; - // search_contexts_.swap(search_contexts_); + std::string info = "Search task load segment id: " + std::to_string(seg_id) + " " + type_str + " totally cost"; + rc.ElapseFromBegin(info); } void @@ -161,84 +98,48 @@ XSSSearchTask::Execute() { milvus::server::ContextFollower tracer(context_, "XSearchTask::Execute " + std::to_string(seg_id)); TimeRecorder rc(LogOut("[%s][%ld] DoSearch file id:%ld", "search", 0, seg_id)); - server::CollectDurationMetrics metrics(index_type_); - - std::vector output_ids; - std::vector output_distance; + engine::QueryResult result; double span; if (auto job = job_.lock()) { auto search_job = std::static_pointer_cast(job); - if (index_engine_ == nullptr) { + if (engine_ == nullptr) { search_job->SearchDone(seg_id); return; } - /* step 1: allocate memory */ - query::GeneralQueryPtr general_query = search_job->general_query(); - - uint64_t nq = search_job->nq(); - uint64_t topk = search_job->topk(); - fiu_do_on("XSearchTask.Execute.throw_std_exception", throw std::exception()); - // try { - // /* step 2: search */ - // bool hybrid = false; - // if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H && - // ResMgrInst::GetInstance()->GetResource(path().Last())->type() == ResourceType::CPU) { - // hybrid = true; - // } - // Status s; - // if (general_query != nullptr) { - // std::unordered_map types; - // auto attr_type = search_job->attr_type(); - // auto type_it = attr_type.begin(); - // for (; type_it != attr_type.end(); type_it++) { - // types.insert(std::make_pair(type_it->first, (DataType)(type_it->second))); - // } - // - // auto query_ptr = search_job->query_ptr(); - // - // s = index_engine_->HybridSearch(search_job, types, output_distance, output_ids, hybrid); - // auto vector_query = query_ptr->vectors.begin()->second; - // topk = vector_query->topk; - // nq = vector_query->query_vector.float_data.size() / file_->dimension_; - // search_job->vector_count() = nq; - // } else { - // s = index_engine_->Search(output_ids, output_distance, search_job, hybrid); - // } - // - // fiu_do_on("XSearchTask.Execute.search_fail", s = Status(SERVER_UNEXPECTED_ERROR, "")); - // if (!s.ok()) { - // search_job->GetStatus() = s; - // search_job->SearchDone(index_id_); - // return; - // } - // - // span = rc.RecordSection("search done"); - // - // /* step 3: pick up topk result */ - // auto spec_k = file_->row_count_ < topk ? file_->row_count_ : topk; - // if (spec_k == 0) { - // LOG_ENGINE_WARNING_ << LogOut("[%s][%ld] Searching in an empty file. file location = %s", - // "search", 0, - // file_->location_.c_str()); - // } else { - // std::unique_lock lock(search_job->mutex()); - // XSearchTask::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk, - // ascending_reduce, - // search_job->GetResultIds(), - // search_job->GetResultDistances()); - // } - // - // span = rc.RecordSection("reduce topk done"); - // search_job->time_stat().reduce_time += span / 1000; - // } catch (std::exception& ex) { - // LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] SearchTask encounter exception: %s", "search", 0, - // ex.what()); search_job->GetStatus() = Status(SERVER_UNEXPECTED_ERROR, ex.what()); - // } + try { + /* step 2: search */ + Status s = engine_->Search(search_job->query_ptr(), result); + + fiu_do_on("XSearchTask.Execute.search_fail", s = Status(SERVER_UNEXPECTED_ERROR, "")); + if (!s.ok()) { + search_job->SearchDone(seg_id); + search_job->status() = s; + return; + } + + span = rc.RecordSection("search done"); + + /* step 3: pick up topk result */ + // auto spec_k = file_->row_count_ < topk ? file_->row_count_ : topk; + // if (spec_k == 0) { + // LOG_ENGINE_WARNING_ << LogOut("[%s][%ld] Searching in an empty file. file location = %s", + // "search", 0, + // file_->location_.c_str()); + // } else { + // std::unique_lock lock(search_job->mutex()); + // XSearchTask::MergeTopkToResultSet(result, spec_k, nq, topk, ascending_, search_job->GetQueryResult()); + // } + + span = rc.RecordSection("reduce topk done"); + } catch (std::exception& ex) { + LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] SearchTask encounter exception: %s", "search", 0, ex.what()); + search_job->status() = Status(SERVER_UNEXPECTED_ERROR, ex.what()); + } /* step 4: notify to send result to client */ search_job->SearchDone(seg_id); @@ -246,15 +147,18 @@ XSSSearchTask::Execute() { rc.ElapseFromBegin("totally cost"); - // release index in resource - index_engine_ = nullptr; + // release engine resource + engine_ = nullptr; } void -XSSSearchTask::MergeTopkToResultSet(const scheduler::ResultIds& src_ids, - const scheduler::ResultDistances& src_distances, size_t src_k, size_t nq, - size_t topk, bool ascending, scheduler::ResultIds& tar_ids, - scheduler::ResultDistances& tar_distances) { +XSSSearchTask::MergeTopkToResultSet(const engine::QueryResult& src_result, size_t src_k, size_t nq, size_t topk, + bool ascending, engine::QueryResult& tar_result) { + const engine::ResultIds& src_ids = src_result.result_ids_; + const engine::ResultDistances& src_distances = src_result.result_distances_; + engine::ResultIds& tar_ids = tar_result.result_ids_; + engine::ResultDistances& tar_distances = tar_result.result_distances_; + if (src_ids.empty()) { LOG_ENGINE_DEBUG_ << LogOut("[%s][%d] Search result is empty.", "search", 0); return; @@ -318,15 +222,5 @@ XSSSearchTask::MergeTopkToResultSet(const scheduler::ResultIds& src_ids, tar_distances.swap(buf_distances); } -// const std::string& -// XSSSearchTask::GetLocation() const { -// return file_->location_; -//} - -// size_t -// XSSSearchTask::GetIndexId() const { -// return file_->id_; -//} - } // namespace scheduler } // namespace milvus diff --git a/core/src/scheduler/task/SSSearchTask.h b/core/src/scheduler/task/SSSearchTask.h index 799e5569c1..d134910d1f 100644 --- a/core/src/scheduler/task/SSSearchTask.h +++ b/core/src/scheduler/task/SSSearchTask.h @@ -16,6 +16,7 @@ #include #include "db/SnapshotVisitor.h" +#include "db/engine/SSExecutionEngine.h" #include "scheduler/Definition.h" #include "scheduler/job/SSSearchJob.h" #include "scheduler/task/Task.h" @@ -26,8 +27,8 @@ namespace scheduler { // TODO(wxyu): rewrite class XSSSearchTask : public Task { public: - explicit XSSSearchTask(const server::ContextPtr& context, const engine::SegmentVisitorPtr& visitor, - TaskLabelPtr label); + explicit XSSSearchTask(const server::ContextPtr& context, const std::string& dir_root, + const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label); void Load(LoadType type, uint8_t device_id) override; @@ -37,28 +38,19 @@ class XSSSearchTask : public Task { public: static void - MergeTopkToResultSet(const scheduler::ResultIds& src_ids, const scheduler::ResultDistances& src_distances, - size_t src_k, size_t nq, size_t topk, bool ascending, scheduler::ResultIds& tar_ids, - scheduler::ResultDistances& tar_distances); - - // const std::string& - // GetLocation() const; - - // size_t - // GetIndexId() const; + MergeTopkToResultSet(const engine::QueryResult& src_result, size_t src_k, size_t nq, size_t topk, bool ascending, + engine::QueryResult& tar_result); public: const server::ContextPtr context_; engine::SegmentVisitorPtr visitor_; - // size_t index_id_ = 0; - int index_type_ = 0; - ExecutionEnginePtr index_engine_ = nullptr; + engine::SSExecutionEnginePtr engine_ = nullptr; // distance -- value 0 means two vectors equal, ascending reduce, L2/HAMMING/JACCARD/TONIMOTO ... // similarity -- infinity value means two vectors equal, descending reduce, IP - bool ascending_reduce = true; + bool ascending_ = true; }; } // namespace scheduler diff --git a/core/src/scheduler/task/SSTestTask.cpp b/core/src/scheduler/task/SSTestTask.cpp index c1bac10aa2..33a72b40d5 100644 --- a/core/src/scheduler/task/SSTestTask.cpp +++ b/core/src/scheduler/task/SSTestTask.cpp @@ -18,7 +18,7 @@ namespace milvus { namespace scheduler { SSTestTask::SSTestTask(const server::ContextPtr& context, const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label) - : XSSSearchTask(context, visitor, std::move(label)) { + : XSSSearchTask(context, "", visitor, std::move(label)) { } void diff --git a/core/unittest/ssdb/test_db.cpp b/core/unittest/ssdb/test_db.cpp index 430954c732..2b4206c7c0 100644 --- a/core/unittest/ssdb/test_db.cpp +++ b/core/unittest/ssdb/test_db.cpp @@ -400,58 +400,6 @@ TEST_F(SSDBTest, VisitorTest) { std::cout << ss->ToString() << std::endl; } -TEST_F(SSDBTest, QueryTest) { - LSN_TYPE lsn = 0; - auto next_lsn = [&]() -> decltype(lsn) { - return ++lsn; - }; - - std::string c1 = "c1"; - auto status = CreateCollection(db_, c1, next_lsn()); - ASSERT_TRUE(status.ok()); - - std::stringstream p_name; - auto num = RandomInt(1, 3); - for (auto i = 0; i < num; ++i) { - p_name.str(""); - p_name << "partition_" << i; - status = db_->CreatePartition(c1, p_name.str()); - ASSERT_TRUE(status.ok()); - } - - ScopedSnapshotT ss; - status = Snapshots::GetInstance().GetSnapshot(ss, c1); - ASSERT_TRUE(status.ok()); - - SegmentFileContext sf_context; - SFContextBuilder(sf_context, ss); - - auto new_total = 0; - auto &partitions = ss->GetResources(); - ID_TYPE partition_id; - for (auto &kv : partitions) { - num = RandomInt(1, 3); - auto row_cnt = 100; - for (auto i = 0; i < num; ++i) { - ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context, row_cnt).ok()); - } - new_total += num; - partition_id = kv.first; - } - - status = Snapshots::GetInstance().GetSnapshot(ss, c1); - ASSERT_TRUE(status.ok()); - - milvus::server::ContextPtr ctx1; - std::vector partition_patterns; - milvus::query::GeneralQueryPtr general_query; - milvus::query::QueryPtr query_ptr; - std::vector field_names; - std::unordered_map attr_type; - milvus::engine::QueryResult result; - //db_->Query(ctx1, c1, partition_patterns, general_query, query_ptr, field_names, attr_type, result); -} - TEST_F(SSDBTest, InsertTest) { std::string collection_name = "MERGE_TEST"; auto status = CreateCollection2(db_, collection_name, 0); diff --git a/core/unittest/ssdb/test_ss_job.cpp b/core/unittest/ssdb/test_ss_job.cpp index 788e8eaa77..b211eed953 100644 --- a/core/unittest/ssdb/test_ss_job.cpp +++ b/core/unittest/ssdb/test_ss_job.cpp @@ -11,31 +11,100 @@ #include +#include "db/SnapshotVisitor.h" +#include "knowhere/index/vector_index/helpers/IndexParameter.h" +#include "scheduler/SchedInst.h" #include "scheduler/job/SSBuildIndexJob.h" #include "scheduler/job/SSSearchJob.h" +#include "ssdb/utils.h" -namespace milvus { -namespace scheduler { +using SegmentVisitor = milvus::engine::SegmentVisitor; -class TestJob : public Job { - public: - TestJob() : Job(JobType::INVALID) {} -}; +namespace { +milvus::Status +CreateCollection(std::shared_ptr db, const std::string& collection_name, const LSN_TYPE& lsn) { + CreateCollectionContext context; + context.lsn = lsn; + auto collection_schema = std::make_shared(collection_name); + context.collection = collection_schema; + auto vector_field = std::make_shared("vector", 0, + milvus::engine::FieldType::VECTOR); + auto vector_field_element = std::make_shared(0, 0, "ivfsq8", + milvus::engine::FieldElementType::FET_INDEX); + auto int_field = std::make_shared("int", 0, + milvus::engine::FieldType::INT32); + context.fields_schema[vector_field] = {vector_field_element}; + context.fields_schema[int_field] = {}; -TEST(SSJobTest, TestJob) { - engine::DBOptions options; - auto build_index_ptr = std::make_shared(options); - build_index_ptr->Dump(); - build_index_ptr->AddSegmentVisitor(nullptr); - - TestJob test_job; - test_job.Dump(); - - engine::VectorsData vectors; - auto search_ptr = std::make_shared(nullptr, 1, 1, vectors); - search_ptr->Dump(); - search_ptr->AddSegmentVisitor(nullptr); + return db->CreateCollection(context); } +} // namespace -} // namespace scheduler -} // namespace milvus +TEST_F(SSSchedulerTest, SSJobTest) { + LSN_TYPE lsn = 0; + auto next_lsn = [&]() -> decltype(lsn) { + return ++lsn; + }; + + std::string c1 = "c1"; + auto status = CreateCollection(db_, c1, next_lsn()); + ASSERT_TRUE(status.ok()); + + status = db_->CreatePartition(c1, "p_0"); + ASSERT_TRUE(status.ok()); + + ScopedSnapshotT ss; + status = Snapshots::GetInstance().GetSnapshot(ss, c1); + ASSERT_TRUE(status.ok()); + + SegmentFileContext sf_context; + SFContextBuilder(sf_context, ss); + + auto& partitions = ss->GetResources(); + ASSERT_EQ(partitions.size(), 2); + for (auto& kv : partitions) { + int64_t row_cnt = 100; + ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context, row_cnt).ok()); + } + + status = Snapshots::GetInstance().GetSnapshot(ss, c1); + ASSERT_TRUE(status.ok()); + + /* collect all valid segment */ + std::vector segment_visitors; + auto executor = [&](const SegmentPtr& segment, SegmentIterator* handler) -> Status { + auto visitor = SegmentVisitor::Build(ss, segment->GetID()); + if (visitor == nullptr) { + return Status(milvus::SS_ERROR, "Cannot build segment visitor"); + } + segment_visitors.push_back(visitor); + return Status::OK(); + }; + + auto segment_iter = std::make_shared(ss, executor); + segment_iter->Iterate(); + ASSERT_TRUE(segment_iter->GetStatus().ok()); + ASSERT_EQ(segment_visitors.size(), 2); + + /* create BuildIndexJob */ + milvus::scheduler::SSBuildIndexJobPtr build_index_job = + std::make_shared(""); + for (auto& sv : segment_visitors) { + build_index_job->AddSegmentVisitor(sv); + } + + /* put search job to scheduler and wait result */ + milvus::scheduler::JobMgrInst::GetInstance()->Put(build_index_job); + build_index_job->WaitFinish(); + + /* create SearchJob */ + milvus::scheduler::SSSearchJobPtr search_job = + std::make_shared(nullptr, "", nullptr); + for (auto& sv : segment_visitors) { + search_job->AddSegmentVisitor(sv); + } + + /* put search job to scheduler and wait result */ + milvus::scheduler::JobMgrInst::GetInstance()->Put(search_job); + search_job->WaitFinish(); +} diff --git a/core/unittest/ssdb/test_ss_task.cpp b/core/unittest/ssdb/test_ss_task.cpp index ce25c1f724..bead3650de 100644 --- a/core/unittest/ssdb/test_ss_task.cpp +++ b/core/unittest/ssdb/test_ss_task.cpp @@ -37,11 +37,11 @@ TEST(SSTaskTest, INVALID_INDEX) { auto trace_context = std::make_shared(mock_span); dummy_context->SetTraceContext(trace_context); - auto search_task = std::make_shared(dummy_context, nullptr, nullptr); - search_task->Load(LoadType::TEST, 10); - - auto build_task = std::make_shared(nullptr, nullptr); - build_task->Load(LoadType::TEST, 10); +// auto search_task = std::make_shared(dummy_context, nullptr, nullptr); +// search_task->Load(LoadType::TEST, 10); +// +// auto build_task = std::make_shared(nullptr, nullptr); +// build_task->Load(LoadType::TEST, 10); // build_task->Execute(); } @@ -54,16 +54,16 @@ TEST(SSTaskTest, TEST_TASK) { // file->dimension_ = 64; auto label = std::make_shared(); - SSTestTask task(dummy_context, nullptr, label); - task.Load(LoadType::CPU2GPU, 0); - auto th = std::thread([&]() { - task.Execute(); - }); - task.Wait(); - - if (th.joinable()) { - th.join(); - } +// SSTestTask task(dummy_context, nullptr, label); +// task.Load(LoadType::CPU2GPU, 0); +// auto th = std::thread([&]() { +// task.Execute(); +// }); +// task.Wait(); +// +// if (th.joinable()) { +// th.join(); +// } // static const char* CONFIG_PATH = "/tmp/milvus_test"; // auto options = milvus::engine::DBFactory::BuildOption(); diff --git a/core/unittest/ssdb/utils.cpp b/core/unittest/ssdb/utils.cpp index 0ff69e90a0..b39f765458 100644 --- a/core/unittest/ssdb/utils.cpp +++ b/core/unittest/ssdb/utils.cpp @@ -215,33 +215,10 @@ SSDBTest::SetUp() { BaseTest::SetUp(); BaseTest::SnapshotStart(false); db_ = std::make_shared(GetOptions()); - - auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance(); - res_mgr->Clear(); - res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, false)); - res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0)); - - auto default_conn = milvus::scheduler::Connection("IO", 500.0); - auto PCIE = milvus::scheduler::Connection("IO", 11000.0); - res_mgr->Connect("disk", "cpu", default_conn); -#ifdef MILVUS_GPU_VERSION - res_mgr->Add(milvus::scheduler::ResourceFactory::Create("0", "GPU", 0)); - res_mgr->Connect("cpu", "0", PCIE); -#endif - res_mgr->Start(); - milvus::scheduler::SchedInst::GetInstance()->Start(); - milvus::scheduler::JobMgrInst::GetInstance()->Start(); - milvus::scheduler::CPUBuilderInst::GetInstance()->Start(); } void SSDBTest::TearDown() { - milvus::scheduler::JobMgrInst::GetInstance()->Stop(); - milvus::scheduler::SchedInst::GetInstance()->Stop(); - milvus::scheduler::CPUBuilderInst::GetInstance()->Stop(); - milvus::scheduler::ResMgrInst::GetInstance()->Stop(); - milvus::scheduler::ResMgrInst::GetInstance()->Clear(); - BaseTest::SnapshotStop(); db_ = nullptr; auto options = GetOptions(); @@ -280,6 +257,46 @@ void SSMetaTest::TearDown() { } +///////////////////////////////////////////////////////////////////////////////////////////////////////////////// +void +SSSchedulerTest::SetUp() { + BaseTest::SetUp(); + BaseTest::SnapshotStart(true); + auto options = milvus::engine::DBOptions(); + options.wal_enable_ = false; + db_ = std::make_shared(options); + + auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance(); + res_mgr->Clear(); + res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, false)); + res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0)); + + auto default_conn = milvus::scheduler::Connection("IO", 500.0); + auto PCIE = milvus::scheduler::Connection("IO", 11000.0); + res_mgr->Connect("disk", "cpu", default_conn); +#ifdef MILVUS_GPU_VERSION + res_mgr->Add(milvus::scheduler::ResourceFactory::Create("0", "GPU", 0)); + res_mgr->Connect("cpu", "0", PCIE); +#endif + res_mgr->Start(); + milvus::scheduler::SchedInst::GetInstance()->Start(); + milvus::scheduler::JobMgrInst::GetInstance()->Start(); + milvus::scheduler::CPUBuilderInst::GetInstance()->Start(); +} + +void +SSSchedulerTest::TearDown() { + milvus::scheduler::JobMgrInst::GetInstance()->Stop(); + milvus::scheduler::SchedInst::GetInstance()->Stop(); + milvus::scheduler::CPUBuilderInst::GetInstance()->Stop(); + milvus::scheduler::ResMgrInst::GetInstance()->Stop(); + milvus::scheduler::ResMgrInst::GetInstance()->Clear(); + + db_ = nullptr; + BaseTest::SnapshotStop(); + BaseTest::TearDown(); +} + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// int main(int argc, char **argv) { diff --git a/core/unittest/ssdb/utils.h b/core/unittest/ssdb/utils.h index 981b697da0..236dbb5605 100644 --- a/core/unittest/ssdb/utils.h +++ b/core/unittest/ssdb/utils.h @@ -342,3 +342,14 @@ class SSMetaTest : public BaseTest { void TearDown() override; }; + +/////////////////////////////////////////////////////////////////////////////// +class SSSchedulerTest : public BaseTest { + protected: + std::shared_ptr db_; + + void + SetUp() override; + void + TearDown() override; +};