snapshot scheduler (#2961)

* update interface

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add test_ss_job

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update Query

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add dir_root for SSSearchJob and SSBuildIndexJob

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/2939/head
Cai Yudong 2020-07-22 14:11:28 +08:00 committed by GitHub
parent 21ea715b78
commit b3608e1ae7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 359 additions and 707 deletions

View File

@ -496,8 +496,7 @@ SSDBImpl::Flush() {
}
Status
SSDBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
double threshold) {
SSDBImpl::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
@ -589,7 +588,7 @@ SSDBImpl::GetEntityIDs(const std::string& collection_id, int64_t segment_id, IDN
}
Status
SSDBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
SSDBImpl::CreateIndex(const server::ContextPtr& context, const std::string& collection_id,
const std::string& field_name, const CollectionIndex& index) {
return Status::OK();
}
@ -627,95 +626,50 @@ SSDBImpl::DropIndex(const std::string& collection_id) {
}
Status
SSDBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResult& result) {
SSDBImpl::Query(const server::ContextPtr& context, const std::string& collection_name, const query::QueryPtr& query_ptr,
engine::QueryResultPtr& result) {
CHECK_INITIALIZED;
milvus::server::ContextChild tracer(context, "Query");
TimeRecorder rc("SSDBImpl::Query");
// snapshot::ScopedSnapshotT ss;
// STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
//
// /* collect all valid segment */
// std::vector<SegmentVisitor::Ptr> segment_visitors;
// auto exec = [&] (const snapshot::Segment::Ptr& segment, snapshot::SegmentIterator* handler) -> Status {
// auto p_id = segment->GetPartitionId();
// auto p_ptr = ss->GetResource<snapshot::Partition>(p_id);
// auto& p_name = p_ptr->GetName();
//
// /* check partition match pattern */
// bool match = false;
// if (partition_patterns.empty()) {
// match = true;
// } else {
// for (auto &pattern : partition_patterns) {
// if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
// match = true;
// break;
// }
// }
// }
//
// if (match) {
// auto visitor = SegmentVisitor::Build(ss, segment->GetID());
// if (!visitor) {
// return Status(milvus::SS_ERROR, "Cannot build segment visitor");
// }
// segment_visitors.push_back(visitor);
// }
// return Status::OK();
// };
//
// auto segment_iter = std::make_shared<snapshot::SegmentIterator>(ss, exec);
// segment_iter->Iterate();
// STATUS_CHECK(segment_iter->GetStatus());
//
// LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size());
//
// VectorsData vectors;
// scheduler::SSSearchJobPtr job =
// std::make_shared<scheduler::SSSearchJob>(tracer.Context(), general_query, query_ptr, attr_type, vectors);
// for (auto& sv : segment_visitors) {
// job->AddSegmentVisitor(sv);
// }
//
// // step 2: put search job to scheduler and wait result
// scheduler::JobMgrInst::GetInstance()->Put(job);
// job->WaitResult();
//
// if (!job->GetStatus().ok()) {
// return job->GetStatus();
// }
//
// // step 3: construct results
// result.row_num_ = job->vector_count();
// result.result_ids_ = job->GetResultIds();
// result.result_distances_ = job->GetResultDistances();
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
// step 4: get entities by result ids
// STATUS_CHECK(GetEntityByID(collection_name, result.result_ids_, field_names, result.vectors_, result.attrs_));
/* collect all segment visitors */
std::vector<SegmentVisitor::Ptr> segment_visitors;
auto exec = [&](const snapshot::SegmentPtr& segment, snapshot::SegmentIterator* handler) -> Status {
auto visitor = SegmentVisitor::Build(ss, segment->GetID());
if (!visitor) {
return Status(milvus::SS_ERROR, "Cannot build segment visitor");
}
segment_visitors.push_back(visitor);
return Status::OK();
};
// step 5: filter entities by field names
// std::vector<engine::AttrsData> filter_attrs;
// for (auto attr : result.attrs_) {
// AttrsData attrs_data;
// attrs_data.attr_type_ = attr.attr_type_;
// attrs_data.attr_count_ = attr.attr_count_;
// attrs_data.id_array_ = attr.id_array_;
// for (auto& name : field_names) {
// if (attr.attr_data_.find(name) != attr.attr_data_.end()) {
// attrs_data.attr_data_.insert(std::make_pair(name, attr.attr_data_.at(name)));
// }
// }
// filter_attrs.emplace_back(attrs_data);
// }
auto segment_iter = std::make_shared<snapshot::SegmentIterator>(ss, exec);
segment_iter->Iterate();
STATUS_CHECK(segment_iter->GetStatus());
LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size());
scheduler::SSSearchJobPtr job = std::make_shared<scheduler::SSSearchJob>(nullptr, options_.meta_.path_, query_ptr);
for (auto& sv : segment_visitors) {
job->AddSegmentVisitor(sv);
}
/* put search job to scheduler and wait job finish */
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitFinish();
if (!job->status().ok()) {
return job->status();
}
result = job->query_result();
rc.ElapseFromBegin("Engine query totally cost");
// tracer.Context()->GetTraceContext()->GetSpan()->Finish();
return Status::OK();
return job->status();
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -94,8 +94,7 @@ class SSDBImpl {
Flush();
Status
Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
double threshold = 0.0);
Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold = 0.0);
Status
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
@ -105,8 +104,8 @@ class SSDBImpl {
GetEntityIDs(const std::string& collection_id, int64_t segment_id, IDNumbers& entity_ids);
Status
CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
const std::string& field_name, const CollectionIndex& index);
CreateIndex(const server::ContextPtr& context, const std::string& collection_id, const std::string& field_name,
const CollectionIndex& index);
Status
DescribeIndex(const std::string& collection_id, const std::string& field_name, CollectionIndex& index);
@ -118,7 +117,8 @@ class SSDBImpl {
DropIndex(const std::string& collection_id);
Status
Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResult& result);
Query(const server::ContextPtr& context, const std::string& collection_name, const query::QueryPtr& query_ptr,
engine::QueryResultPtr& result);
private:
void

View File

@ -15,6 +15,7 @@
#include <cstdint>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
@ -73,6 +74,7 @@ struct QueryResult {
std::vector<engine::VectorsData> vectors_;
std::vector<engine::AttrsData> attrs_;
};
using QueryResultPtr = std::shared_ptr<QueryResult>;
using File2ErrArray = std::map<std::string, std::vector<std::string>>;
using Table2FileErr = std::map<std::string, File2ErrArray>;

View File

@ -85,7 +85,7 @@ std::vector<TaskPtr>
TaskCreator::Create(const SSSearchJobPtr& job) {
std::vector<TaskPtr> tasks;
for (auto& sv : job->segment_visitor_map()) {
auto task = std::make_shared<XSSSearchTask>(job->GetContext(), sv.second, nullptr);
auto task = std::make_shared<XSSSearchTask>(job->GetContext(), job->dir_root(), sv.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}
@ -96,7 +96,7 @@ std::vector<TaskPtr>
TaskCreator::Create(const SSBuildIndexJobPtr& job) {
std::vector<TaskPtr> tasks;
for (auto& sv : job->segment_visitor_map()) {
auto task = std::make_shared<XSSBuildIndexTask>(sv.second, nullptr);
auto task = std::make_shared<XSSBuildIndexTask>(job->dir_root(), sv.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}

View File

@ -18,7 +18,7 @@
namespace milvus {
namespace scheduler {
SSBuildIndexJob::SSBuildIndexJob(engine::DBOptions options) : Job(JobType::SS_BUILD), options_(std::move(options)) {
SSBuildIndexJob::SSBuildIndexJob(const std::string& dir_root) : Job(JobType::SS_BUILD), dir_root_(dir_root) {
SetIdentity("SSBuildIndexJob");
AddCacheInsertDataListener();
}
@ -31,7 +31,7 @@ SSBuildIndexJob::AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor) {
}
void
SSBuildIndexJob::WaitBuildIndexFinish() {
SSBuildIndexJob::WaitFinish() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return segment_visitor_map_.empty(); });
LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] BuildIndexJob %ld all done", "build index", 0, id());
@ -55,10 +55,10 @@ SSBuildIndexJob::Dump() const {
return ret;
}
void
SSBuildIndexJob::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
}
// void
// SSBuildIndexJob::OnCacheInsertDataChanged(bool value) {
// options_.insert_cache_immediately_ = value;
//}
} // namespace scheduler
} // namespace milvus

View File

@ -30,26 +30,18 @@
namespace milvus {
namespace scheduler {
// using engine::meta::SegmentSchemaPtr;
// using Id2ToIndexMap = std::unordered_map<size_t, SegmentSchemaPtr>;
// using Id2ToTableFileMap = std::unordered_map<size_t, SegmentSchema>;
class SSBuildIndexJob : public Job, public server::CacheConfigHandler {
public:
explicit SSBuildIndexJob(engine::DBOptions options);
explicit SSBuildIndexJob(const std::string& dir_root);
~SSBuildIndexJob() = default;
public:
// bool
// AddToIndexFiles(const SegmentSchemaPtr& to_index_file);
void
AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor);
void
WaitBuildIndexFinish();
WaitFinish();
void
BuildIndexDone(const engine::snapshot::ID_TYPE seg_id);
@ -58,39 +50,33 @@ class SSBuildIndexJob : public Job, public server::CacheConfigHandler {
Dump() const override;
public:
Status&
GetStatus() {
return status_;
const std::string&
dir_root() const {
return dir_root_;
}
// Id2ToIndexMap&
// to_index_files() {
// return to_index_files_;
// }
// engine::meta::MetaPtr
// meta() const {
// return meta_ptr_;
// }
const SegmentVisitorMap&
segment_visitor_map() {
segment_visitor_map() const {
return segment_visitor_map_;
}
engine::DBOptions
options() const {
return options_;
Status&
status() {
return status_;
}
protected:
void
OnCacheInsertDataChanged(bool value) override;
// engine::DBOptions
// options() const {
// return options_;
// }
// protected:
// void
// OnCacheInsertDataChanged(bool value) override;
private:
// Id2ToIndexMap to_index_files_;
// engine::meta::MetaPtr meta_ptr_;
engine::DBOptions options_;
// engine::DBOptions options_;
std::string dir_root_;
SegmentVisitorMap segment_visitor_map_;
Status status_;

View File

@ -10,27 +10,14 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/job/SSSearchJob.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
SSSearchJob::SSSearchJob(const server::ContextPtr& context, int64_t topk, const milvus::json& extra_params,
engine::VectorsData& vectors)
: Job(JobType::SS_SEARCH), context_(context), topk_(topk), extra_params_(extra_params), vectors_(vectors) {
}
SSSearchJob::SSSearchJob(const server::ContextPtr& context, milvus::query::GeneralQueryPtr general_query,
query::QueryPtr query_ptr,
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
engine::VectorsData& vectors)
: Job(JobType::SS_SEARCH),
context_(context),
general_query_(general_query),
query_ptr_(query_ptr),
attr_type_(attr_type),
vectors_(vectors) {
SSSearchJob::SSSearchJob(const server::ContextPtr& context, const std::string& dir_root,
const query::QueryPtr& query_ptr)
: Job(JobType::SS_SEARCH), context_(context), dir_root_(dir_root), query_ptr_(query_ptr) {
}
void
@ -41,13 +28,13 @@ SSSearchJob::AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor) {
}
void
SSSearchJob::WaitResult() {
SSSearchJob::WaitFinish() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return segment_visitor_map_.empty(); });
// LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld: query_time %f, map_uids_time %f, reduce_time %f",
// "search", 0,
// id(), this->time_stat().query_time, this->time_stat().map_uids_time,
// this->time_stat().reduce_time);
// LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld: query_time %f, map_uids_time %f, reduce_time %f",
// "search", 0,
// id(), this->time_stat().query_time, this->time_stat().map_uids_time,
// this->time_stat().reduce_time);
LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld all done", "search", 0, id());
}
@ -61,37 +48,13 @@ SSSearchJob::SearchDone(const engine::snapshot::ID_TYPE seg_id) {
LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld finish segment: %ld", "search", 0, id(), seg_id);
}
ResultIds&
SSSearchJob::GetResultIds() {
return result_ids_;
}
ResultDistances&
SSSearchJob::GetResultDistances() {
return result_distances_;
}
Status&
SSSearchJob::GetStatus() {
return status_;
}
json
SSSearchJob::Dump() const {
json ret{
{"topk", topk_},
{"nq", vectors_.vector_count_},
{"extra_params", extra_params_.dump()},
};
json ret{{"extra_params", extra_params_.dump()}};
auto base = Job::Dump();
ret.insert(base.begin(), base.end());
return ret;
}
const std::shared_ptr<server::Context>&
SSSearchJob::GetContext() const {
return context_;
}
} // namespace scheduler
} // namespace milvus

View File

@ -25,21 +25,13 @@
#include "Job.h"
#include "db/SnapshotVisitor.h"
#include "db/Types.h"
#include "db/meta/MetaTypes.h"
#include "query/GeneralQuery.h"
//#include "db/meta/MetaTypes.h"
#include "server/context/Context.h"
namespace milvus {
namespace scheduler {
using engine::meta::SegmentSchemaPtr;
using Id2IndexMap = std::unordered_map<size_t, SegmentSchemaPtr>;
using ResultIds = engine::ResultIds;
using ResultDistances = engine::ResultDistances;
// struct SearchTimeStat {
// double query_time = 0.0;
// double map_uids_time = 0.0;
@ -48,52 +40,25 @@ using ResultDistances = engine::ResultDistances;
class SSSearchJob : public Job {
public:
SSSearchJob(const server::ContextPtr& context, int64_t topk, const milvus::json& extra_params,
engine::VectorsData& vectors);
SSSearchJob(const server::ContextPtr& context, query::GeneralQueryPtr general_query, query::QueryPtr query_ptr,
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
engine::VectorsData& vectorsData);
SSSearchJob(const server::ContextPtr& context, const std::string& dir_root, const query::QueryPtr& query_ptr);
public:
void
AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor);
void
WaitResult();
WaitFinish();
void
SearchDone(const engine::snapshot::ID_TYPE seg_id);
ResultIds&
GetResultIds();
ResultDistances&
GetResultDistances();
void
SetVectors(engine::VectorsData& vectors) {
vectors_ = vectors;
}
Status&
GetStatus();
json
Dump() const override;
public:
const server::ContextPtr&
GetContext() const;
int64_t
topk() {
return topk_;
}
int64_t
nq() const {
return vectors_.vector_count_;
GetContext() const {
return context_;
}
const milvus::json&
@ -101,70 +66,58 @@ class SSSearchJob : public Job {
return extra_params_;
}
const engine::VectorsData&
vectors() const {
return vectors_;
const std::string&
dir_root() const {
return dir_root_;
}
const SegmentVisitorMap&
segment_visitor_map() {
segment_visitor_map() const {
return segment_visitor_map_;
}
const query::QueryPtr
query_ptr() const {
return query_ptr_;
}
engine::QueryResultPtr&
query_result() {
return query_result_;
}
Status&
status() {
return status_;
}
std::mutex&
mutex() {
return mutex_;
}
query::GeneralQueryPtr
general_query() {
return general_query_;
}
query::QueryPtr
query_ptr() {
return query_ptr_;
}
std::unordered_map<std::string, engine::meta::hybrid::DataType>&
attr_type() {
return attr_type_;
}
int64_t
vector_count() {
return vector_count_;
}
// SearchTimeStat&
// time_stat() {
// return time_stat_;
// }
// SearchTimeStat&
// time_stat() {
// return time_stat_;
// }
private:
const server::ContextPtr context_;
int64_t topk_ = 0;
milvus::json extra_params_;
// TODO: smart pointer
engine::VectorsData& vectors_;
std::string dir_root_;
SegmentVisitorMap segment_visitor_map_;
// TODO: column-base better ?
ResultIds result_ids_;
ResultDistances result_distances_;
Status status_;
query::GeneralQueryPtr general_query_;
query::QueryPtr query_ptr_;
std::unordered_map<std::string, engine::meta::hybrid::DataType> attr_type_;
int64_t vector_count_;
engine::QueryResultPtr query_result_;
Status status_;
std::mutex mutex_;
std::condition_variable cv_;
// SearchTimeStat time_stat_;
// SearchTimeStat time_stat_;
};
using SSSearchJobPtr = std::shared_ptr<SSSearchJob>;

View File

@ -9,62 +9,43 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/task/SSBuildIndexTask.h"
#include <fiu-local.h>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include "db/Utils.h"
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "db/engine/SSExecutionEngineImpl.h"
#include "scheduler/job/SSBuildIndexJob.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "scheduler/task/SSBuildIndexTask.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
namespace milvus {
namespace scheduler {
XSSBuildIndexTask::XSSBuildIndexTask(const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label)
XSSBuildIndexTask::XSSBuildIndexTask(const std::string& dir_root, const engine::SegmentVisitorPtr& visitor,
TaskLabelPtr label)
: Task(TaskType::BuildIndexTask, std::move(label)), visitor_(visitor) {
// if (file_) {
// EngineType engine_type;
// if (file->file_type_ == SegmentSchema::FILE_TYPE::RAW ||
// file->file_type_ == SegmentSchema::FILE_TYPE::TO_INDEX ||
// file->file_type_ == SegmentSchema::FILE_TYPE::BACKUP) {
// engine_type = engine::utils::IsBinaryMetricType(file->metric_type_) ? EngineType::FAISS_BIN_IDMAP
// : EngineType::FAISS_IDMAP;
// } else {
// engine_type = (EngineType)file->engine_type_;
// }
//
// auto json = milvus::json::parse(file_->index_params_);
// to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, engine_type,
// (MetricType)file_->metric_type_, json);
// }
engine_ = std::make_shared<engine::SSExecutionEngineImpl>(dir_root, visitor);
}
void
XSSBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
TimeRecorder rc("XSSBuildIndexTask::Load");
auto seg_id = visitor_->GetSegment()->GetID();
Status stat = Status::OK();
std::string error_msg;
std::string type_str;
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::SSBuildIndexJob>(job);
auto options = build_index_job->options();
// auto options = build_index_job->options();
try {
if (type == LoadType::DISK2CPU) {
stat = to_index_engine_->Load(options.insert_cache_immediately_);
stat = engine_->Load(nullptr);
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
stat = to_index_engine_->CopyToIndexFileToGpu(device_id);
stat = engine_->CopyToGpu(device_id);
type_str = "CPU2GPU:" + std::to_string(device_id);
} else {
error_msg = "Wrong load type";
@ -92,23 +73,16 @@ XSSBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::SSBuildIndexJob>(job);
build_index_job->GetStatus() = s;
build_index_job->BuildIndexDone(visitor_->GetSegment()->GetID());
build_index_job->status() = s;
build_index_job->BuildIndexDone(seg_id);
}
return;
}
// size_t file_size = to_index_engine_->Size();
//
// std::string info = "Build index task load file id:" + std::to_string(file_->id_) + " " + type_str +
// " file type:" + std::to_string(file_->file_type_) + " size:" +
// std::to_string(file_size) + " bytes from location: " + file_->location_ + " totally
// cost";
// rc.ElapseFromBegin(info);
//
// to_index_id_ = file_->id_;
// to_index_type_ = file_->file_type_;
std::string info =
"Build index task load segment id:" + std::to_string(seg_id) + " " + type_str + " totally cost";
rc.ElapseFromBegin(info);
}
}
@ -119,129 +93,18 @@ XSSBuildIndexTask::Execute() {
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::SSBuildIndexJob>(job);
if (to_index_engine_ == nullptr) {
if (engine_ == nullptr) {
build_index_job->BuildIndexDone(seg_id);
build_index_job->GetStatus() = Status(DB_ERROR, "source index is null");
build_index_job->status() = Status(DB_ERROR, "source index is null");
return;
}
// std::string location = file_->location_;
// std::shared_ptr<engine::ExecutionEngine> index;
//
// // step 1: create collection file
// engine::meta::SegmentSchema table_file;
// table_file.collection_id_ = file_->collection_id_;
// table_file.segment_id_ = file_->file_id_;
// table_file.date_ = file_->date_;
// table_file.file_type_ = engine::meta::SegmentSchema::NEW_INDEX;
//
// engine::meta::MetaPtr meta_ptr = build_index_job->meta();
// Status status = meta_ptr->CreateCollectionFile(table_file);
//
// fiu_do_on("XSSBuildIndexTask.Execute.create_table_success", status = Status::OK());
// if (!status.ok()) {
// LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.ToString();
// build_index_job->BuildIndexDone(to_index_id_);
// build_index_job->GetStatus() = status;
// to_index_engine_ = nullptr;
// return;
// }
//
// auto failed_build_index = [&](std::string log_msg, std::string err_msg) {
// table_file.file_type_ = engine::meta::SegmentSchema::TO_DELETE;
// status = meta_ptr->UpdateCollectionFile(table_file);
// LOG_ENGINE_ERROR_ << log_msg;
//
// build_index_job->BuildIndexDone(to_index_id_);
// build_index_job->GetStatus() = Status(DB_ERROR, err_msg);
// to_index_engine_ = nullptr;
// };
//
// // step 2: build index
// try {
// LOG_ENGINE_DEBUG_ << "Begin build index for file:" + table_file.location_;
// index = to_index_engine_->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
// fiu_do_on("XSSBuildIndexTask.Execute.build_index_fail", index = nullptr);
// if (index == nullptr) {
// std::string log_msg = "Failed to build index " + table_file.file_id_ + ", reason: source index
// is null"; failed_build_index(log_msg, "source index is null"); return;
// }
// } catch (std::exception& ex) {
// std::string msg = "Failed to build index " + table_file.file_id_ + ", reason: " +
// std::string(ex.what()); failed_build_index(msg, ex.what()); return;
// }
//
// // step 3: if collection has been deleted, dont save index file
// bool has_collection = false;
// meta_ptr->HasCollection(file_->collection_id_, has_collection);
// fiu_do_on("XSSBuildIndexTask.Execute.has_collection", has_collection = true);
//
// if (!has_collection) {
// std::string msg = "Failed to build index " + table_file.file_id_ + ", reason: collection has been
// deleted"; failed_build_index(msg, "Collection has been deleted"); return;
// }
//
// // step 4: save index file
// try {
// fiu_do_on("XSSBuildIndexTask.Execute.throw_std_exception", throw std::exception());
// status = index->Serialize();
// if (!status.ok()) {
// std::string msg =
// "Failed to persist index file: " + table_file.location_ + ", reason: " + status.message();
// failed_build_index(msg, status.message());
// return;
// }
// } catch (std::exception& ex) {
// // if failed to serialize index file to disk
// // typical error: out of disk space, out of memory or permition denied
// std::string msg =
// "Failed to persist index file:" + table_file.location_ + ", exception:" +
// std::string(ex.what());
// failed_build_index(msg, ex.what());
// return;
// }
//
// // step 5: update meta
// table_file.file_type_ = engine::meta::SegmentSchema::INDEX;
// table_file.file_size_ = CommonUtil::GetFileSize(table_file.location_);
// table_file.row_count_ = file_->row_count_; // index->Count();
//
// auto origin_file = *file_;
// origin_file.file_type_ = engine::meta::SegmentSchema::BACKUP;
//
// engine::meta::SegmentsSchema update_files = {table_file, origin_file};
//
// if (status.ok()) { // makesure index file is sucessfully serialized to disk
// status = meta_ptr->UpdateCollectionFiles(update_files);
// }
//
// fiu_do_on("XSSBuildIndexTask.Execute.update_table_file_fail", status = Status(SERVER_UNEXPECTED_ERROR,
// "")); if (status.ok()) {
// LOG_ENGINE_DEBUG_ << "New index file " << table_file.file_id_ << " of size " <<
// table_file.file_size_
// << " bytes"
// << " from file " << origin_file.file_id_;
// // XXX_Index_NM doesn't support it now.
// // if (build_index_job->options().insert_cache_immediately_) {
// // index->Cache();
// // }
// } else {
// // failed to update meta, mark the new file as to_delete, don't delete old file
// origin_file.file_type_ = engine::meta::SegmentSchema::TO_INDEX;
// status = meta_ptr->UpdateCollectionFile(origin_file);
// LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << origin_file.file_id_
// << " to to_index";
//
// table_file.file_type_ = engine::meta::SegmentSchema::TO_DELETE;
// status = meta_ptr->UpdateCollectionFile(table_file);
// LOG_ENGINE_DEBUG_ << "Failed to up date file to index, mark file: " << table_file.file_id_
// << " to to_delete";
// }
//
// build_index_job->BuildIndexDone(to_index_id_);
// SS TODO
build_index_job->BuildIndexDone(seg_id);
}
to_index_engine_ = nullptr;
engine_ = nullptr;
}
} // namespace scheduler

View File

@ -11,7 +11,10 @@
#pragma once
#include <string>
#include "db/SnapshotVisitor.h"
#include "db/engine/SSExecutionEngine.h"
#include "scheduler/Definition.h"
#include "scheduler/job/SSBuildIndexJob.h"
#include "scheduler/task/Task.h"
@ -21,7 +24,8 @@ namespace scheduler {
class XSSBuildIndexTask : public Task {
public:
explicit XSSBuildIndexTask(const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label);
explicit XSSBuildIndexTask(const std::string& dir_root, const engine::SegmentVisitorPtr& visitor,
TaskLabelPtr label);
void
Load(LoadType type, uint8_t device_id) override;
@ -31,11 +35,7 @@ class XSSBuildIndexTask : public Task {
public:
engine::SegmentVisitorPtr visitor_;
// SegmentSchemaPtr file_;
// SegmentSchema table_file_;
// size_t to_index_id_ = 0;
int to_index_type_ = 0;
ExecutionEnginePtr to_index_engine_ = nullptr;
engine::SSExecutionEnginePtr engine_ = nullptr;
};
} // namespace scheduler

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/task/SearchTask.h"
#include "scheduler/task/SSSearchTask.h"
#include <fiu-local.h>
@ -21,74 +21,26 @@
#include <utility>
#include "db/Utils.h"
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "db/engine/SSExecutionEngineImpl.h"
#include "scheduler/SchedInst.h"
#include "scheduler/job/SSSearchJob.h"
#include "scheduler/task/SSSearchTask.h"
#include "segment/SegmentReader.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
namespace milvus {
namespace scheduler {
// void
// CollectFileMetrics(int file_type, size_t file_size) {
// server::MetricsBase& inst = server::Metrics::GetInstance();
// switch (file_type) {
// case SegmentSchema::RAW:
// case SegmentSchema::TO_INDEX: {
// inst.RawFileSizeHistogramObserve(file_size);
// inst.RawFileSizeTotalIncrement(file_size);
// inst.RawFileSizeGaugeSet(file_size);
// break;
// }
// default: {
// inst.IndexFileSizeHistogramObserve(file_size);
// inst.IndexFileSizeTotalIncrement(file_size);
// inst.IndexFileSizeGaugeSet(file_size);
// break;
// }
// }
//}
XSSSearchTask::XSSSearchTask(const server::ContextPtr& context, const engine::SegmentVisitorPtr& visitor,
TaskLabelPtr label)
XSSSearchTask::XSSSearchTask(const server::ContextPtr& context, const std::string& dir_root,
const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label)
: Task(TaskType::SearchTask, std::move(label)), context_(context), visitor_(visitor) {
// if (file_) {
// // distance -- value 0 means two vectors equal, ascending reduce, L2/HAMMING/JACCARD/TONIMOTO ...
// // similarity -- infinity value means two vectors equal, descending reduce, IP
// if (file_->metric_type_ == static_cast<int>(MetricType::IP) &&
// file_->engine_type_ != static_cast<int>(EngineType::FAISS_PQ)) {
// ascending_reduce = false;
// }
//
// EngineType engine_type;
// if (file->file_type_ == SegmentSchema::FILE_TYPE::RAW ||
// file->file_type_ == SegmentSchema::FILE_TYPE::TO_INDEX ||
// file->file_type_ == SegmentSchema::FILE_TYPE::BACKUP) {
// engine_type = engine::utils::IsBinaryMetricType(file->metric_type_) ? EngineType::FAISS_BIN_IDMAP
// : EngineType::FAISS_IDMAP;
// } else {
// engine_type = (EngineType)file->engine_type_;
// }
//
// milvus::json json_params;
// if (!file_->index_params_.empty()) {
// json_params = milvus::json::parse(file_->index_params_);
// }
// index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, engine_type,
// (MetricType)file_->metric_type_, json_params);
// }
engine_ = std::make_shared<engine::SSExecutionEngineImpl>(dir_root, visitor);
}
void
XSSSearchTask::Load(LoadType type, uint8_t device_id) {
// milvus::server::ContextFollower tracer(context_, "XSearchTask::Load " + std::to_string(file_->id_));
TimeRecorder rc(LogOut("[%s][%ld]", "search", 0));
auto seg_id = visitor_->GetSegment()->GetID();
TimeRecorder rc(LogOut("[%s][%ld]", "search", seg_id));
Status stat = Status::OK();
std::string error_msg;
std::string type_str;
@ -96,18 +48,14 @@ XSSSearchTask::Load(LoadType type, uint8_t device_id) {
try {
fiu_do_on("XSearchTask.Load.throw_std_exception", throw std::exception());
if (type == LoadType::DISK2CPU) {
// stat = index_engine_->Load();
// stat = index_engine_->LoadAttr();
stat = engine_->Load(nullptr);
// stat = engine_->LoadAttr();
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
// bool hybrid = false;
// if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H) {
// hybrid = true;
// }
// stat = index_engine_->CopyToGpu(device_id, hybrid);
stat = engine_->CopyToGpu(device_id);
type_str = "CPU2GPU" + std::to_string(device_id);
} else if (type == LoadType::GPU2CPU) {
// stat = index_engine_->CopyToCpu();
// stat = engine_->CopyToCpu();
type_str = "GPU2CPU";
} else {
error_msg = "Wrong load type";
@ -116,7 +64,7 @@ XSSSearchTask::Load(LoadType type, uint8_t device_id) {
} catch (std::exception& ex) {
// typical error: out of disk space or permition denied
error_msg = "Failed to load index file: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Encounter execption: %s", "search", 0, error_msg.c_str());
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Encounter exception: %s", "search", 0, error_msg.c_str());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
fiu_do_on("XSearchTask.Load.out_of_memory", stat = Status(SERVER_UNEXPECTED_ERROR, "out of memory"));
@ -133,26 +81,15 @@ XSSSearchTask::Load(LoadType type, uint8_t device_id) {
if (auto job = job_.lock()) {
auto search_job = std::static_pointer_cast<scheduler::SSSearchJob>(job);
search_job->SearchDone(visitor_->GetSegment()->GetID());
search_job->GetStatus() = s;
search_job->SearchDone(seg_id);
search_job->status() = s;
}
return;
}
// size_t file_size = index_engine_->Size();
// std::string info = "Search task load file id:" + std::to_string(file_->id_) + " " + type_str +
// " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
// " bytes from location: " + file_->location_ + " totally cost";
// rc.ElapseFromBegin(info);
//
// CollectFileMetrics(file_->file_type_, file_size);
//
// // step 2: return search task for later execution
// index_id_ = file_->id_;
// index_type_ = file_->file_type_;
// search_contexts_.swap(search_contexts_);
std::string info = "Search task load segment id: " + std::to_string(seg_id) + " " + type_str + " totally cost";
rc.ElapseFromBegin(info);
}
void
@ -161,84 +98,48 @@ XSSSearchTask::Execute() {
milvus::server::ContextFollower tracer(context_, "XSearchTask::Execute " + std::to_string(seg_id));
TimeRecorder rc(LogOut("[%s][%ld] DoSearch file id:%ld", "search", 0, seg_id));
server::CollectDurationMetrics metrics(index_type_);
std::vector<int64_t> output_ids;
std::vector<float> output_distance;
engine::QueryResult result;
double span;
if (auto job = job_.lock()) {
auto search_job = std::static_pointer_cast<scheduler::SSSearchJob>(job);
if (index_engine_ == nullptr) {
if (engine_ == nullptr) {
search_job->SearchDone(seg_id);
return;
}
/* step 1: allocate memory */
query::GeneralQueryPtr general_query = search_job->general_query();
uint64_t nq = search_job->nq();
uint64_t topk = search_job->topk();
fiu_do_on("XSearchTask.Execute.throw_std_exception", throw std::exception());
// try {
// /* step 2: search */
// bool hybrid = false;
// if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H &&
// ResMgrInst::GetInstance()->GetResource(path().Last())->type() == ResourceType::CPU) {
// hybrid = true;
// }
// Status s;
// if (general_query != nullptr) {
// std::unordered_map<std::string, DataType> types;
// auto attr_type = search_job->attr_type();
// auto type_it = attr_type.begin();
// for (; type_it != attr_type.end(); type_it++) {
// types.insert(std::make_pair(type_it->first, (DataType)(type_it->second)));
// }
//
// auto query_ptr = search_job->query_ptr();
//
// s = index_engine_->HybridSearch(search_job, types, output_distance, output_ids, hybrid);
// auto vector_query = query_ptr->vectors.begin()->second;
// topk = vector_query->topk;
// nq = vector_query->query_vector.float_data.size() / file_->dimension_;
// search_job->vector_count() = nq;
// } else {
// s = index_engine_->Search(output_ids, output_distance, search_job, hybrid);
// }
//
// fiu_do_on("XSearchTask.Execute.search_fail", s = Status(SERVER_UNEXPECTED_ERROR, ""));
// if (!s.ok()) {
// search_job->GetStatus() = s;
// search_job->SearchDone(index_id_);
// return;
// }
//
// span = rc.RecordSection("search done");
//
// /* step 3: pick up topk result */
// auto spec_k = file_->row_count_ < topk ? file_->row_count_ : topk;
// if (spec_k == 0) {
// LOG_ENGINE_WARNING_ << LogOut("[%s][%ld] Searching in an empty file. file location = %s",
// "search", 0,
// file_->location_.c_str());
// } else {
// std::unique_lock<std::mutex> lock(search_job->mutex());
// XSearchTask::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk,
// ascending_reduce,
// search_job->GetResultIds(),
// search_job->GetResultDistances());
// }
//
// span = rc.RecordSection("reduce topk done");
// search_job->time_stat().reduce_time += span / 1000;
// } catch (std::exception& ex) {
// LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] SearchTask encounter exception: %s", "search", 0,
// ex.what()); search_job->GetStatus() = Status(SERVER_UNEXPECTED_ERROR, ex.what());
// }
try {
/* step 2: search */
Status s = engine_->Search(search_job->query_ptr(), result);
fiu_do_on("XSearchTask.Execute.search_fail", s = Status(SERVER_UNEXPECTED_ERROR, ""));
if (!s.ok()) {
search_job->SearchDone(seg_id);
search_job->status() = s;
return;
}
span = rc.RecordSection("search done");
/* step 3: pick up topk result */
// auto spec_k = file_->row_count_ < topk ? file_->row_count_ : topk;
// if (spec_k == 0) {
// LOG_ENGINE_WARNING_ << LogOut("[%s][%ld] Searching in an empty file. file location = %s",
// "search", 0,
// file_->location_.c_str());
// } else {
// std::unique_lock<std::mutex> lock(search_job->mutex());
// XSearchTask::MergeTopkToResultSet(result, spec_k, nq, topk, ascending_, search_job->GetQueryResult());
// }
span = rc.RecordSection("reduce topk done");
} catch (std::exception& ex) {
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] SearchTask encounter exception: %s", "search", 0, ex.what());
search_job->status() = Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
/* step 4: notify to send result to client */
search_job->SearchDone(seg_id);
@ -246,15 +147,18 @@ XSSSearchTask::Execute() {
rc.ElapseFromBegin("totally cost");
// release index in resource
index_engine_ = nullptr;
// release engine resource
engine_ = nullptr;
}
void
XSSSearchTask::MergeTopkToResultSet(const scheduler::ResultIds& src_ids,
const scheduler::ResultDistances& src_distances, size_t src_k, size_t nq,
size_t topk, bool ascending, scheduler::ResultIds& tar_ids,
scheduler::ResultDistances& tar_distances) {
XSSSearchTask::MergeTopkToResultSet(const engine::QueryResult& src_result, size_t src_k, size_t nq, size_t topk,
bool ascending, engine::QueryResult& tar_result) {
const engine::ResultIds& src_ids = src_result.result_ids_;
const engine::ResultDistances& src_distances = src_result.result_distances_;
engine::ResultIds& tar_ids = tar_result.result_ids_;
engine::ResultDistances& tar_distances = tar_result.result_distances_;
if (src_ids.empty()) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%d] Search result is empty.", "search", 0);
return;
@ -318,15 +222,5 @@ XSSSearchTask::MergeTopkToResultSet(const scheduler::ResultIds& src_ids,
tar_distances.swap(buf_distances);
}
// const std::string&
// XSSSearchTask::GetLocation() const {
// return file_->location_;
//}
// size_t
// XSSSearchTask::GetIndexId() const {
// return file_->id_;
//}
} // namespace scheduler
} // namespace milvus

View File

@ -16,6 +16,7 @@
#include <vector>
#include "db/SnapshotVisitor.h"
#include "db/engine/SSExecutionEngine.h"
#include "scheduler/Definition.h"
#include "scheduler/job/SSSearchJob.h"
#include "scheduler/task/Task.h"
@ -26,8 +27,8 @@ namespace scheduler {
// TODO(wxyu): rewrite
class XSSSearchTask : public Task {
public:
explicit XSSSearchTask(const server::ContextPtr& context, const engine::SegmentVisitorPtr& visitor,
TaskLabelPtr label);
explicit XSSSearchTask(const server::ContextPtr& context, const std::string& dir_root,
const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label);
void
Load(LoadType type, uint8_t device_id) override;
@ -37,28 +38,19 @@ class XSSSearchTask : public Task {
public:
static void
MergeTopkToResultSet(const scheduler::ResultIds& src_ids, const scheduler::ResultDistances& src_distances,
size_t src_k, size_t nq, size_t topk, bool ascending, scheduler::ResultIds& tar_ids,
scheduler::ResultDistances& tar_distances);
// const std::string&
// GetLocation() const;
// size_t
// GetIndexId() const;
MergeTopkToResultSet(const engine::QueryResult& src_result, size_t src_k, size_t nq, size_t topk, bool ascending,
engine::QueryResult& tar_result);
public:
const server::ContextPtr context_;
engine::SegmentVisitorPtr visitor_;
// size_t index_id_ = 0;
int index_type_ = 0;
ExecutionEnginePtr index_engine_ = nullptr;
engine::SSExecutionEnginePtr engine_ = nullptr;
// distance -- value 0 means two vectors equal, ascending reduce, L2/HAMMING/JACCARD/TONIMOTO ...
// similarity -- infinity value means two vectors equal, descending reduce, IP
bool ascending_reduce = true;
bool ascending_ = true;
};
} // namespace scheduler

View File

@ -18,7 +18,7 @@ namespace milvus {
namespace scheduler {
SSTestTask::SSTestTask(const server::ContextPtr& context, const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label)
: XSSSearchTask(context, visitor, std::move(label)) {
: XSSSearchTask(context, "", visitor, std::move(label)) {
}
void

View File

@ -400,58 +400,6 @@ TEST_F(SSDBTest, VisitorTest) {
std::cout << ss->ToString() << std::endl;
}
TEST_F(SSDBTest, QueryTest) {
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
};
std::string c1 = "c1";
auto status = CreateCollection(db_, c1, next_lsn());
ASSERT_TRUE(status.ok());
std::stringstream p_name;
auto num = RandomInt(1, 3);
for (auto i = 0; i < num; ++i) {
p_name.str("");
p_name << "partition_" << i;
status = db_->CreatePartition(c1, p_name.str());
ASSERT_TRUE(status.ok());
}
ScopedSnapshotT ss;
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
SegmentFileContext sf_context;
SFContextBuilder(sf_context, ss);
auto new_total = 0;
auto &partitions = ss->GetResources<Partition>();
ID_TYPE partition_id;
for (auto &kv : partitions) {
num = RandomInt(1, 3);
auto row_cnt = 100;
for (auto i = 0; i < num; ++i) {
ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context, row_cnt).ok());
}
new_total += num;
partition_id = kv.first;
}
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
milvus::server::ContextPtr ctx1;
std::vector<std::string> partition_patterns;
milvus::query::GeneralQueryPtr general_query;
milvus::query::QueryPtr query_ptr;
std::vector<std::string> field_names;
std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType> attr_type;
milvus::engine::QueryResult result;
//db_->Query(ctx1, c1, partition_patterns, general_query, query_ptr, field_names, attr_type, result);
}
TEST_F(SSDBTest, InsertTest) {
std::string collection_name = "MERGE_TEST";
auto status = CreateCollection2(db_, collection_name, 0);

View File

@ -11,31 +11,100 @@
#include <gtest/gtest.h>
#include "db/SnapshotVisitor.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "scheduler/SchedInst.h"
#include "scheduler/job/SSBuildIndexJob.h"
#include "scheduler/job/SSSearchJob.h"
#include "ssdb/utils.h"
namespace milvus {
namespace scheduler {
using SegmentVisitor = milvus::engine::SegmentVisitor;
class TestJob : public Job {
public:
TestJob() : Job(JobType::INVALID) {}
};
namespace {
milvus::Status
CreateCollection(std::shared_ptr<SSDBImpl> db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
context.collection = collection_schema;
auto vector_field = std::make_shared<Field>("vector", 0,
milvus::engine::FieldType::VECTOR);
auto vector_field_element = std::make_shared<FieldElement>(0, 0, "ivfsq8",
milvus::engine::FieldElementType::FET_INDEX);
auto int_field = std::make_shared<Field>("int", 0,
milvus::engine::FieldType::INT32);
context.fields_schema[vector_field] = {vector_field_element};
context.fields_schema[int_field] = {};
TEST(SSJobTest, TestJob) {
engine::DBOptions options;
auto build_index_ptr = std::make_shared<SSBuildIndexJob>(options);
build_index_ptr->Dump();
build_index_ptr->AddSegmentVisitor(nullptr);
TestJob test_job;
test_job.Dump();
engine::VectorsData vectors;
auto search_ptr = std::make_shared<SSSearchJob>(nullptr, 1, 1, vectors);
search_ptr->Dump();
search_ptr->AddSegmentVisitor(nullptr);
return db->CreateCollection(context);
}
} // namespace
} // namespace scheduler
} // namespace milvus
TEST_F(SSSchedulerTest, SSJobTest) {
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
};
std::string c1 = "c1";
auto status = CreateCollection(db_, c1, next_lsn());
ASSERT_TRUE(status.ok());
status = db_->CreatePartition(c1, "p_0");
ASSERT_TRUE(status.ok());
ScopedSnapshotT ss;
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
SegmentFileContext sf_context;
SFContextBuilder(sf_context, ss);
auto& partitions = ss->GetResources<Partition>();
ASSERT_EQ(partitions.size(), 2);
for (auto& kv : partitions) {
int64_t row_cnt = 100;
ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context, row_cnt).ok());
}
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
/* collect all valid segment */
std::vector<milvus::engine::SegmentVisitorPtr> segment_visitors;
auto executor = [&](const SegmentPtr& segment, SegmentIterator* handler) -> Status {
auto visitor = SegmentVisitor::Build(ss, segment->GetID());
if (visitor == nullptr) {
return Status(milvus::SS_ERROR, "Cannot build segment visitor");
}
segment_visitors.push_back(visitor);
return Status::OK();
};
auto segment_iter = std::make_shared<SegmentIterator>(ss, executor);
segment_iter->Iterate();
ASSERT_TRUE(segment_iter->GetStatus().ok());
ASSERT_EQ(segment_visitors.size(), 2);
/* create BuildIndexJob */
milvus::scheduler::SSBuildIndexJobPtr build_index_job =
std::make_shared<milvus::scheduler::SSBuildIndexJob>("");
for (auto& sv : segment_visitors) {
build_index_job->AddSegmentVisitor(sv);
}
/* put search job to scheduler and wait result */
milvus::scheduler::JobMgrInst::GetInstance()->Put(build_index_job);
build_index_job->WaitFinish();
/* create SearchJob */
milvus::scheduler::SSSearchJobPtr search_job =
std::make_shared<milvus::scheduler::SSSearchJob>(nullptr, "", nullptr);
for (auto& sv : segment_visitors) {
search_job->AddSegmentVisitor(sv);
}
/* put search job to scheduler and wait result */
milvus::scheduler::JobMgrInst::GetInstance()->Put(search_job);
search_job->WaitFinish();
}

View File

@ -37,11 +37,11 @@ TEST(SSTaskTest, INVALID_INDEX) {
auto trace_context = std::make_shared<milvus::tracing::TraceContext>(mock_span);
dummy_context->SetTraceContext(trace_context);
auto search_task = std::make_shared<XSSSearchTask>(dummy_context, nullptr, nullptr);
search_task->Load(LoadType::TEST, 10);
auto build_task = std::make_shared<XSSBuildIndexTask>(nullptr, nullptr);
build_task->Load(LoadType::TEST, 10);
// auto search_task = std::make_shared<XSSSearchTask>(dummy_context, nullptr, nullptr);
// search_task->Load(LoadType::TEST, 10);
//
// auto build_task = std::make_shared<XSSBuildIndexTask>(nullptr, nullptr);
// build_task->Load(LoadType::TEST, 10);
// build_task->Execute();
}
@ -54,16 +54,16 @@ TEST(SSTaskTest, TEST_TASK) {
// file->dimension_ = 64;
auto label = std::make_shared<BroadcastLabel>();
SSTestTask task(dummy_context, nullptr, label);
task.Load(LoadType::CPU2GPU, 0);
auto th = std::thread([&]() {
task.Execute();
});
task.Wait();
if (th.joinable()) {
th.join();
}
// SSTestTask task(dummy_context, nullptr, label);
// task.Load(LoadType::CPU2GPU, 0);
// auto th = std::thread([&]() {
// task.Execute();
// });
// task.Wait();
//
// if (th.joinable()) {
// th.join();
// }
// static const char* CONFIG_PATH = "/tmp/milvus_test";
// auto options = milvus::engine::DBFactory::BuildOption();

View File

@ -215,33 +215,10 @@ SSDBTest::SetUp() {
BaseTest::SetUp();
BaseTest::SnapshotStart(false);
db_ = std::make_shared<milvus::engine::SSDBImpl>(GetOptions());
auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance();
res_mgr->Clear();
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, false));
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0));
auto default_conn = milvus::scheduler::Connection("IO", 500.0);
auto PCIE = milvus::scheduler::Connection("IO", 11000.0);
res_mgr->Connect("disk", "cpu", default_conn);
#ifdef MILVUS_GPU_VERSION
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("0", "GPU", 0));
res_mgr->Connect("cpu", "0", PCIE);
#endif
res_mgr->Start();
milvus::scheduler::SchedInst::GetInstance()->Start();
milvus::scheduler::JobMgrInst::GetInstance()->Start();
milvus::scheduler::CPUBuilderInst::GetInstance()->Start();
}
void
SSDBTest::TearDown() {
milvus::scheduler::JobMgrInst::GetInstance()->Stop();
milvus::scheduler::SchedInst::GetInstance()->Stop();
milvus::scheduler::CPUBuilderInst::GetInstance()->Stop();
milvus::scheduler::ResMgrInst::GetInstance()->Stop();
milvus::scheduler::ResMgrInst::GetInstance()->Clear();
BaseTest::SnapshotStop();
db_ = nullptr;
auto options = GetOptions();
@ -280,6 +257,46 @@ void
SSMetaTest::TearDown() {
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void
SSSchedulerTest::SetUp() {
BaseTest::SetUp();
BaseTest::SnapshotStart(true);
auto options = milvus::engine::DBOptions();
options.wal_enable_ = false;
db_ = std::make_shared<milvus::engine::SSDBImpl>(options);
auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance();
res_mgr->Clear();
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, false));
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0));
auto default_conn = milvus::scheduler::Connection("IO", 500.0);
auto PCIE = milvus::scheduler::Connection("IO", 11000.0);
res_mgr->Connect("disk", "cpu", default_conn);
#ifdef MILVUS_GPU_VERSION
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("0", "GPU", 0));
res_mgr->Connect("cpu", "0", PCIE);
#endif
res_mgr->Start();
milvus::scheduler::SchedInst::GetInstance()->Start();
milvus::scheduler::JobMgrInst::GetInstance()->Start();
milvus::scheduler::CPUBuilderInst::GetInstance()->Start();
}
void
SSSchedulerTest::TearDown() {
milvus::scheduler::JobMgrInst::GetInstance()->Stop();
milvus::scheduler::SchedInst::GetInstance()->Stop();
milvus::scheduler::CPUBuilderInst::GetInstance()->Stop();
milvus::scheduler::ResMgrInst::GetInstance()->Stop();
milvus::scheduler::ResMgrInst::GetInstance()->Clear();
db_ = nullptr;
BaseTest::SnapshotStop();
BaseTest::TearDown();
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int
main(int argc, char **argv) {

View File

@ -342,3 +342,14 @@ class SSMetaTest : public BaseTest {
void
TearDown() override;
};
///////////////////////////////////////////////////////////////////////////////
class SSSchedulerTest : public BaseTest {
protected:
std::shared_ptr<SSDBImpl> db_;
void
SetUp() override;
void
TearDown() override;
};