Merge branch 'branch-0.5.0' into 'branch-0.5.0'

format scheduler code

See merge request megasearch/milvus!627

Former-commit-id: 4b46d2b89a1309faa30580dbcf58111c13c68ae5
pull/191/head
peng.xu 2019-09-27 14:22:23 +08:00
commit b046049ffd
97 changed files with 1694 additions and 1578 deletions

View File

@ -25,6 +25,7 @@
#include <string>
#include <memory>
#include <vector>
namespace zilliz {
namespace milvus {
@ -72,7 +73,6 @@ public:
virtual Status DropIndex(const std::string &table_id) = 0;
virtual Status DropAll() = 0;
}; // DB
using DBPtr = std::shared_ptr<DB>;

View File

@ -16,7 +16,7 @@
// under the License.
#include "DBFactory.h"
#include "db/DBFactory.h"
#include "DBImpl.h"
#include "utils/Exception.h"
#include "meta/MetaFactory.h"
@ -33,14 +33,16 @@ namespace zilliz {
namespace milvus {
namespace engine {
DBOptions DBFactory::BuildOption() {
DBOptions
DBFactory::BuildOption() {
auto meta = MetaFactory::BuildOption();
DBOptions options;
options.meta_ = meta;
return options;
}
DBPtr DBFactory::Build(const DBOptions& options) {
DBPtr
DBFactory::Build(const DBOptions &options) {
return std::make_shared<DBImpl>(options);
}

View File

@ -34,7 +34,6 @@ public:
static DBPtr Build(const DBOptions &options);
};
}
}
}
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "DBImpl.h"
#include "db/DBImpl.h"
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "engine/EngineFactory.h"
@ -36,6 +36,7 @@
#include <thread>
#include <iostream>
#include <cstring>
#include <algorithm>
#include <boost/filesystem.hpp>
namespace zilliz {
@ -48,8 +49,7 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
}
} // namespace
DBImpl::DBImpl(const DBOptions &options)
: options_(options),
@ -68,7 +68,8 @@ DBImpl::~DBImpl() {
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Status DBImpl::Start() {
Status
DBImpl::Start() {
if (!shutting_down_.load(std::memory_order_acquire)) {
return Status::OK();
}
@ -85,7 +86,8 @@ Status DBImpl::Start() {
return Status::OK();
}
Status DBImpl::Stop() {
Status
DBImpl::Stop() {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::OK();
}
@ -106,11 +108,13 @@ Status DBImpl::Stop() {
return Status::OK();
}
Status DBImpl::DropAll() {
Status
DBImpl::DropAll() {
return meta_ptr_->DropAll();
}
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
Status
DBImpl::CreateTable(meta::TableSchema &table_schema) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
@ -120,7 +124,8 @@ Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
return meta_ptr_->CreateTable(temp_schema);
}
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
Status
DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
@ -144,7 +149,8 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date
return Status::OK();
}
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
Status
DBImpl::DescribeTable(meta::TableSchema &table_schema) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
@ -154,7 +160,8 @@ Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
return stat;
}
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
Status
DBImpl::HasTable(const std::string &table_id, bool &has_or_not) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
@ -162,7 +169,8 @@ Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
return meta_ptr_->HasTable(table_id, has_or_not);
}
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
Status
DBImpl::AllTables(std::vector<meta::TableSchema> &table_schema_array) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
@ -170,7 +178,8 @@ Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
return meta_ptr_->AllTables(table_schema_array);
}
Status DBImpl::PreloadTable(const std::string &table_id) {
Status
DBImpl::PreloadTable(const std::string &table_id) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
@ -191,7 +200,11 @@ Status DBImpl::PreloadTable(const std::string &table_id) {
for (auto &day_files : files) {
for (auto &file : day_files.second) {
ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, (MetricType)file.metric_type_, file.nlist_);
ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_,
file.location_,
(EngineType) file.engine_type_,
(MetricType) file.metric_type_,
file.nlist_);
if (engine == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type";
return Status(DB_ERROR, "Invalid engine type");
@ -215,7 +228,8 @@ Status DBImpl::PreloadTable(const std::string &table_id) {
return Status::OK();
}
Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
Status
DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
@ -223,7 +237,8 @@ Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
return meta_ptr_->UpdateTableFlag(table_id, flag);
}
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
Status
DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
@ -231,7 +246,8 @@ Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count
return meta_ptr_->Count(table_id, row_count);
}
Status DBImpl::InsertVectors(const std::string& table_id_,
Status
DBImpl::InsertVectors(const std::string &table_id_,
uint64_t n, const float *vectors, IDNumbers &vector_ids_) {
// ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
if (shutting_down_.load(std::memory_order_acquire)) {
@ -241,7 +257,8 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
Status status;
zilliz::milvus::server::CollectInsertMetrics metrics(n, status);
status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// std::chrono::microseconds time_span =
// std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
// ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
@ -249,7 +266,8 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
return status;
}
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
Status
DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
{
std::unique_lock<std::mutex> lock(build_index_mutex_);
@ -316,16 +334,19 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index)
return Status::OK();
}
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
Status
DBImpl::DescribeIndex(const std::string &table_id, TableIndex &index) {
return meta_ptr_->DescribeTableIndex(table_id, index);
}
Status DBImpl::DropIndex(const std::string& table_id) {
Status
DBImpl::DropIndex(const std::string &table_id) {
ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
return meta_ptr_->DropTableIndex(table_id);
}
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
Status
DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
const float *vectors, QueryResults &results) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
@ -337,7 +358,8 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint6
return result;
}
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
Status
DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
const float *vectors, const meta::DatesT &dates, QueryResults &results) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
@ -364,7 +386,8 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint6
return status;
}
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
Status
DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_ids,
uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
const meta::DatesT &dates, QueryResults &results) {
if (shutting_down_.load(std::memory_order_acquire)) {
@ -405,7 +428,8 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
return status;
}
Status DBImpl::Size(uint64_t& result) {
Status
DBImpl::Size(uint64_t &result) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
@ -413,28 +437,28 @@ Status DBImpl::Size(uint64_t& result) {
return meta_ptr_->Size(result);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
Status
DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &files,
uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
const meta::DatesT &dates, QueryResults &results) {
using namespace scheduler;
server::CollectQueryMetrics metrics(nq);
TimeRecorder rc("");
//step 1: get files to search
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size();
SearchJobPtr job = std::make_shared<SearchJob>(0, k, nq, nprobe, vectors);
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: "
<< dates.size();
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
for (auto &file : files) {
TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddIndexFile(file_ptr);
}
//step 2: put search task to scheduler
JobMgrInst::GetInstance()->Put(job);
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitResult();
if (!job->GetStatus().ok()) {
return job->GetStatus();
@ -453,9 +477,12 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
// double search_percent = search_cost/total_cost;
// double reduce_percent = reduce_cost/total_cost;
//
// ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_percent*100 << "%";
// ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_percent*100 << "%";
// ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_percent*100 << "%";
// ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info
// << " percent: " << load_percent*100 << "%";
// ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info
// << " percent: " << search_percent*100 << "%";
// ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info
// << " percent: " << reduce_percent*100 << "%";
// } else {
// ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
// << " search cost: " << search_info
@ -469,7 +496,8 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
return Status::OK();
}
void DBImpl::BackgroundTimerTask() {
void
DBImpl::BackgroundTimerTask() {
Status status;
server::SystemInfo::GetInstance().Init();
while (true) {
@ -489,21 +517,24 @@ void DBImpl::BackgroundTimerTask() {
}
}
void DBImpl::WaitMergeFileFinish() {
void
DBImpl::WaitMergeFileFinish() {
std::lock_guard<std::mutex> lck(compact_result_mutex_);
for (auto &iter : compact_thread_results_) {
iter.wait();
}
}
void DBImpl::WaitBuildIndexFinish() {
void
DBImpl::WaitBuildIndexFinish() {
std::lock_guard<std::mutex> lck(index_result_mutex_);
for (auto &iter : index_thread_results_) {
iter.wait();
}
}
void DBImpl::StartMetricTask() {
void
DBImpl::StartMetricTask() {
static uint64_t metric_clock_tick = 0;
metric_clock_tick++;
if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
@ -533,7 +564,8 @@ void DBImpl::StartMetricTask() {
ENGINE_LOG_TRACE << "Metric task finished";
}
Status DBImpl::MemSerialize() {
Status
DBImpl::MemSerialize() {
std::lock_guard<std::mutex> lck(mem_serialize_mutex_);
std::set<std::string> temp_table_ids;
mem_mgr_->Serialize(temp_table_ids);
@ -548,7 +580,8 @@ Status DBImpl::MemSerialize() {
return Status::OK();
}
void DBImpl::StartCompactionTask() {
void
DBImpl::StartCompactionTask() {
static uint64_t compact_clock_tick = 0;
compact_clock_tick++;
if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
@ -580,7 +613,8 @@ void DBImpl::StartCompactionTask() {
}
}
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
Status
DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date,
const meta::TableFilesSchema &files) {
ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;
@ -602,7 +636,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
(MetricType) table_file.metric_type_, table_file.nlist_);
meta::TableFilesSchema updated;
long index_size = 0;
int64_t index_size = 0;
for (auto &file : files) {
server::CollectMergeFilesMetrics metrics;
@ -658,7 +692,8 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
return status;
}
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
Status
DBImpl::BackgroundMergeFiles(const std::string &table_id) {
meta::DatePartionedTableFilesSchema raw_files;
auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
if (!status.ok()) {
@ -685,7 +720,8 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
return Status::OK();
}
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
ENGINE_LOG_TRACE << " Background compaction thread start";
Status status;
@ -712,7 +748,8 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
ENGINE_LOG_TRACE << " Background compaction thread exit";
}
void DBImpl::StartBuildIndexTask(bool force) {
void
DBImpl::StartBuildIndexTask(bool force) {
static uint64_t index_clock_tick = 0;
index_clock_tick++;
if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
@ -740,7 +777,8 @@ void DBImpl::StartBuildIndexTask(bool force) {
}
}
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
Status
DBImpl::BuildIndex(const meta::TableFileSchema &file) {
ExecutionEnginePtr to_index =
EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_,
(MetricType) file.metric_type_, file.nlist_);
@ -761,7 +799,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
meta::TableFileSchema table_file;
table_file.table_id_ = file.table_id_;
table_file.date_ = file.date_;
table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
table_file.file_type_ =
meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
status = meta_ptr_->CreateTableFile(table_file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
@ -777,11 +816,11 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
if (index == nullptr) {
table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
status = meta_ptr_->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
<< " to to_delete";
return status;
}
} catch (std::exception &ex) {
//typical error: out of gpu memory
std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
@ -791,7 +830,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
status = meta_ptr_->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;
std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
<< std::endl;
return Status(DB_ERROR, msg);
}
@ -850,7 +890,6 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
status = meta_ptr_->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
}
} catch (std::exception &ex) {
std::string msg = "Build index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
@ -860,7 +899,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
return Status::OK();
}
void DBImpl::BackgroundBuildIndex() {
void
DBImpl::BackgroundBuildIndex() {
ENGINE_LOG_TRACE << "Background build index thread start";
std::unique_lock<std::mutex> lock(build_index_mutex_);

View File

@ -29,7 +29,8 @@
#include <thread>
#include <list>
#include <set>
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
@ -151,7 +152,6 @@ class DBImpl : public DB {
std::list<std::future<void>> index_thread_results_;
std::mutex build_index_mutex_;
}; // DBImpl

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "IDGenerator.h"
#include "db/IDGenerator.h"
#include <chrono>
#include <assert.h>
@ -29,14 +29,16 @@ IDGenerator::~IDGenerator() = default;
constexpr size_t SimpleIDGenerator::MAX_IDS_PER_MICRO;
IDNumber SimpleIDGenerator::GetNextIDNumber() {
IDNumber
SimpleIDGenerator::GetNextIDNumber() {
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch()).count();
return micros * MAX_IDS_PER_MICRO;
}
void SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) {
void
SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers &ids) {
if (n > MAX_IDS_PER_MICRO) {
NextIDNumbers(n - MAX_IDS_PER_MICRO, ids);
NextIDNumbers(MAX_IDS_PER_MICRO, ids);
@ -56,12 +58,12 @@ void SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) {
}
}
void SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) {
void
SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers &ids) {
ids.clear();
NextIDNumbers(n, ids);
}
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -22,7 +22,6 @@
#include <cstddef>
#include <vector>
namespace zilliz {
namespace milvus {
namespace engine {
@ -55,7 +54,6 @@ class SimpleIDGenerator : public IDGenerator {
NextIDNumbers(size_t n, IDNumbers &ids);
static constexpr size_t MAX_IDS_PER_MICRO = 1000;
}; // SimpleIDGenerator

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "Options.h"
#include "db/Options.h"
#include "utils/Exception.h"
#include "utils/Log.h"
@ -32,13 +32,15 @@ ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias)
ParseCritirias(criterias);
}
void ArchiveConf::SetCriterias(const ArchiveConf::CriteriaT& criterial) {
void
ArchiveConf::SetCriterias(const ArchiveConf::CriteriaT &criterial) {
for (auto &pair : criterial) {
criterias_[pair.first] = pair.second;
}
}
void ArchiveConf::ParseCritirias(const std::string& criterias) {
void
ArchiveConf::ParseCritirias(const std::string &criterias) {
std::stringstream ss(criterias);
std::vector<std::string> tokens;
@ -80,7 +82,8 @@ void ArchiveConf::ParseCritirias(const std::string& criterias) {
}
}
void ArchiveConf::ParseType(const std::string& type) {
void
ArchiveConf::ParseType(const std::string &type) {
if (type != "delete" && type != "swap") {
std::string msg = "Invalid argument: type='" + type + "'";
throw InvalidArgumentException(msg);

View File

@ -36,10 +36,15 @@ static const char* ARCHIVE_CONF_DAYS = "days";
struct ArchiveConf {
using CriteriaT = std::map<std::string, int>;
ArchiveConf(const std::string& type, const std::string& criterias = std::string());
explicit ArchiveConf(const std::string &type, const std::string &criterias = std::string());
const std::string& GetType() const { return type_; }
const CriteriaT GetCriterias() const { return criterias_; }
const std::string &GetType() const {
return type_;
}
const CriteriaT GetCriterias() const {
return criterias_;
}
void SetCriterias(const ArchiveConf::CriteriaT &criterial);

View File

@ -21,12 +21,13 @@
#include <vector>
#include <stdint.h>
#include <utility>
namespace zilliz {
namespace milvus {
namespace engine {
typedef long IDNumber;
typedef int64_t IDNumber;
typedef IDNumber *IDNumberPtr;
typedef std::vector<IDNumber> IDNumbers;

View File

@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.
#include "Utils.h"
#include "db/Utils.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include <mutex>
#include <chrono>
#include <regex>
#include <vector>
#include <boost/filesystem.hpp>
namespace zilliz {
@ -36,13 +37,15 @@ const char* TABLES_FOLDER = "/tables/";
uint64_t index_file_counter = 0;
std::mutex index_file_counter_mutex;
std::string ConstructParentFolder(const std::string& db_path, const meta::TableFileSchema& table_file) {
std::string
ConstructParentFolder(const std::string &db_path, const meta::TableFileSchema &table_file) {
std::string table_path = db_path + TABLES_FOLDER + table_file.table_id_;
std::string partition_path = table_path + "/" + std::to_string(table_file.date_);
return partition_path;
}
std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::TableFileSchema& table_file) {
std::string
GetTableFileParentFolder(const DBMetaOptions &options, const meta::TableFileSchema &table_file) {
uint64_t path_count = options.slave_paths_.size() + 1;
std::string target_path = options.path_;
uint64_t index = 0;
@ -67,9 +70,10 @@ std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::T
return ConstructParentFolder(target_path, table_file);
}
}
} // namespace
long GetMicroSecTimeStamp() {
int64_t
GetMicroSecTimeStamp() {
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch()).count();
@ -77,7 +81,8 @@ long GetMicroSecTimeStamp() {
return micros;
}
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id) {
Status
CreateTablePath(const DBMetaOptions &options, const std::string &table_id) {
std::string db_path = options.path_;
std::string table_path = db_path + TABLES_FOLDER + table_id;
auto status = server::CommonUtil::CreateDirectory(table_path);
@ -98,7 +103,8 @@ Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id
return Status::OK();
}
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id, bool force) {
Status
DeleteTablePath(const DBMetaOptions &options, const std::string &table_id, bool force) {
std::vector<std::string> paths = options.slave_paths_;
paths.push_back(options.path_);
@ -117,7 +123,8 @@ Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id
return Status::OK();
}
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
Status
CreateTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file) {
std::string parent_path = GetTableFileParentFolder(options, table_file);
auto status = server::CommonUtil::CreateDirectory(parent_path);
@ -131,7 +138,8 @@ Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
return Status::OK();
}
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
Status
GetTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file) {
std::string parent_path = ConstructParentFolder(options.path_, table_file);
std::string file_path = parent_path + "/" + table_file.file_id_;
if (boost::filesystem::exists(file_path)) {
@ -155,19 +163,22 @@ Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& tab
return Status(DB_ERROR, msg);
}
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
Status
DeleteTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file) {
utils::GetTableFilePath(options, table_file);
boost::filesystem::remove(table_file.location_);
return Status::OK();
}
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
bool
IsSameIndex(const TableIndex &index1, const TableIndex &index2) {
return index1.engine_type_ == index2.engine_type_
&& index1.nlist_ == index2.nlist_
&& index1.metric_type_ == index2.metric_type_;
}
meta::DateT GetDate(const std::time_t& t, int day_delta) {
meta::DateT
GetDate(const std::time_t &t, int day_delta) {
struct tm ltm;
localtime_r(&t, &ltm);
if (day_delta > 0) {
@ -188,16 +199,19 @@ meta::DateT GetDate(const std::time_t& t, int day_delta) {
return ltm.tm_year * 10000 + ltm.tm_mon * 100 + ltm.tm_mday;
}
meta::DateT GetDateWithDelta(int day_delta) {
meta::DateT
GetDateWithDelta(int day_delta) {
return GetDate(std::time(nullptr), day_delta);
}
meta::DateT GetDate() {
meta::DateT
GetDate() {
return GetDate(std::time(nullptr), 0);
}
// URI format: dialect://username:password@host:port/database
Status ParseMetaUri(const std::string& uri, MetaUriInfo& info) {
Status
ParseMetaUri(const std::string &uri, MetaUriInfo &info) {
std::string dialect_regex = "(.*)";
std::string username_tegex = "(.*)";
std::string password_regex = "(.*)";

View File

@ -29,20 +29,30 @@ namespace milvus {
namespace engine {
namespace utils {
long GetMicroSecTimeStamp();
int64_t
GetMicroSecTimeStamp();
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id);
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id, bool force = true);
Status
CreateTablePath(const DBMetaOptions &options, const std::string &table_id);
Status
DeleteTablePath(const DBMetaOptions &options, const std::string &table_id, bool force = true);
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status
CreateTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file);
Status
GetTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file);
Status
DeleteTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file);
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2);
bool
IsSameIndex(const TableIndex &index1, const TableIndex &index2);
meta::DateT GetDate(const std::time_t &t, int day_delta = 0);
meta::DateT GetDate();
meta::DateT GetDateWithDelta(int day_delta);
meta::DateT
GetDate(const std::time_t &t, int day_delta = 0);
meta::DateT
GetDate();
meta::DateT
GetDateWithDelta(int day_delta);
struct MetaUriInfo {
std::string dialect_;
@ -53,7 +63,8 @@ struct MetaUriInfo {
std::string db_name_;
};
Status ParseMetaUri(const std::string& uri, MetaUriInfo& info);
Status
ParseMetaUri(const std::string &uri, MetaUriInfo &info);
} // namespace utils
} // namespace engine

View File

@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
#include "EngineFactory.h"
#include "ExecutionEngineImpl.h"
#include "db/engine/EngineFactory.h"
#include "db/engine/ExecutionEngineImpl.h"
#include "utils/Log.h"
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
@ -29,7 +31,6 @@ EngineFactory::Build(uint16_t dimension,
EngineType index_type,
MetricType metric_type,
int32_t nlist) {
if (index_type == EngineType::INVALID) {
ENGINE_LOG_ERROR << "Unsupported engine type";
return nullptr;
@ -43,6 +44,6 @@ EngineFactory::Build(uint16_t dimension,
return execution_engine_ptr;
}
}
}
}
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -21,6 +21,8 @@
#include "ExecutionEngine.h"
#include "utils/Status.h"
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
@ -34,6 +36,7 @@ public:
int32_t nlist);
};
}
}
}
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -21,6 +21,7 @@
#include <vector>
#include <memory>
#include <string>
namespace zilliz {
namespace milvus {
@ -42,8 +43,7 @@ enum class MetricType {
class ExecutionEngine {
public:
virtual Status AddWithIds(long n, const float *xdata, const long *xids) = 0;
virtual Status AddWithIds(int64_t n, const float *xdata, const int64_t *xids) = 0;
virtual size_t Count() const = 0;
@ -65,12 +65,12 @@ public:
virtual Status Merge(const std::string &location) = 0;
virtual Status Search(long n,
virtual Status Search(int64_t n,
const float *data,
long k,
long nprobe,
int64_t k,
int64_t nprobe,
float *distances,
long *labels) const = 0;
int64_t *labels) const = 0;
virtual std::shared_ptr<ExecutionEngine> BuildIndex(const std::string &location, EngineType engine_type) = 0;
@ -89,7 +89,6 @@ public:
using ExecutionEnginePtr = std::shared_ptr<ExecutionEngine>;
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "ExecutionEngineImpl.h"
#include "db/engine/ExecutionEngineImpl.h"
#include "cache/GpuCacheMgr.h"
#include "cache/CpuCacheMgr.h"
#include "metrics/Metrics.h"
@ -29,6 +29,7 @@
#include "server/Config.h"
#include <stdexcept>
#include <utility>
namespace zilliz {
namespace milvus {
@ -72,7 +73,8 @@ ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index,
nlist_(nlist) {
}
VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
VecIndexPtr
ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
std::shared_ptr<VecIndex> index;
switch (type) {
case EngineType::FAISS_IDMAP: {
@ -99,12 +101,14 @@ VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
return index;
}
Status ExecutionEngineImpl::AddWithIds(long n, const float *xdata, const long *xids) {
Status
ExecutionEngineImpl::AddWithIds(int64_t n, const float *xdata, const int64_t *xids) {
auto status = index_->Add(n, xdata, xids);
return status;
}
size_t ExecutionEngineImpl::Count() const {
size_t
ExecutionEngineImpl::Count() const {
if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return count 0";
return 0;
@ -112,11 +116,13 @@ size_t ExecutionEngineImpl::Count() const {
return index_->Count();
}
size_t ExecutionEngineImpl::Size() const {
size_t
ExecutionEngineImpl::Size() const {
return (size_t) (Count() * Dimension()) * sizeof(float);
}
size_t ExecutionEngineImpl::Dimension() const {
size_t
ExecutionEngineImpl::Dimension() const {
if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return dimension " << dim_;
return dim_;
@ -124,16 +130,19 @@ size_t ExecutionEngineImpl::Dimension() const {
return index_->Dimension();
}
size_t ExecutionEngineImpl::PhysicalSize() const {
size_t
ExecutionEngineImpl::PhysicalSize() const {
return server::CommonUtil::GetFileSize(location_);
}
Status ExecutionEngineImpl::Serialize() {
Status
ExecutionEngineImpl::Serialize() {
auto status = write_index(index_, location_);
return status;
}
Status ExecutionEngineImpl::Load(bool to_cache) {
Status
ExecutionEngineImpl::Load(bool to_cache) {
index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
if (!already_in_cache) {
@ -160,7 +169,8 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
return Status::OK();
}
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
@ -187,7 +197,8 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
return Status::OK();
}
Status ExecutionEngineImpl::CopyToCpu() {
Status
ExecutionEngineImpl::CopyToCpu() {
auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
@ -213,7 +224,8 @@ Status ExecutionEngineImpl::CopyToCpu() {
return Status::OK();
}
ExecutionEnginePtr ExecutionEngineImpl::Clone() {
ExecutionEnginePtr
ExecutionEngineImpl::Clone() {
if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to clone";
return nullptr;
@ -225,7 +237,8 @@ ExecutionEnginePtr ExecutionEngineImpl::Clone() {
return ret;
}
Status ExecutionEngineImpl::Merge(const std::string &location) {
Status
ExecutionEngineImpl::Merge(const std::string &location) {
if (location == location_) {
return Status(DB_ERROR, "Cannot Merge Self");
}
@ -290,12 +303,13 @@ ExecutionEngineImpl::BuildIndex(const std::string &location, EngineType engine_t
return std::make_shared<ExecutionEngineImpl>(to_index, location, engine_type, metric_type_, nlist_);
}
Status ExecutionEngineImpl::Search(long n,
Status
ExecutionEngineImpl::Search(int64_t n,
const float *data,
long k,
long nprobe,
int64_t k,
int64_t nprobe,
float *distances,
long *labels) const {
int64_t *labels) const {
if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
return Status(DB_ERROR, "index is null");
@ -310,14 +324,16 @@ Status ExecutionEngineImpl::Search(long n,
return status;
}
Status ExecutionEngineImpl::Cache() {
Status
ExecutionEngineImpl::Cache() {
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize());
zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj);
return Status::OK();
}
Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
Status
ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize());
zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj);
@ -325,8 +341,8 @@ Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
}
// TODO(linxj): remove.
Status ExecutionEngineImpl::Init() {
using namespace zilliz::milvus::server;
Status
ExecutionEngineImpl::Init() {
server::Config &config = server::Config::GetInstance();
Status s = config.GetDBConfigBuildIndexGPU(gpu_num_);
if (!s.ok()) return s;
@ -334,7 +350,6 @@ Status ExecutionEngineImpl::Init() {
return Status::OK();
}
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -23,15 +23,12 @@
#include <memory>
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
class ExecutionEngineImpl : public ExecutionEngine {
public:
ExecutionEngineImpl(uint16_t dimension,
const std::string &location,
EngineType index_type,
@ -44,7 +41,7 @@ public:
MetricType metric_type,
int32_t nlist);
Status AddWithIds(long n, const float *xdata, const long *xids) override;
Status AddWithIds(int64_t n, const float *xdata, const int64_t *xids) override;
size_t Count() const override;
@ -66,12 +63,12 @@ public:
Status Merge(const std::string &location) override;
Status Search(long n,
Status Search(int64_t n,
const float *data,
long k,
long nprobe,
int64_t k,
int64_t nprobe,
float *distances,
long *labels) const override;
int64_t *labels) const override;
ExecutionEnginePtr BuildIndex(const std::string &location, EngineType engine_type) override;
@ -81,11 +78,17 @@ public:
Status Init() override;
EngineType IndexEngineType() const override { return index_type_; }
EngineType IndexEngineType() const override {
return index_type_;
}
MetricType IndexMetricType() const override { return metric_type_; }
MetricType IndexMetricType() const override {
return metric_type_;
}
std::string GetLocation() const override { return location_; }
std::string GetLocation() const override {
return location_;
}
private:
VecIndexPtr CreatetVecIndex(EngineType type);
@ -104,7 +107,6 @@ protected:
int32_t gpu_num_ = 0;
};
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -23,6 +23,7 @@
#include <set>
#include <memory>
#include <string>
namespace zilliz {
namespace milvus {
@ -30,7 +31,6 @@ namespace engine {
class MemManager {
public:
virtual Status InsertVectors(const std::string &table_id,
size_t n, const float *vectors, IDNumbers &vector_ids) = 0;
@ -43,7 +43,6 @@ class MemManager {
virtual size_t GetCurrentImmutableMem() = 0;
virtual size_t GetCurrentMem() = 0;
}; // MemManagerAbstract
using MemManagerPtr = std::shared_ptr<MemManager>;

View File

@ -16,19 +16,19 @@
// under the License.
#include "MemManagerImpl.h"
#include "db/insert/MemManagerImpl.h"
#include "VectorSource.h"
#include "utils/Log.h"
#include "db/Constants.h"
#include <thread>
namespace zilliz {
namespace milvus {
namespace engine {
MemTablePtr MemManagerImpl::GetMemByTable(const std::string &table_id) {
MemTablePtr
MemManagerImpl::GetMemByTable(const std::string &table_id) {
auto memIt = mem_id_map_.find(table_id);
if (memIt != mem_id_map_.end()) {
return memIt->second;
@ -38,11 +38,11 @@ MemTablePtr MemManagerImpl::GetMemByTable(const std::string &table_id) {
return mem_id_map_[table_id];
}
Status MemManagerImpl::InsertVectors(const std::string &table_id_,
Status
MemManagerImpl::InsertVectors(const std::string &table_id_,
size_t n_,
const float *vectors_,
IDNumbers &vector_ids_) {
while (GetCurrentMem() > options_.insert_buffer_size_) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
@ -52,11 +52,11 @@ Status MemManagerImpl::InsertVectors(const std::string &table_id_,
return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
}
Status MemManagerImpl::InsertVectorsNoLock(const std::string &table_id,
Status
MemManagerImpl::InsertVectorsNoLock(const std::string &table_id,
size_t n,
const float *vectors,
IDNumbers &vector_ids) {
MemTablePtr mem = GetMemByTable(table_id);
VectorSourcePtr source = std::make_shared<VectorSource>(n, vectors);
@ -69,7 +69,8 @@ Status MemManagerImpl::InsertVectorsNoLock(const std::string &table_id,
return status;
}
Status MemManagerImpl::ToImmutable() {
Status
MemManagerImpl::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_);
MemIdMap temp_map;
for (auto &kv : mem_id_map_) {
@ -85,7 +86,8 @@ Status MemManagerImpl::ToImmutable() {
return Status::OK();
}
Status MemManagerImpl::Serialize(std::set<std::string> &table_ids) {
Status
MemManagerImpl::Serialize(std::set<std::string> &table_ids) {
ToImmutable();
std::unique_lock<std::mutex> lock(serialization_mtx_);
table_ids.clear();
@ -97,7 +99,8 @@ Status MemManagerImpl::Serialize(std::set<std::string> &table_ids) {
return Status::OK();
}
Status MemManagerImpl::EraseMemVector(const std::string &table_id) {
Status
MemManagerImpl::EraseMemVector(const std::string &table_id) {
{//erase MemVector from rapid-insert cache
std::unique_lock<std::mutex> lock(mutex_);
mem_id_map_.erase(table_id);
@ -117,7 +120,8 @@ Status MemManagerImpl::EraseMemVector(const std::string &table_id) {
return Status::OK();
}
size_t MemManagerImpl::GetCurrentMutableMem() {
size_t
MemManagerImpl::GetCurrentMutableMem() {
size_t total_mem = 0;
for (auto &kv : mem_id_map_) {
auto memTable = kv.second;
@ -126,7 +130,8 @@ size_t MemManagerImpl::GetCurrentMutableMem() {
return total_mem;
}
size_t MemManagerImpl::GetCurrentImmutableMem() {
size_t
MemManagerImpl::GetCurrentImmutableMem() {
size_t total_mem = 0;
for (auto &mem_table : immu_mem_list_) {
total_mem += mem_table->GetCurrentMem();
@ -134,7 +139,8 @@ size_t MemManagerImpl::GetCurrentImmutableMem() {
return total_mem;
}
size_t MemManagerImpl::GetCurrentMem() {
size_t
MemManagerImpl::GetCurrentMem() {
return GetCurrentMutableMem() + GetCurrentImmutableMem();
}

View File

@ -24,12 +24,13 @@
#include "utils/Status.h"
#include <map>
#include <set>
#include <vector>
#include <string>
#include <ctime>
#include <memory>
#include <mutex>
namespace zilliz {
namespace milvus {
namespace engine {
@ -39,7 +40,8 @@ class MemManagerImpl : public MemManager {
using Ptr = std::shared_ptr<MemManagerImpl>;
MemManagerImpl(const meta::MetaPtr &meta, const DBOptions &options)
: meta_(meta), options_(options) {}
: meta_(meta), options_(options) {
}
Status InsertVectors(const std::string &table_id,
size_t n, const float *vectors, IDNumbers &vector_ids) override;
@ -71,7 +73,6 @@ class MemManagerImpl : public MemManager {
std::mutex serialization_mtx_;
}; // NewMemManager
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "MemMenagerFactory.h"
#include "db/insert/MemMenagerFactory.h"
#include "MemManagerImpl.h"
#include "utils/Log.h"
#include "utils/Exception.h"
@ -26,12 +26,14 @@
#include <cstdlib>
#include <string>
#include <regex>
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
MemManagerPtr MemManagerFactory::Build(const std::shared_ptr<meta::Meta>& meta, const DBOptions& options) {
MemManagerPtr
MemManagerFactory::Build(const std::shared_ptr<meta::Meta> &meta, const DBOptions &options) {
return std::make_shared<MemManagerImpl>(meta, options);
}

View File

@ -20,6 +20,8 @@
#include "MemManager.h"
#include "db/meta/Meta.h"
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
@ -29,6 +31,6 @@ public:
static MemManagerPtr Build(const std::shared_ptr<meta::Meta> &meta, const DBOptions &options);
};
}
}
}
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -16,9 +16,11 @@
// under the License.
#include "MemTable.h"
#include "db/insert/MemTable.h"
#include "utils/Log.h"
#include <memory>
#include <string>
namespace zilliz {
namespace milvus {
@ -30,13 +32,11 @@ MemTable::MemTable(const std::string &table_id,
table_id_(table_id),
meta_(meta),
options_(options) {
}
Status MemTable::Add(VectorSourcePtr &source, IDNumbers &vector_ids) {
Status
MemTable::Add(VectorSourcePtr &source, IDNumbers &vector_ids) {
while (!source->AllAdded()) {
MemTableFilePtr current_mem_table_file;
if (!mem_table_file_list_.empty()) {
current_mem_table_file = mem_table_file_list_.back();
@ -62,15 +62,18 @@ Status MemTable::Add(VectorSourcePtr &source, IDNumbers &vector_ids) {
return Status::OK();
}
void MemTable::GetCurrentMemTableFile(MemTableFilePtr &mem_table_file) {
void
MemTable::GetCurrentMemTableFile(MemTableFilePtr &mem_table_file) {
mem_table_file = mem_table_file_list_.back();
}
size_t MemTable::GetTableFileCount() {
size_t
MemTable::GetTableFileCount() {
return mem_table_file_list_.size();
}
Status MemTable::Serialize() {
Status
MemTable::Serialize() {
for (auto mem_table_file = mem_table_file_list_.begin(); mem_table_file != mem_table_file_list_.end();) {
auto status = (*mem_table_file)->Serialize();
if (!status.ok()) {
@ -84,15 +87,18 @@ Status MemTable::Serialize() {
return Status::OK();
}
bool MemTable::Empty() {
bool
MemTable::Empty() {
return mem_table_file_list_.empty();
}
const std::string &MemTable::GetTableId() const {
const std::string &
MemTable::GetTableId() const {
return table_id_;
}
size_t MemTable::GetCurrentMem() {
size_t
MemTable::GetCurrentMem() {
std::lock_guard<std::mutex> lock(mutex_);
size_t total_mem = 0;
for (auto &mem_table_file : mem_table_file_list_) {

View File

@ -23,7 +23,9 @@
#include "utils/Status.h"
#include <mutex>
#include <vector>
#include <memory>
#include <string>
namespace zilliz {
namespace milvus {
@ -59,7 +61,6 @@ class MemTable {
DBOptions options_;
std::mutex mutex_;
}; //MemTable
using MemTablePtr = std::shared_ptr<MemTable>;

View File

@ -16,14 +16,14 @@
// under the License.
#include "MemTableFile.h"
#include "db/insert/MemTableFile.h"
#include "db/Constants.h"
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "utils/Log.h"
#include <cmath>
#include <string>
namespace zilliz {
namespace milvus {
@ -35,7 +35,6 @@ MemTableFile::MemTableFile(const std::string &table_id,
table_id_(table_id),
meta_(meta),
options_(options) {
current_mem_ = 0;
auto status = CreateTableFile();
if (status.ok()) {
@ -47,8 +46,8 @@ MemTableFile::MemTableFile(const std::string &table_id,
}
}
Status MemTableFile::CreateTableFile() {
Status
MemTableFile::CreateTableFile() {
meta::TableFileSchema table_file_schema;
table_file_schema.table_id_ = table_id_;
auto status = meta_->CreateTableFile(table_file_schema);
@ -61,8 +60,8 @@ Status MemTableFile::CreateTableFile() {
return status;
}
Status MemTableFile::Add(const VectorSourcePtr &source, IDNumbers& vector_ids) {
Status
MemTableFile::Add(const VectorSourcePtr &source, IDNumbers &vector_ids) {
if (table_file_schema_.dimension_ <= 0) {
std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " +
std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_;
@ -75,7 +74,8 @@ Status MemTableFile::Add(const VectorSourcePtr &source, IDNumbers& vector_ids) {
if (mem_left >= single_vector_mem_size) {
size_t num_vectors_to_add = std::ceil(mem_left / single_vector_mem_size);
size_t num_vectors_added;
auto status = source->Add(execution_engine_, table_file_schema_, num_vectors_to_add, num_vectors_added, vector_ids);
auto status =
source->Add(execution_engine_, table_file_schema_, num_vectors_to_add, num_vectors_added, vector_ids);
if (status.ok()) {
current_mem_ += (num_vectors_added * single_vector_mem_size);
}
@ -84,20 +84,24 @@ Status MemTableFile::Add(const VectorSourcePtr &source, IDNumbers& vector_ids) {
return Status::OK();
}
size_t MemTableFile::GetCurrentMem() {
size_t
MemTableFile::GetCurrentMem() {
return current_mem_;
}
size_t MemTableFile::GetMemLeft() {
size_t
MemTableFile::GetMemLeft() {
return (MAX_TABLE_FILE_MEM - current_mem_);
}
bool MemTableFile::IsFull() {
bool
MemTableFile::IsFull() {
size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;
return (GetMemLeft() < single_vector_mem_size);
}
Status MemTableFile::Serialize() {
Status
MemTableFile::Serialize() {
size_t size = GetCurrentMem();
server::CollectSerializeMetrics metrics(size);

View File

@ -23,13 +23,14 @@
#include "db/engine/ExecutionEngine.h"
#include "utils/Status.h"
#include <string>
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
class MemTableFile {
public:
MemTableFile(const std::string &table_id, const meta::MetaPtr &meta, const DBOptions &options);
@ -44,21 +45,16 @@ class MemTableFile {
Status Serialize();
private:
Status CreateTableFile();
private:
const std::string table_id_;
meta::TableFileSchema table_file_schema_;
meta::MetaPtr meta_;
DBOptions options_;
size_t current_mem_;
ExecutionEnginePtr execution_engine_;
}; //MemTableFile
using MemTableFilePtr = std::shared_ptr<MemTableFile>;

View File

@ -16,18 +16,16 @@
// under the License.
#include "VectorSource.h"
#include "db/insert/VectorSource.h"
#include "db/engine/ExecutionEngine.h"
#include "db/engine/EngineFactory.h"
#include "utils/Log.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace milvus {
namespace engine {
VectorSource::VectorSource(const size_t &n,
const float *vectors) :
n_(n),
@ -36,12 +34,12 @@ VectorSource::VectorSource(const size_t &n,
current_num_vectors_added = 0;
}
Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
Status
VectorSource::Add(const ExecutionEnginePtr &execution_engine,
const meta::TableFileSchema &table_file_schema,
const size_t &num_vectors_to_add,
size_t &num_vectors_added,
IDNumbers &vector_ids) {
server::CollectAddMetrics metrics(n_, table_file_schema.dimension_);
num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ?
@ -70,15 +68,18 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
return status;
}
size_t VectorSource::GetNumVectorsAdded() {
size_t
VectorSource::GetNumVectorsAdded() {
return current_num_vectors_added;
}
bool VectorSource::AllAdded() {
bool
VectorSource::AllAdded() {
return (current_num_vectors_added == n_);
}
IDNumbers VectorSource::GetVectorIds() {
IDNumbers
VectorSource::GetVectorIds() {
return vector_ids_;
}

View File

@ -23,6 +23,7 @@
#include "db/engine/ExecutionEngine.h"
#include "utils/Status.h"
#include <memory>
namespace zilliz {
namespace milvus {
@ -45,7 +46,6 @@ class VectorSource {
IDNumbers GetVectorIds();
private:
const size_t n_;
const float *vectors_;
IDNumbers vector_ids_;
@ -53,7 +53,6 @@ class VectorSource {
size_t current_num_vectors_added;
std::shared_ptr<IDGenerator> id_generator_;
}; //VectorSource
using VectorSourcePtr = std::shared_ptr<VectorSource>;

View File

@ -25,6 +25,8 @@
#include <cstddef>
#include <memory>
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
@ -96,7 +98,6 @@ class Meta {
virtual Status DropAll() = 0;
virtual Status Count(const std::string &table_id, uint64_t &result) = 0;
}; // MetaData
using MetaPtr = std::shared_ptr<Meta>;

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "MetaFactory.h"
#include "db/meta/MetaFactory.h"
#include "SqliteMetaImpl.h"
#include "MySQLMetaImpl.h"
#include "utils/Log.h"
@ -28,17 +28,21 @@
#include <cstdlib>
#include <string>
#include <string.h>
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
DBMetaOptions MetaFactory::BuildOption(const std::string &path) {
DBMetaOptions
MetaFactory::BuildOption(const std::string &path) {
auto p = path;
if (p == "") {
srand(time(nullptr));
std::stringstream ss;
ss << "/tmp/" << rand();
uint32_t rd = 0;
rand_r(&rd);
ss << "/tmp/" << rd;
p = ss.str();
}
@ -47,7 +51,8 @@ namespace engine {
return meta;
}
meta::MetaPtr MetaFactory::Build(const DBMetaOptions &metaOptions, const int &mode) {
meta::MetaPtr
MetaFactory::Build(const DBMetaOptions &metaOptions, const int &mode) {
std::string uri = metaOptions.backend_uri_;
utils::MetaUriInfo uri_info;

View File

@ -20,6 +20,8 @@
#include "Meta.h"
#include "db/Options.h"
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
@ -31,7 +33,6 @@ public:
static meta::MetaPtr Build(const DBMetaOptions &metaOptions, const int &mode);
};
}
}
}
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -23,6 +23,7 @@
#include <vector>
#include <map>
#include <string>
#include <memory>
namespace zilliz {
namespace milvus {

View File

@ -16,7 +16,7 @@
// under the License.
#include "MySQLConnectionPool.h"
#include "db/meta/MySQLConnectionPool.h"
namespace zilliz {
namespace milvus {
@ -28,7 +28,8 @@ namespace meta {
// already. Can't do this in create() because we're interested in
// connections actually in use, not those created. Also note that
// we keep our own count; ConnectionPool::size() isn't the same!
mysqlpp::Connection *MySQLConnectionPool::grab() {
mysqlpp::Connection *
MySQLConnectionPool::grab() {
while (conns_in_use_ > max_pool_size_) {
sleep(1);
}
@ -38,11 +39,12 @@ namespace meta {
}
// Other half of in-use conn count limit
void MySQLConnectionPool::release(const mysqlpp::Connection *pc) {
void
MySQLConnectionPool::release(const mysqlpp::Connection *pc) {
mysqlpp::ConnectionPool::release(pc);
if (conns_in_use_ <= 0) {
ENGINE_LOG_WARNING << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = " << conns_in_use_;
ENGINE_LOG_WARNING << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = "
<< conns_in_use_;
} else {
--conns_in_use_;
}
@ -56,13 +58,14 @@ namespace meta {
// max_idle_time_ = max_idle;
// }
std::string MySQLConnectionPool::getDB() {
std::string
MySQLConnectionPool::getDB() {
return db_;
}
// Superclass overrides
mysqlpp::Connection *MySQLConnectionPool::create() {
mysqlpp::Connection *
MySQLConnectionPool::create() {
try {
// Create connection using the parameters we were passed upon
// creation.
@ -80,13 +83,15 @@ namespace meta {
}
}
void MySQLConnectionPool::destroy(mysqlpp::Connection *cp) {
void
MySQLConnectionPool::destroy(mysqlpp::Connection *cp) {
// Our superclass can't know how we created the Connection, so
// it delegates destruction to us, to be safe.
delete cp;
}
unsigned int MySQLConnectionPool::max_idle_time() {
unsigned int
MySQLConnectionPool::max_idle_time() {
return max_idle_time_;
}

View File

@ -30,7 +30,6 @@ namespace engine {
namespace meta {
class MySQLConnectionPool : public mysqlpp::ConnectionPool {
public:
// The object's only constructor
MySQLConnectionPool(std::string dbName,
@ -45,9 +44,7 @@ public:
server_(serverIp),
port_(port),
max_pool_size_(maxPoolSize) {
conns_in_use_ = 0;
max_idle_time_ = 10; //10 seconds
}
@ -69,7 +66,6 @@ public:
std::string getDB();
protected:
// Superclass overrides
mysqlpp::Connection *create() override;

File diff suppressed because it is too large Load Diff

View File

@ -21,26 +21,25 @@
#include "db/Options.h"
#include "MySQLConnectionPool.h"
#include "mysql++/mysql++.h"
#include <mysql++/mysql++.h>
#include <mutex>
#include <vector>
#include <string>
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {
// auto StoragePrototype(const std::string& path);
using namespace mysqlpp;
class MySQLMetaImpl : public Meta {
public:
MySQLMetaImpl(const DBMetaOptions &options_, const int &mode);
MySQLMetaImpl(const DBMetaOptions &options, const int &mode);
~MySQLMetaImpl();
Status CreateTable(TableSchema &table_schema) override;
Status DescribeTable(TableSchema &group_info_) override;
Status DescribeTable(TableSchema &table_schema) override;
Status HasTable(const std::string &table_id, bool &has_or_not) override;
@ -102,7 +101,7 @@ class MySQLMetaImpl : public Meta {
private:
Status NextFileId(std::string &file_id);
Status NextTableId(std::string &table_id);
Status DiscardFiles(long long to_discard_size);
Status DiscardFiles(int64_t to_discard_size);
void ValidateMetaSchema();
Status Initialize();

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "SqliteMetaImpl.h"
#include "db/meta/SqliteMetaImpl.h"
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "utils/Log.h"
@ -29,9 +29,11 @@
#include <boost/filesystem.hpp>
#include <chrono>
#include <fstream>
#include <memory>
#include <map>
#include <set>
#include <sqlite_orm.h>
namespace zilliz {
namespace milvus {
namespace engine {
@ -41,7 +43,8 @@ using namespace sqlite_orm;
namespace {
Status HandleException(const std::string &desc, const char* what = nullptr) {
Status
HandleException(const std::string &desc, const char *what = nullptr) {
if (what == nullptr) {
ENGINE_LOG_ERROR << desc;
return Status(DB_META_TRANSACTION_FAILED, desc);
@ -52,9 +55,10 @@ Status HandleException(const std::string &desc, const char* what = nullptr) {
}
}
}
} // namespace
inline auto StoragePrototype(const std::string &path) {
inline auto
StoragePrototype(const std::string &path) {
return make_storage(path,
make_table(META_TABLES,
make_column("id", &TableSchema::id_, primary_key()),
@ -77,24 +81,22 @@ inline auto StoragePrototype(const std::string &path) {
make_column("row_count", &TableFileSchema::row_count_, default_value(0)),
make_column("updated_time", &TableFileSchema::updated_time_),
make_column("created_on", &TableFileSchema::created_on_),
make_column("date", &TableFileSchema::date_))
);
make_column("date", &TableFileSchema::date_)));
}
using ConnectorT = decltype(StoragePrototype(""));
static std::unique_ptr<ConnectorT> ConnectorPtr;
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
: options_(options_) {
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options)
: options_(options) {
Initialize();
}
SqliteMetaImpl::~SqliteMetaImpl() {
}
Status SqliteMetaImpl::NextTableId(std::string &table_id) {
Status
SqliteMetaImpl::NextTableId(std::string &table_id) {
std::stringstream ss;
SimpleIDGenerator g;
ss << g.GetNextIDNumber();
@ -102,7 +104,8 @@ Status SqliteMetaImpl::NextTableId(std::string &table_id) {
return Status::OK();
}
Status SqliteMetaImpl::NextFileId(std::string &file_id) {
Status
SqliteMetaImpl::NextFileId(std::string &file_id) {
std::stringstream ss;
SimpleIDGenerator g;
ss << g.GetNextIDNumber();
@ -110,7 +113,8 @@ Status SqliteMetaImpl::NextFileId(std::string &file_id) {
return Status::OK();
}
void SqliteMetaImpl::ValidateMetaSchema() {
void
SqliteMetaImpl::ValidateMetaSchema() {
if (ConnectorPtr == nullptr) {
return;
}
@ -127,7 +131,8 @@ void SqliteMetaImpl::ValidateMetaSchema() {
}
}
Status SqliteMetaImpl::Initialize() {
Status
SqliteMetaImpl::Initialize() {
if (!boost::filesystem::is_directory(options_.path_)) {
auto ret = boost::filesystem::create_directory(options_.path_);
if (!ret) {
@ -151,7 +156,8 @@ Status SqliteMetaImpl::Initialize() {
}
// PXU TODO: Temp solution. Will fix later
Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status
SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
const DatesT &dates) {
if (dates.size() == 0) {
return Status::OK();
@ -171,15 +177,12 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
),
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(
c(&TableFileSchema::table_id_) == table_id and
in(&TableFileSchema::date_, dates)
));
in(&TableFileSchema::date_, dates)));
ENGINE_LOG_DEBUG << "Successfully drop partitions, table id = " << table_schema.table_id_;
} catch (std::exception &e) {
return HandleException("Encounter exception when drop partition", e.what());
}
@ -187,8 +190,8 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
return Status::OK();
}
Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
Status
SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
try {
server::MetricCollector metric;
@ -223,13 +226,13 @@ Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
return utils::CreateTablePath(options_, table_schema.table_id_);
} catch (std::exception &e) {
return HandleException("Encounter exception when create table", e.what());
}
}
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
Status
SqliteMetaImpl::DeleteTable(const std::string &table_id) {
try {
server::MetricCollector metric;
@ -239,15 +242,12 @@ Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
//soft delete table
ConnectorPtr->update_all(
set(
c(&TableSchema::state_) = (int) TableSchema::TO_DELETE
),
c(&TableSchema::state_) = (int) TableSchema::TO_DELETE),
where(
c(&TableSchema::table_id_) == table_id and
c(&TableSchema::state_) != (int) TableSchema::TO_DELETE
));
c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("Encounter exception when delete table", e.what());
}
@ -255,7 +255,8 @@ Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
return Status::OK();
}
Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
Status
SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) {
try {
server::MetricCollector metric;
@ -266,15 +267,12 @@ Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
),
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(
c(&TableFileSchema::table_id_) == table_id and
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("Encounter exception when delete table files", e.what());
}
@ -282,7 +280,8 @@ Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
return Status::OK();
}
Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
Status
SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
server::MetricCollector metric;
@ -311,7 +310,6 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
} else {
return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
}
} catch (std::exception &e) {
return HandleException("Encounter exception when describe table", e.what());
}
@ -319,7 +317,8 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
return Status::OK();
}
Status SqliteMetaImpl::FilesByType(const std::string& table_id,
Status
SqliteMetaImpl::FilesByType(const std::string &table_id,
const std::vector<int> &file_types,
std::vector<std::string> &file_ids) {
if (file_types.empty()) {
@ -331,8 +330,7 @@ Status SqliteMetaImpl::FilesByType(const std::string& table_id,
auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_,
&TableFileSchema::file_type_),
where(in(&TableFileSchema::file_type_, file_types)
and c(&TableFileSchema::table_id_) == table_id
));
and c(&TableFileSchema::table_id_) == table_id));
if (selected.size() >= 1) {
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
@ -340,29 +338,21 @@ Status SqliteMetaImpl::FilesByType(const std::string& table_id,
for (auto &file : selected) {
file_ids.push_back(std::get<0>(file));
switch (std::get<1>(file)) {
case (int) TableFileSchema::RAW:
raw_count++;
case (int) TableFileSchema::RAW:raw_count++;
break;
case (int) TableFileSchema::NEW:
new_count++;
case (int) TableFileSchema::NEW:new_count++;
break;
case (int) TableFileSchema::NEW_MERGE:
new_merge_count++;
case (int) TableFileSchema::NEW_MERGE:new_merge_count++;
break;
case (int) TableFileSchema::NEW_INDEX:
new_index_count++;
case (int) TableFileSchema::NEW_INDEX:new_index_count++;
break;
case (int) TableFileSchema::TO_INDEX:
to_index_count++;
case (int) TableFileSchema::TO_INDEX:to_index_count++;
break;
case (int) TableFileSchema::INDEX:
index_count++;
case (int) TableFileSchema::INDEX:index_count++;
break;
case (int) TableFileSchema::BACKUP:
backup_count++;
break;
default:
case (int) TableFileSchema::BACKUP:backup_count++;
break;
default:break;
}
}
@ -371,14 +361,14 @@ Status SqliteMetaImpl::FilesByType(const std::string& table_id,
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count
<< " index files:" << index_count << " backup files:" << backup_count;
}
} catch (std::exception &e) {
return HandleException("Encounter exception when check non index files", e.what());
}
return Status::OK();
}
Status SqliteMetaImpl::UpdateTableIndex(const std::string &table_id, const TableIndex& index) {
Status
SqliteMetaImpl::UpdateTableIndex(const std::string &table_id, const TableIndex &index) {
try {
server::MetricCollector metric;
@ -416,15 +406,12 @@ Status SqliteMetaImpl::UpdateTableIndex(const std::string &table_id, const Table
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW,
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
),
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(
c(&TableFileSchema::table_id_) == table_id and
c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP
));
c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP));
ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
} catch (std::exception &e) {
std::string msg = "Encounter exception when update table index: table_id = " + table_id;
return HandleException(msg, e.what());
@ -433,20 +420,18 @@ Status SqliteMetaImpl::UpdateTableIndex(const std::string &table_id, const Table
return Status::OK();
}
Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
Status
SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
try {
server::MetricCollector metric;
//set all backup file to raw
ConnectorPtr->update_all(
set(
c(&TableSchema::flag_) = flag
),
c(&TableSchema::flag_) = flag),
where(
c(&TableSchema::table_id_) == table_id
));
c(&TableSchema::table_id_) == table_id));
ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
} catch (std::exception &e) {
std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
return HandleException(msg, e.what());
@ -455,7 +440,8 @@ Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag
return Status::OK();
}
Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
Status
SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex &index) {
try {
server::MetricCollector metric;
@ -472,7 +458,6 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
} else {
return Status(DB_NOT_FOUND, "Table " + table_id + " not found");
}
} catch (std::exception &e) {
return HandleException("Encounter exception when describe index", e.what());
}
@ -480,7 +465,8 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
return Status::OK();
}
Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
Status
SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
try {
server::MetricCollector metric;
@ -491,37 +477,30 @@ Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
),
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(
c(&TableFileSchema::table_id_) == table_id and
c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX
));
c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX));
//set all backup file to raw
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW,
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
),
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(
c(&TableFileSchema::table_id_) == table_id and
c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP
));
c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP));
//set table index type to raw
ConnectorPtr->update_all(
set(
c(&TableSchema::engine_type_) = DEFAULT_ENGINE_TYPE,
c(&TableSchema::nlist_) = DEFAULT_NLIST,
c(&TableSchema::metric_type_) = DEFAULT_METRIC_TYPE
),
c(&TableSchema::metric_type_) = DEFAULT_METRIC_TYPE),
where(
c(&TableSchema::table_id_) == table_id
));
c(&TableSchema::table_id_) == table_id));
ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("Encounter exception when delete table index files", e.what());
}
@ -529,7 +508,8 @@ Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
return Status::OK();
}
Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status
SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
has_or_not = false;
try {
@ -542,7 +522,6 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
} else {
has_or_not = false;
}
} catch (std::exception &e) {
return HandleException("Encounter exception when lookup table", e.what());
}
@ -550,7 +529,8 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
return Status::OK();
}
Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
Status
SqliteMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
try {
server::MetricCollector metric;
@ -578,7 +558,6 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
table_schema_array.emplace_back(schema);
}
} catch (std::exception &e) {
return HandleException("Encounter exception when lookup all tables", e.what());
}
@ -586,7 +565,8 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
return Status::OK();
}
Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
Status
SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
if (file_schema.date_ == EmptyDate) {
file_schema.date_ = utils::GetDate();
}
@ -619,7 +599,6 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
return utils::CreateTableFilePath(options_, file_schema);
} catch (std::exception &e) {
return HandleException("Encounter exception when create table file", e.what());
}
@ -627,7 +606,8 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
return Status::OK();
}
Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
Status
SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
@ -685,13 +665,13 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
}
return ret;
} catch (std::exception &e) {
return HandleException("Encounter exception when iterate raw files", e.what());
}
}
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
Status
SqliteMetaImpl::FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids,
const DatesT &partition,
DatePartionedTableFilesSchema &files) {
@ -726,18 +706,15 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
if (partition.empty() && ids.empty()) {
auto filter = where(match_tableid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
}
else if (partition.empty() && !ids.empty()) {
} else if (partition.empty() && !ids.empty()) {
auto match_fileid = in(&TableFileSchema::id_, ids);
auto filter = where(match_tableid and match_fileid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
}
else if (!partition.empty() && ids.empty()) {
} else if (!partition.empty() && ids.empty()) {
auto match_date = in(&TableFileSchema::date_, partition);
auto filter = where(match_tableid and match_date and match_type);
selected = ConnectorPtr->select(select_columns, filter);
}
else if (!partition.empty() && !ids.empty()) {
} else if (!partition.empty() && !ids.empty()) {
auto match_fileid = in(&TableFileSchema::id_, ids);
auto match_date = in(&TableFileSchema::date_, partition);
auto filter = where(match_tableid and match_fileid and match_date and match_type);
@ -779,13 +756,13 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
}
return ret;
} catch (std::exception &e) {
return HandleException("Encounter exception when iterate index files", e.what());
}
}
Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
Status
SqliteMetaImpl::FilesToMerge(const std::string &table_id,
DatePartionedTableFilesSchema &files) {
files.clear();
@ -849,13 +826,13 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-merge files";
}
return result;
} catch (std::exception &e) {
return HandleException("Encounter exception when iterate merge files", e.what());
}
}
Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
Status
SqliteMetaImpl::GetTableFiles(const std::string &table_id,
const std::vector<size_t> &ids,
TableFilesSchema &table_files) {
try {
@ -870,8 +847,7 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
&TableFileSchema::created_on_),
where(c(&TableFileSchema::table_id_) == table_id and
in(&TableFileSchema::id_, ids) and
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
TableSchema table_schema;
table_schema.table_id_ = table_id;
@ -910,7 +886,8 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
}
// PXU TODO: Support Swap
Status SqliteMetaImpl::Archive() {
Status
SqliteMetaImpl::Archive() {
auto &criterias = options_.archive_conf_.GetCriterias();
if (criterias.size() == 0) {
return Status::OK();
@ -920,20 +897,18 @@ Status SqliteMetaImpl::Archive() {
auto &criteria = kv.first;
auto &limit = kv.second;
if (criteria == engine::ARCHIVE_CONF_DAYS) {
long usecs = limit * D_SEC * US_PS;
long now = utils::GetMicroSecTimeStamp();
int64_t usecs = limit * D_SEC * US_PS;
int64_t now = utils::GetMicroSecTimeStamp();
try {
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
),
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE),
where(
c(&TableFileSchema::created_on_) < (long) (now - usecs) and
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
c(&TableFileSchema::created_on_) < (int64_t) (now - usecs) and
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
} catch (std::exception &e) {
return HandleException("Encounter exception when update table files", e.what());
}
@ -954,20 +929,19 @@ Status SqliteMetaImpl::Archive() {
return Status::OK();
}
Status SqliteMetaImpl::Size(uint64_t &result) {
Status
SqliteMetaImpl::Size(uint64_t &result) {
result = 0;
try {
auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::file_size_)),
where(
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
for (auto &total_size : selected) {
if (!std::get<0>(total_size)) {
continue;
}
result += (uint64_t) (*std::get<0>(total_size));
}
} catch (std::exception &e) {
return HandleException("Encounter exception when calculte db size", e.what());
}
@ -975,7 +949,8 @@ Status SqliteMetaImpl::Size(uint64_t &result) {
return Status::OK();
}
Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
Status
SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
if (to_discard_size <= 0) {
return Status::OK();
}
@ -1016,11 +991,9 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
),
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(
in(&TableFileSchema::id_, ids)
));
in(&TableFileSchema::id_, ids)));
return true;
});
@ -1028,7 +1001,6 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
if (!commited) {
return HandleException("DiscardFiles error: sqlite transaction failed");
}
} catch (std::exception &e) {
return HandleException("Encounter exception when discard table file", e.what());
}
@ -1036,7 +1008,8 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
return DiscardFiles(to_discard_size);
}
Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
Status
SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
server::MetricCollector metric;
@ -1056,7 +1029,6 @@ Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
ConnectorPtr->update(file_schema);
ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
} catch (std::exception &e) {
std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
+ " file_id = " + file_schema.file_id_;
@ -1065,7 +1037,8 @@ Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
return Status::OK();
}
Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
Status
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
try {
server::MetricCollector metric;
@ -1074,15 +1047,12 @@ Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX
),
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX),
where(
c(&TableFileSchema::table_id_) == table_id and
c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
));
c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW));
ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("Encounter exception when update table files to to_index", e.what());
}
@ -1090,7 +1060,8 @@ Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
return Status::OK();
}
Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
Status
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
server::MetricCollector metric;
@ -1135,7 +1106,8 @@ Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
return Status::OK();
}
Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
auto now = utils::GetMicroSecTimeStamp();
std::set<std::string> table_ids;
@ -1181,7 +1153,6 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
if (files.size() > 0) {
ENGINE_LOG_DEBUG << "Clean " << files.size() << " files deleted in " << seconds << " seconds";
}
} catch (std::exception &e) {
return HandleException("Encounter exception when clean table files", e.what());
}
@ -1213,7 +1184,6 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
if (tables.size() > 0) {
ENGINE_LOG_DEBUG << "Remove " << tables.size() << " tables from meta";
}
} catch (std::exception &e) {
return HandleException("Encounter exception when clean table files", e.what());
}
@ -1234,7 +1204,6 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
if (table_ids.size() > 0) {
ENGINE_LOG_DEBUG << "Remove " << table_ids.size() << " tables folder";
}
} catch (std::exception &e) {
return HandleException("Encounter exception when delete table folder", e.what());
}
@ -1242,7 +1211,8 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
return Status::OK();
}
Status SqliteMetaImpl::CleanUp() {
Status
SqliteMetaImpl::CleanUp() {
try {
server::MetricCollector metric;
@ -1254,7 +1224,8 @@ Status SqliteMetaImpl::CleanUp() {
(int) TableFileSchema::NEW_INDEX,
(int) TableFileSchema::NEW_MERGE
};
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_), where(in(&TableFileSchema::file_type_, file_types)));
auto files =
ConnectorPtr->select(columns(&TableFileSchema::id_), where(in(&TableFileSchema::file_type_, file_types)));
auto commited = ConnectorPtr->transaction([&]() mutable {
for (auto &file : files) {
@ -1271,7 +1242,6 @@ Status SqliteMetaImpl::CleanUp() {
if (files.size() > 0) {
ENGINE_LOG_DEBUG << "Clean " << files.size() << " files";
}
} catch (std::exception &e) {
return HandleException("Encounter exception when clean table file", e.what());
}
@ -1279,8 +1249,8 @@ Status SqliteMetaImpl::CleanUp() {
return Status::OK();
}
Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
Status
SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try {
server::MetricCollector metric;
@ -1305,14 +1275,14 @@ Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
for (auto &file : selected) {
result += std::get<0>(file);
}
} catch (std::exception &e) {
return HandleException("Encounter exception when calculate table file size", e.what());
}
return Status::OK();
}
Status SqliteMetaImpl::DropAll() {
Status
SqliteMetaImpl::DropAll() {
ENGINE_LOG_DEBUG << "Drop all sqlite meta";
try {

View File

@ -21,22 +21,25 @@
#include "db/Options.h"
#include <mutex>
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {
auto StoragePrototype(const std::string &path);
auto
StoragePrototype(const std::string &path);
class SqliteMetaImpl : public Meta {
public:
explicit SqliteMetaImpl(const DBMetaOptions &options_);
explicit SqliteMetaImpl(const DBMetaOptions &options);
~SqliteMetaImpl();
Status CreateTable(TableSchema &table_schema) override;
Status DescribeTable(TableSchema &group_info_) override;
Status DescribeTable(TableSchema &table_schema) override;
Status HasTable(const std::string &table_id, bool &has_or_not) override;
@ -96,7 +99,7 @@ class SqliteMetaImpl : public Meta {
private:
Status NextFileId(std::string &file_id);
Status NextTableId(std::string &table_id);
Status DiscardFiles(long to_discard_size);
Status DiscardFiles(int64_t to_discard_size);
void ValidateMetaSchema();
Status Initialize();

View File

@ -16,7 +16,11 @@
// under the License.
#include "Algorithm.h"
#include "scheduler/Algorithm.h"
#include <limits>
#include <unordered_map>
#include <utility>
namespace zilliz {
namespace milvus {
@ -29,7 +33,6 @@ ShortestPath(const ResourcePtr &src,
const ResourcePtr &dest,
const ResourceMgrPtr &res_mgr,
std::vector<std::string> &path) {
std::vector<std::vector<std::string>> paths;
uint64_t num_of_resources = res_mgr->GetAllResources().size();
@ -53,7 +56,6 @@ ShortestPath(const ResourcePtr &src,
std::vector<bool> vis(num_of_resources, false);
std::vector<uint64_t> dis(num_of_resources, MAXINT);
for (auto &res : res_mgr->GetAllResources()) {
auto cur_node = std::static_pointer_cast<Node>(res);
auto cur_neighbours = cur_node->GetNeighbours();
@ -105,6 +107,6 @@ ShortestPath(const ResourcePtr &src,
return dis[name_id_map.at(dest->name())];
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -32,6 +32,6 @@ ShortestPath(const ResourcePtr &src,
const ResourceMgrPtr &res_mgr,
std::vector<std::string> &path);
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -30,7 +30,6 @@
#include "db/engine/EngineFactory.h"
#include "db/engine/ExecutionEngine.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -43,6 +42,6 @@ using EngineFactory = engine::EngineFactory;
using EngineType = engine::EngineType;
using MetricType = engine::MetricType;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,19 +15,19 @@
// specific language governing permissions and limitations
// under the License.
#include "JobMgr.h"
#include "scheduler/JobMgr.h"
#include "task/Task.h"
#include "TaskCreator.h"
#include <utility>
namespace zilliz {
namespace milvus {
namespace scheduler {
using namespace engine;
JobMgr::JobMgr(ResourceMgrPtr res_mgr)
: res_mgr_(std::move(res_mgr)) {}
: res_mgr_(std::move(res_mgr)) {
}
void
JobMgr::Start() {
@ -59,7 +59,9 @@ void
JobMgr::worker_function() {
while (running_) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return !queue_.empty(); });
cv_.wait(lock, [this] {
return !queue_.empty();
});
auto job = queue_.front();
queue_.pop();
lock.unlock();
@ -84,6 +86,6 @@ JobMgr::build_task(const JobPtr &job) {
return TaskCreator::Create(job);
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -31,15 +31,13 @@
#include "task/Task.h"
#include "ResourceMgr.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class JobMgr {
public:
explicit
JobMgr(ResourceMgrPtr res_mgr);
explicit JobMgr(ResourceMgrPtr res_mgr);
void
Start();
@ -72,6 +70,6 @@ private:
using JobMgrPtr = std::shared_ptr<JobMgr>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -16,8 +16,7 @@
// under the License.
#include "ResourceFactory.h"
#include "scheduler/ResourceFactory.h"
namespace zilliz {
namespace milvus {
@ -40,6 +39,6 @@ ResourceFactory::Create(const std::string &name,
}
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -25,7 +25,6 @@
#include "resource/GpuResource.h"
#include "resource/DiskResource.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -40,8 +39,6 @@ public:
bool enable_executor = true);
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -16,15 +16,13 @@
// specific language governing permissions and limitations
// under the License.
#include "ResourceMgr.h"
#include "scheduler/ResourceMgr.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
void
ResourceMgr::Start() {
std::lock_guard<std::mutex> lck(resources_mutex_);
@ -186,7 +184,9 @@ void
ResourceMgr::event_process() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !queue_.empty(); });
event_cv_.wait(lock, [this] {
return !queue_.empty();
});
auto event = queue_.front();
queue_.pop();
@ -201,6 +201,6 @@ ResourceMgr::event_process() {
}
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -22,12 +22,12 @@
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <condition_variable>
#include "resource/Resource.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -120,13 +120,11 @@ private:
std::condition_variable event_cv_;
std::thread worker_thread_;
};
using ResourceMgrPtr = std::shared_ptr<ResourceMgr>;
using ResourceMgrWPtr = std::weak_ptr<ResourceMgr>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -16,12 +16,16 @@
// under the License.
#include "SchedInst.h"
#include "scheduler/SchedInst.h"
#include "server/Config.h"
#include "ResourceFactory.h"
#include "knowhere/index/vector_index/IndexGPUIVF.h"
#include "Utils.h"
#include <vector>
#include <set>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
@ -165,6 +169,7 @@ StopSchedulerService() {
SchedInst::GetInstance()->Stop();
ResMgrInst::GetInstance()->Stop();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -24,7 +24,6 @@
#include <mutex>
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -89,6 +88,6 @@ StartSchedulerService();
void
StopSchedulerService();
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.
#include "src/cache/GpuCacheMgr.h"
#include "scheduler/Scheduler.h"
#include "cache/GpuCacheMgr.h"
#include "event/LoadCompletedEvent.h"
#include "Scheduler.h"
#include "action/Action.h"
#include "Algorithm.h"
#include <utility>
namespace zilliz {
namespace milvus {
@ -43,7 +43,6 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1)));
}
void
Scheduler::Start() {
running_ = true;
@ -79,7 +78,9 @@ void
Scheduler::worker_function() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
event_cv_.wait(lock, [this] {
return !event_queue_.empty();
});
auto event = event_queue_.front();
event_queue_.pop();
if (event == nullptr) {
@ -142,6 +143,6 @@ Scheduler::OnTaskTableUpdated(const EventPtr &event) {
}
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -22,22 +22,20 @@
#include <mutex>
#include <thread>
#include <queue>
#include <unordered_map>
#include "resource/Resource.h"
#include "ResourceMgr.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
// TODO: refactor, not friendly to unittest, logical in framework code
class Scheduler {
public:
explicit
Scheduler(ResourceMgrWPtr res_mgr);
explicit Scheduler(ResourceMgrWPtr res_mgr);
Scheduler(const Scheduler &) = delete;
Scheduler(Scheduler &&) = delete;
@ -133,7 +131,6 @@ private:
using SchedulerPtr = std::shared_ptr<Scheduler>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.
#include <src/scheduler/tasklabel/BroadcastLabel.h>
#include "TaskCreator.h"
#include "scheduler/TaskCreator.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -64,8 +63,6 @@ TaskCreator::Create(const DeleteJobPtr &job) {
return tasks;
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -34,7 +34,6 @@
#include "task/SearchTask.h"
#include "task/DeleteTask.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -52,6 +51,6 @@ public:
Create(const DeleteJobPtr &job);
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -16,7 +16,7 @@
// under the License.
#include "TaskTable.h"
#include "scheduler/TaskTable.h"
#include "event/TaskTableUpdatedEvent.h"
#include "Utils.h"
@ -24,7 +24,6 @@
#include <sstream>
#include <ctime>
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -75,6 +74,7 @@ TaskTableItem::Load() {
}
return false;
}
bool
TaskTableItem::Loaded() {
std::unique_lock<std::mutex> lock(mutex);
@ -86,6 +86,7 @@ TaskTableItem::Loaded() {
}
return false;
}
bool
TaskTableItem::Execute() {
std::unique_lock<std::mutex> lock(mutex);
@ -97,6 +98,7 @@ TaskTableItem::Execute() {
}
return false;
}
bool
TaskTableItem::Executed() {
std::unique_lock<std::mutex> lock(mutex);
@ -109,6 +111,7 @@ TaskTableItem::Executed() {
}
return false;
}
bool
TaskTableItem::Move() {
std::unique_lock<std::mutex> lock(mutex);
@ -120,6 +123,7 @@ TaskTableItem::Move() {
}
return false;
}
bool
TaskTableItem::Moved() {
std::unique_lock<std::mutex> lock(mutex);
@ -206,7 +210,6 @@ TaskTable::Put(std::vector<TaskPtr> &tasks) {
}
}
TaskTableItemPtr
TaskTable::Get(uint64_t index) {
return table_[index];
@ -232,6 +235,6 @@ TaskTable::Dump() {
return ss.str();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -20,11 +20,13 @@
#include <vector>
#include <deque>
#include <mutex>
#include <memory>
#include <utility>
#include <string>
#include "task/SearchTask.h"
#include "event/Event.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -52,7 +54,8 @@ struct TaskTimestamp {
};
struct TaskTableItem {
TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {}
TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {
}
TaskTableItem(const TaskTableItem &src) = delete;
TaskTableItem(TaskTableItem &&) = delete;
@ -151,8 +154,13 @@ public:
return table_[index];
}
std::deque<TaskTableItemPtr>::iterator begin() { return table_.begin(); }
std::deque<TaskTableItemPtr>::iterator end() { return table_.end(); }
std::deque<TaskTableItemPtr>::iterator begin() {
return table_.begin();
}
std::deque<TaskTableItemPtr>::iterator end() {
return table_.end();
}
public:
std::vector<uint64_t>
@ -162,7 +170,6 @@ public:
PickToExecute(uint64_t limit);
public:
/******** Action ********/
// TODO: bool to Status
@ -246,7 +253,6 @@ private:
uint64_t last_finish_ = -1;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -16,12 +16,11 @@
// under the License.
#include "Utils.h"
#include "scheduler/Utils.h"
#include <chrono>
#include <cuda_runtime.h>
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -41,6 +40,6 @@ get_num_gpu() {
return n_devices;
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -18,7 +18,6 @@
#include <cstdint>
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -29,6 +28,6 @@ get_current_timestamp();
uint64_t
get_num_gpu();
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -17,9 +17,10 @@
#pragma once
#include "../resource/Resource.h"
#include "../ResourceMgr.h"
#include "scheduler/resource/Resource.h"
#include "scheduler/ResourceMgr.h"
#include <memory>
namespace zilliz {
namespace milvus {
@ -43,10 +44,8 @@ public:
SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr,
ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event);
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -22,7 +22,6 @@
#include "src/cache/GpuCacheMgr.h"
#include "Action.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -57,7 +56,6 @@ get_neighbours_with_connetion(const ResourcePtr &self) {
return neighbours;
}
void
Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
const ResourcePtr &self) {
@ -87,7 +85,6 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
} else {
//TODO: process
}
}
void
@ -181,7 +178,6 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr,
}
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -18,6 +18,8 @@
#pragma once
#include <memory>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
@ -37,7 +39,8 @@ public:
explicit
Event(EventType type, std::weak_ptr<Resource> resource)
: type_(type),
resource_(std::move(resource)) {}
resource_(std::move(resource)) {
}
inline EventType
Type() const {
@ -56,6 +59,6 @@ public:
using EventPtr = std::shared_ptr<Event>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -22,36 +22,40 @@
#include "FinishTaskEvent.h"
#include "TaskTableUpdatedEvent.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
std::ostream &operator<<(std::ostream &out, const Event &event) {
std::ostream &
operator<<(std::ostream &out, const Event &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const StartUpEvent &event) {
std::ostream &
operator<<(std::ostream &out, const StartUpEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const LoadCompletedEvent &event) {
std::ostream &
operator<<(std::ostream &out, const LoadCompletedEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event) {
std::ostream &
operator<<(std::ostream &out, const FinishTaskEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) {
std::ostream &
operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) {
out << event.Dump();
return out;
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -17,8 +17,11 @@
#pragma once
#include "Event.h"
#include "scheduler/event/Event.h"
#include <memory>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
@ -28,7 +31,8 @@ class FinishTaskEvent : public Event {
public:
FinishTaskEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
: Event(EventType::FINISH_TASK, std::move(resource)),
task_table_item_(std::move(task_table_item)) {}
task_table_item_(std::move(task_table_item)) {
}
inline std::string
Dump() const override {
@ -41,6 +45,6 @@ public:
TaskTableItemPtr task_table_item_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -17,9 +17,12 @@
#pragma once
#include "Event.h"
#include "../TaskTable.h"
#include "scheduler/event/Event.h"
#include "scheduler/TaskTable.h"
#include <memory>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
@ -29,7 +32,8 @@ class LoadCompletedEvent : public Event {
public:
LoadCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
: Event(EventType::LOAD_COMPLETED, std::move(resource)),
task_table_item_(std::move(task_table_item)) {}
task_table_item_(std::move(task_table_item)) {
}
inline std::string
Dump() const override {
@ -42,6 +46,6 @@ public:
TaskTableItemPtr task_table_item_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -17,8 +17,11 @@
#pragma once
#include "Event.h"
#include "scheduler/event/Event.h"
#include <memory>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
@ -26,9 +29,9 @@ namespace scheduler {
class StartUpEvent : public Event {
public:
explicit
StartUpEvent(std::weak_ptr<Resource> resource)
: Event(EventType::START_UP, std::move(resource)) {}
explicit StartUpEvent(std::weak_ptr<Resource> resource)
: Event(EventType::START_UP, std::move(resource)) {
}
inline std::string
Dump() const override {
@ -38,6 +41,6 @@ public:
friend std::ostream &operator<<(std::ostream &out, const StartUpEvent &event);
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -19,6 +19,9 @@
#include "Event.h"
#include <memory>
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
@ -26,9 +29,9 @@ namespace scheduler {
class TaskTableUpdatedEvent : public Event {
public:
explicit
TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
: Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {}
explicit TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
: Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {
}
inline std::string
Dump() const override {
@ -38,7 +41,6 @@ public:
friend std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event);
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.
#include "DeleteJob.h"
#include "scheduler/job/DeleteJob.h"
#include <utility>
namespace zilliz {
namespace milvus {
@ -29,15 +30,20 @@ DeleteJob::DeleteJob(JobId id,
: Job(id, JobType::DELETE),
table_id_(std::move(table_id)),
meta_ptr_(std::move(meta_ptr)),
num_resource_(num_resource) {}
num_resource_(num_resource) {
}
void DeleteJob::WaitAndDelete() {
void
DeleteJob::WaitAndDelete() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_resource == num_resource_; });
cv_.wait(lock, [&] {
return done_resource == num_resource_;
});
meta_ptr_->DeleteTableFiles(table_id_);
}
void DeleteJob::ResourceDone() {
void
DeleteJob::ResourceDone() {
{
std::lock_guard<std::mutex> lock(mutex_);
++done_resource;
@ -45,7 +51,6 @@ void DeleteJob::ResourceDone() {
cv_.notify_one();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -30,7 +30,6 @@
#include "Job.h"
#include "db/meta/Meta.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -72,7 +71,6 @@ private:
using DeleteJobPtr = std::shared_ptr<DeleteJob>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -27,7 +27,6 @@
#include <condition_variable>
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -54,7 +53,8 @@ public:
}
protected:
Job(JobId id, JobType type) : id_(id), type_(type) {}
Job(JobId id, JobType type) : id_(id), type_(type) {
}
private:
JobId id_;
@ -64,7 +64,6 @@ private:
using JobPtr = std::shared_ptr<Job>;
using JobWPtr = std::weak_ptr<Job>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,11 +15,9 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/job/SearchJob.h"
#include "utils/Log.h"
#include "SearchJob.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -33,7 +31,8 @@ SearchJob::SearchJob(zilliz::milvus::scheduler::JobId id,
topk_(topk),
nq_(nq),
nprobe_(nprobe),
vectors_(vectors) {}
vectors_(vectors) {
}
bool
SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) {
@ -48,11 +47,12 @@ SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) {
return true;
}
void
SearchJob::WaitResult() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return index_files_.empty(); });
cv_.wait(lock, [this] {
return index_files_.empty();
});
SERVER_LOG_DEBUG << "SearchJob " << id() << " all done";
}
@ -74,9 +74,6 @@ SearchJob::GetStatus() {
return status_;
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -26,16 +26,15 @@
#include <mutex>
#include <condition_variable>
#include <memory>
#include <utility>
#include "Job.h"
#include "db/meta/MetaTypes.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
using engine::meta::TableFileSchemaPtr;
using Id2IndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
@ -77,6 +76,7 @@ public:
nprobe() const {
return nprobe_;
}
const float *
vectors() const {
return vectors_;
@ -105,7 +105,6 @@ private:
using SearchJobPtr = std::shared_ptr<SearchJob>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -19,7 +19,7 @@
#include <string>
#include <sstream>
#include <utility>
namespace zilliz {
namespace milvus {
@ -29,7 +29,8 @@ class Connection {
public:
// TODO: update construct function, speed: double->uint64_t
Connection(std::string name, double speed)
: name_(std::move(name)), speed_(speed) {}
: name_(std::move(name)), speed_(speed) {
}
const std::string &
name() const {
@ -59,7 +60,6 @@ private:
uint64_t speed_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -16,29 +16,34 @@
// under the License.
#include "CpuResource.h"
#include "scheduler/resource/CpuResource.h"
#include <utility>
namespace zilliz {
namespace milvus {
namespace scheduler {
std::ostream &operator<<(std::ostream &out, const CpuResource &resource) {
std::ostream &
operator<<(std::ostream &out, const CpuResource &resource) {
out << resource.Dump();
return out;
}
CpuResource::CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::CPU, device_id, enable_loader, enable_executor) {}
: Resource(std::move(name), ResourceType::CPU, device_id, enable_loader, enable_executor) {
}
void CpuResource::LoadFile(TaskPtr task) {
void
CpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::DISK2CPU, 0);
}
void CpuResource::Process(TaskPtr task) {
void
CpuResource::Process(TaskPtr task) {
task->Execute();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -21,7 +21,6 @@
#include "Resource.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -46,6 +45,6 @@ protected:
Process(TaskPtr task) override;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,14 +15,17 @@
// specific language governing permissions and limitations
// under the License.
#include "DiskResource.h"
#include "scheduler/resource/DiskResource.h"
#include <string>
#include <utility>
namespace zilliz {
namespace milvus {
namespace scheduler {
std::ostream &operator<<(std::ostream &out, const DiskResource &resource) {
std::ostream &
operator<<(std::ostream &out, const DiskResource &resource) {
out << resource.Dump();
return out;
}
@ -31,15 +34,14 @@ DiskResource::DiskResource(std::string name, uint64_t device_id, bool enable_loa
: Resource(std::move(name), ResourceType::DISK, device_id, enable_loader, enable_executor) {
}
void DiskResource::LoadFile(TaskPtr task) {
void
DiskResource::LoadFile(TaskPtr task) {
}
void DiskResource::Process(TaskPtr task) {
}
}
}
void
DiskResource::Process(TaskPtr task) {
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -17,9 +17,9 @@
#pragma once
#include "Resource.h"
#include <string>
namespace zilliz {
namespace milvus {
@ -45,6 +45,6 @@ protected:
Process(TaskPtr task) override;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -16,29 +16,32 @@
// under the License.
#include "GpuResource.h"
#include "scheduler/resource/GpuResource.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
std::ostream &operator<<(std::ostream &out, const GpuResource &resource) {
std::ostream &
operator<<(std::ostream &out, const GpuResource &resource) {
out << resource.Dump();
return out;
}
GpuResource::GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::GPU, device_id, enable_loader, enable_executor) {}
: Resource(std::move(name), ResourceType::GPU, device_id, enable_loader, enable_executor) {
}
void GpuResource::LoadFile(TaskPtr task) {
void
GpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::CPU2GPU, device_id_);
}
void GpuResource::Process(TaskPtr task) {
void
GpuResource::Process(TaskPtr task) {
task->Execute();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -17,9 +17,10 @@
#pragma once
#include "Resource.h"
#include <string>
#include <utility>
namespace zilliz {
namespace milvus {
@ -45,6 +46,6 @@ protected:
Process(TaskPtr task) override;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/resource/Node.h"
#include <atomic>
#include "Node.h"
#include <utility>
namespace zilliz {
namespace milvus {
@ -29,7 +29,8 @@ Node::Node() {
id_ = counter++;
}
std::vector<Neighbour> Node::GetNeighbours() {
std::vector<Neighbour>
Node::GetNeighbours() {
std::lock_guard<std::mutex> lk(mutex_);
std::vector<Neighbour> ret;
for (auto &e : neighbours_) {
@ -38,7 +39,8 @@ std::vector<Neighbour> Node::GetNeighbours() {
return ret;
}
std::string Node::Dump() {
std::string
Node::Dump() {
std::stringstream ss;
ss << "<Node, id=" << std::to_string(id_) << ">::neighbours:" << std::endl;
for (auto &neighbour : neighbours_) {
@ -48,7 +50,8 @@ std::string Node::Dump() {
return ss.str();
}
void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
void
Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_node.lock()) {
neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection)));
@ -56,6 +59,6 @@ void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &conn
// else do nothing, consider it..
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -20,11 +20,11 @@
#include <vector>
#include <memory>
#include <map>
#include <string>
#include "../TaskTable.h"
#include "scheduler/TaskTable.h"
#include "Connection.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -35,7 +35,8 @@ using NeighbourNodePtr = std::weak_ptr<Node>;
struct Neighbour {
Neighbour(NeighbourNodePtr nei, Connection conn)
: neighbour_node(nei), connection(conn) {}
: neighbour_node(nei), connection(conn) {
}
NeighbourNodePtr neighbour_node;
Connection connection;
@ -65,6 +66,6 @@ private:
using NodePtr = std::shared_ptr<Node>;
using NodeWPtr = std::weak_ptr<Node>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.
#include <iostream>
#include "../Utils.h"
#include "Resource.h"
#include "scheduler/resource/Resource.h"
#include "scheduler/Utils.h"
#include <iostream>
#include <utility>
namespace zilliz {
namespace milvus {
@ -100,7 +101,8 @@ Resource::NumOfTaskToExec() {
return count;
}
TaskTableItemPtr Resource::pick_task_load() {
TaskTableItemPtr
Resource::pick_task_load() {
auto indexes = task_table_.PickToLoad(10);
for (auto index : indexes) {
// try to set one task loading, then return
@ -111,7 +113,8 @@ TaskTableItemPtr Resource::pick_task_load() {
return nullptr;
}
TaskTableItemPtr Resource::pick_task_execute() {
TaskTableItemPtr
Resource::pick_task_execute() {
auto indexes = task_table_.PickToExecute(3);
for (auto index : indexes) {
// try to set one task executing, then return
@ -122,10 +125,13 @@ TaskTableItemPtr Resource::pick_task_execute() {
return nullptr;
}
void Resource::loader_function() {
void
Resource::loader_function() {
while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; });
load_cv_.wait(lock, [&] {
return load_flag_;
});
load_flag_ = false;
lock.unlock();
while (true) {
@ -140,18 +146,20 @@ void Resource::loader_function() {
subscriber_(std::static_pointer_cast<Event>(event));
}
}
}
}
void Resource::executor_function() {
void
Resource::executor_function() {
if (subscriber_) {
auto event = std::make_shared<StartUpEvent>(shared_from_this());
subscriber_(std::static_pointer_cast<Event>(event));
}
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; });
exec_cv_.wait(lock, [&] {
return exec_flag_;
});
exec_flag_ = false;
lock.unlock();
while (true) {
@ -172,10 +180,9 @@ void Resource::executor_function() {
subscriber_(std::static_pointer_cast<Event>(event));
}
}
}
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -20,6 +20,7 @@
#include <string>
#include <vector>
#include <memory>
#include <utility>
#include <thread>
#include <functional>
#include <condition_variable>
@ -34,7 +35,6 @@
#include "Connection.h"
#include "Node.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -212,7 +212,6 @@ public:
using ResourcePtr = std::shared_ptr<Resource>;
using ResourceWPtr = std::weak_ptr<Resource>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,14 +15,16 @@
// specific language governing permissions and limitations
// under the License.
#include "TestResource.h"
#include "scheduler/resource/TestResource.h"
#include <string>
namespace zilliz {
namespace milvus {
namespace scheduler {
std::ostream &operator<<(std::ostream &out, const TestResource &resource) {
std::ostream &
operator<<(std::ostream &out, const TestResource &resource) {
out << resource.Dump();
return out;
}
@ -31,15 +33,16 @@ TestResource::TestResource(std::string name, uint64_t device_id, bool enable_loa
: Resource(std::move(name), ResourceType::TEST, device_id, enable_loader, enable_executor) {
}
void TestResource::LoadFile(TaskPtr task) {
void
TestResource::LoadFile(TaskPtr task) {
task->Load(LoadType::TEST, 0);
}
void TestResource::Process(TaskPtr task) {
void
TestResource::Process(TaskPtr task) {
task->Execute();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -17,9 +17,10 @@
#pragma once
#include "Resource.h"
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
@ -45,6 +46,6 @@ protected:
Process(TaskPtr task) override;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -16,19 +16,18 @@
// under the License.
#include "DeleteTask.h"
#include "scheduler/task/DeleteTask.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
XDeleteTask::XDeleteTask(const scheduler::DeleteJobPtr &delete_job)
: Task(TaskType::DeleteTask), delete_job_(delete_job) {}
: Task(TaskType::DeleteTask), delete_job_(delete_job) {
}
void
XDeleteTask::Load(LoadType type, uint8_t device_id) {
}
void
@ -36,6 +35,6 @@ XDeleteTask::Execute() {
delete_job_->ResourceDone();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -20,15 +20,13 @@
#include "scheduler/job/DeleteJob.h"
#include "Task.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class XDeleteTask : public Task {
public:
explicit
XDeleteTask(const scheduler::DeleteJobPtr &job);
explicit XDeleteTask(const scheduler::DeleteJobPtr &job);
void
Load(LoadType type, uint8_t device_id) override;
@ -40,6 +38,6 @@ public:
scheduler::DeleteJobPtr delete_job_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -20,7 +20,6 @@
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -29,7 +28,8 @@ class Path {
public:
Path() = default;
Path(std::vector<std::string>& path, uint64_t index) : path_(path), index_(index) {}
Path(std::vector<std::string> &path, uint64_t index) : path_(path), index_(index) {
}
void
push_back(const std::string &str) {
@ -49,7 +49,6 @@ class Path {
} else {
return nullptr;
}
}
std::string
@ -67,14 +66,19 @@ class Path {
return path_[index];
}
std::vector<std::string>::iterator begin() { return path_.begin(); }
std::vector<std::string>::iterator end() { return path_.end(); }
std::vector<std::string>::iterator begin() {
return path_.begin();
}
std::vector<std::string>::iterator end() {
return path_.end();
}
public:
std::vector<std::string> path_;
uint64_t index_ = 0;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,15 +15,16 @@
// specific language governing permissions and limitations
// under the License.
#include "SearchTask.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/job/SearchJob.h"
#include "metrics/Metrics.h"
#include "db/engine/EngineFactory.h"
#include "utils/TimeRecorder.h"
#include "utils/Log.h"
#include <thread>
#include "scheduler/job/SearchJob.h"
#include <utility>
#include <string>
namespace zilliz {
namespace milvus {
@ -104,7 +105,6 @@ XSearchTask::XSearchTask(TableFileSchemaPtr file)
(MetricType) file_->metric_type_,
file_->nlist_);
}
}
void
@ -183,7 +183,7 @@ XSearchTask::Execute() {
server::CollectDurationMetrics metrics(index_type_);
std::vector<long> output_ids;
std::vector<int64_t> output_ids;
std::vector<float> output_distance;
if (auto job = job_.lock()) {
@ -236,7 +236,8 @@ XSearchTask::Execute() {
index_engine_ = nullptr;
}
Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
Status
XSearchTask::ClusterResult(const std::vector<int64_t> &output_ids,
const std::vector<float> &output_distance,
uint64_t nq,
uint64_t topk,
@ -275,7 +276,8 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
return Status::OK();
}
Status XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src,
Status
XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src,
scheduler::Id2DistanceMap &distance_target,
uint64_t topk,
bool ascending) {
@ -349,7 +351,8 @@ Status XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src,
return Status::OK();
}
Status XSearchTask::TopkResult(scheduler::ResultSet &result_src,
Status
XSearchTask::TopkResult(scheduler::ResultSet &result_src,
uint64_t topk,
bool ascending,
scheduler::ResultSet &result_target) {
@ -381,7 +384,6 @@ Status XSearchTask::TopkResult(scheduler::ResultSet &result_src,
return Status::OK();
}
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -21,6 +21,7 @@
#include "scheduler/job/SearchJob.h"
#include "scheduler/Definition.h"
#include <vector>
namespace zilliz {
namespace milvus {
@ -29,8 +30,7 @@ namespace scheduler {
// TODO: rewrite
class XSearchTask : public Task {
public:
explicit
XSearchTask(TableFileSchemaPtr file);
explicit XSearchTask(TableFileSchemaPtr file);
void
Load(LoadType type, uint8_t device_id) override;
@ -39,7 +39,7 @@ public:
Execute() override;
public:
static Status ClusterResult(const std::vector<long> &output_ids,
static Status ClusterResult(const std::vector<int64_t> &output_ids,
const std::vector<float> &output_distence,
uint64_t nq,
uint64_t topk,
@ -66,6 +66,6 @@ public:
static std::mutex merge_mutex_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -25,7 +25,6 @@
#include <string>
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
@ -50,14 +49,16 @@ using TaskPtr = std::shared_ptr<Task>;
// TODO: re-design
class Task {
public:
explicit
Task(TaskType type) : type_(type) {}
explicit Task(TaskType type) : type_(type) {
}
/*
* Just Getter;
*/
inline TaskType
Type() const { return type_; }
Type() const {
return type_;
}
/*
* Transport path;
@ -90,7 +91,6 @@ public:
TaskLabelPtr label_ = nullptr;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -15,16 +15,15 @@
// specific language governing permissions and limitations
// under the License.
#include <src/cache/GpuCacheMgr.h>
#include "TestTask.h"
#include "scheduler/task/TestTask.h"
#include "cache/GpuCacheMgr.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) {}
TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) {
}
void
TestTask::Load(LoadType type, uint8_t device_id) {
@ -44,10 +43,11 @@ TestTask::Execute() {
void
TestTask::Wait() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_; });
}
}
}
cv_.wait(lock, [&] {
return done_;
});
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -19,15 +19,13 @@
#include "SearchTask.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
class TestTask : public XSearchTask {
public:
explicit
TestTask(TableFileSchemaPtr& file);
explicit TestTask(TableFileSchemaPtr &file);
public:
void
@ -48,7 +46,6 @@ public:
std::condition_variable cv_;
};
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -21,19 +21,19 @@
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
class BroadcastLabel : public TaskLabel {
public:
BroadcastLabel() : TaskLabel(TaskLabelType::BROADCAST) {}
BroadcastLabel() : TaskLabel(TaskLabelType::BROADCAST) {
}
};
using BroadcastLabelPtr = std::shared_ptr<BroadcastLabel>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -21,20 +21,18 @@
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
class DefaultLabel : public TaskLabel {
public:
DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {}
DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {
}
};
using DefaultLabelPtr = std::shared_ptr<DefaultLabel>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -22,7 +22,6 @@
#include <string>
#include <memory>
class Resource;
using ResourceWPtr = std::weak_ptr<Resource>;
@ -33,8 +32,9 @@ namespace scheduler {
class SpecResLabel : public TaskLabel {
public:
SpecResLabel(const ResourceWPtr &resource)
: TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) {}
explicit SpecResLabel(const ResourceWPtr &resource)
: TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) {
}
inline ResourceWPtr &
resource() {
@ -53,7 +53,6 @@ private:
using SpecResLabelPtr = std::shared_ptr<SpecResLabel>();
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz

View File

@ -37,8 +37,8 @@ public:
}
protected:
explicit
TaskLabel(TaskLabelType type) : type_(type) {}
explicit TaskLabel(TaskLabelType type) : type_(type) {
}
private:
TaskLabelType type_;
@ -46,7 +46,6 @@ private:
using TaskLabelPtr = std::shared_ptr<TaskLabel>;
}
}
}
} // namespace scheduler
} // namespace milvus
} // namespace zilliz