Allow system config to be modified, with some settings taking effect immediately #1263 (#1287)

* finish making system config modifiable; some settings now take effect at runtime (see the callback sketch before the file diffs below)

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* move test file to unittest folder

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* add test case for config

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* fix format issue

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* optimize callback

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* fix error when getting config via HTTP in CPU mode

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* update callback map

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* correct typo

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* update callback register

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* add identity to callback registrations

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* add identity to the Faiss*Pass classes

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* add comments to the cmd test for the failure case

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* remove some comments and add a test for validating the storage path

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* format pass

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* remove endpoint info and update CORS

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* add test case in web module for dropping a non-existent table

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* optimize config store

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* add check code in config

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* add scheduler gpu handler

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* update optimizer faiss pass

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* add default value of gpu_enable_ in GpuCacheMgr

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* optimize code: remove comments

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* format pass

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* add GPU macro in optimizer handler module to fix a compile error in the CPU version

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* change cache setting unit to bytes

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* rename variables

Signed-off-by: Yhz <yinghao.zou@zilliz.com>
BossZou 2020-02-26 18:54:32 +08:00 committed by GitHub
parent a55cb9fe7e
commit c63a50c635
39 changed files with 1462 additions and 399 deletions
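
The diff below adds a callback registry to server::Config so that components can pick up configuration changes while the server is running. What follows is a minimal consumer-side sketch of that pattern, modeled on the MemManagerImpl change in this commit; the class name InsertBufferObserver and its members are illustrative only, while GenUniqueIdentityID, RegisterCallBack, CancelCallBack, ConfigCallBackF, and GetCacheConfigInsertBufferSize are the APIs shown in the diff.

#include <string>

#include "server/Config.h"
#include "utils/Status.h"

namespace milvus {

// Illustrative consumer (not part of this commit): reacts to runtime changes
// of cache_config.insert_buffer_size without requiring a server restart.
class InsertBufferObserver {
 public:
    InsertBufferObserver() {
        server::Config& config = server::Config::GetInstance();
        // A unique id ties the callback to this instance, so it can be
        // cancelled in the destructor without affecting other listeners.
        config.GenUniqueIdentityID("InsertBufferObserver", identity_);
        server::ConfigCallBackF cb = [this](const std::string& value) -> Status {
            server::Config& config = server::Config::GetInstance();
            int64_t buffer_size;
            auto status = config.GetCacheConfigInsertBufferSize(buffer_size);
            if (status.ok()) {
                insert_buffer_gb_ = buffer_size;  // new value takes effect here
            }
            return status;
        };
        config.RegisterCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_, cb);
    }

    ~InsertBufferObserver() {
        server::Config& config = server::Config::GetInstance();
        config.CancelCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_);
    }

 private:
    std::string identity_;
    int64_t insert_buffer_gb_ = 0;  // interpreted in GB, converted to bytes by the caller
};

}  // namespace milvus

When a new value is accepted, Config::ExecCallBacks walks the callbacks registered under that node.sub_node key, so every live listener is refreshed in place; settings outside the cache/engine/gpu groups instead set restart_required_, which GetServerRestartRequired exposes.
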


@ -54,6 +54,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1078 - Move 'insert_buffer_size' to Cache Config section
- \#1105 - Error message is not clear when creating IVFSQ8H index without gpu resources
- \#740, #849, #878, #972, #1033, #1161, #1173, #1199, #1190, #1223, #1222, #1257, #1264, #1269, #1164, #1303, #1304, #1324 - Various fixes and improvements for Milvus documentation.
- \#1263 - Allow system config to be modified, with some settings taking effect immediately
- \#1320 - Remove debug logging from faiss
## Task

core/.gitignore vendored

@ -1,5 +1,6 @@
milvus/
conf/server_config.yaml
conf/server_config.yaml.ori
conf/log_config.conf
src/config.h
src/version.h


@ -49,6 +49,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_file
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/job scheduler_job_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/optimizer scheduler_optimizer_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/optimizer/handler scheduler_optimizer_handler_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_files)
set(scheduler_files
@ -57,6 +58,7 @@ set(scheduler_files
${scheduler_event_files}
${scheduler_job_files}
${scheduler_optimizer_files}
${scheduler_optimizer_handler_files}
${scheduler_resource_files}
${scheduler_task_files}
)


@ -32,6 +32,15 @@ GpuCacheMgr::GpuCacheMgr() {
// All config values have been checked in Config::ValidateConfig()
server::Config& config = server::Config::GetInstance();
config.GenUniqueIdentityID("GpuCacheMar", identity_);
config.GetGpuResourceConfigEnable(gpu_enable_);
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
auto& config = server::Config::GetInstance();
return config.GetGpuResourceConfigEnable(this->gpu_enable_);
};
config.RegisterCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_ENABLE, identity_, lambda);
int64_t gpu_cache_cap;
config.GetGpuResourceConfigCacheCapacity(gpu_cache_cap);
int64_t cap = gpu_cache_cap * G_BYTE;
@ -42,6 +51,11 @@ GpuCacheMgr::GpuCacheMgr() {
cache_->set_freemem_percent(gpu_mem_threshold);
}
GpuCacheMgr::~GpuCacheMgr() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_ENABLE, identity_);
}
GpuCacheMgr*
GpuCacheMgr::GetInstance(uint64_t gpu_id) {
if (instance_.find(gpu_id) == instance_.end()) {
@ -61,6 +75,14 @@ GpuCacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key);
return obj;
}
void
GpuCacheMgr::InsertItem(const std::string& key, const milvus::cache::DataObjPtr& data) {
if (gpu_enable_) {
CacheMgr<DataObjPtr>::InsertItem(key, data);
}
}
#endif
} // namespace cache


@ -27,13 +27,20 @@ class GpuCacheMgr : public CacheMgr<DataObjPtr> {
public:
GpuCacheMgr();
~GpuCacheMgr();
static GpuCacheMgr*
GetInstance(uint64_t gpu_id);
DataObjPtr
GetIndex(const std::string& key);
void
InsertItem(const std::string& key, const DataObjPtr& data);
private:
bool gpu_enable_ = true;
std::string identity_;
static std::mutex mutex_;
static std::unordered_map<uint64_t, GpuCacheMgrPtr> instance_;
};


@ -100,7 +100,7 @@ DBImpl::Stop() {
}
initialized_.store(false, std::memory_order_release);
// makesure all memory data serialized
// make sure all memory data serialized
std::set<std::string> sync_table_ids;
SyncMemData(sync_table_ids);


@ -14,6 +14,7 @@
#include "MemManager.h"
#include "MemTable.h"
#include "db/meta/Meta.h"
#include "server/Config.h"
#include "utils/Status.h"
#include <ctime>
@ -32,6 +33,26 @@ class MemManagerImpl : public MemManager {
using Ptr = std::shared_ptr<MemManagerImpl>;
MemManagerImpl(const meta::MetaPtr& meta, const DBOptions& options) : meta_(meta), options_(options) {
server::Config& config = server::Config::GetInstance();
config.GenUniqueIdentityID("MemManagerImpl", identity_);
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
int64_t buffer_size;
auto status = config.GetCacheConfigInsertBufferSize(buffer_size);
if (status.ok()) {
options_.insert_buffer_size_ = buffer_size * ONE_GB;
}
return status;
};
config.RegisterCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_, lambda);
}
~MemManagerImpl() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_);
}
Status
@ -63,6 +84,7 @@ class MemManagerImpl : public MemManager {
using MemIdMap = std::map<std::string, MemTablePtr>;
using MemList = std::vector<MemTablePtr>;
std::string identity_;
MemIdMap mem_id_map_;
MemList immu_mem_list_;
meta::MetaPtr meta_;


@ -21,30 +21,44 @@ namespace scheduler {
void
BuildIndexPass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetGpuResourceConfigBuildIndexResources(build_gpu_ids_);
Status s = config.GetGpuResourceConfigBuildIndexResources(build_gpus_);
fiu_do_on("BuildIndexPass.Init.get_config_fail", s = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!s.ok()) {
throw std::exception();
}
SetIdentity("BuildIndexPass");
AddGpuEnableListener();
AddGpuBuildResListener();
}
bool
BuildIndexPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::BuildIndexTask)
return false;
fiu_do_on("BuildIndexPass.Run.empty_gpu_ids", build_gpu_ids_.clear());
if (build_gpu_ids_.empty()) {
SERVER_LOG_WARNING << "BuildIndexPass cannot get build index gpu!";
return false;
}
ResourcePtr res_ptr;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, build_gpu_ids_[specified_gpu_id_]);
if (!gpu_enable_) {
SERVER_LOG_DEBUG << "Gpu disabled, specify cpu to build index!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
fiu_do_on("BuildIndexPass.Run.empty_gpu_ids", build_gpus_.clear());
if (build_gpus_.empty()) {
SERVER_LOG_WARNING << "BuildIndexPass cannot get build index gpu!";
return false;
}
if (specified_gpu_id_ >= build_gpus_.size()) {
specified_gpu_id_ = specified_gpu_id_ % build_gpus_.size();
}
SERVER_LOG_DEBUG << "Specify gpu" << specified_gpu_id_ << " to build index!";
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, build_gpus_[specified_gpu_id_]);
specified_gpu_id_ = (specified_gpu_id_ + 1) % build_gpus_.size();
}
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
SERVER_LOG_DEBUG << "Specify gpu" << specified_gpu_id_ << " to build index!";
specified_gpu_id_ = (specified_gpu_id_ + 1) % build_gpu_ids_.size();
return true;
}


@ -22,12 +22,13 @@
#include <unordered_map>
#include <vector>
#include "Pass.h"
#include "scheduler/optimizer/Pass.h"
#include "scheduler/optimizer/handler/GpuBuildResHandler.h"
namespace milvus {
namespace scheduler {
class BuildIndexPass : public Pass {
class BuildIndexPass : public Pass, public GpuBuildResHandler {
public:
BuildIndexPass() = default;
@ -40,7 +41,6 @@ class BuildIndexPass : public Pass {
private:
uint64_t specified_gpu_id_ = 0;
std::vector<int64_t> build_gpu_ids_;
};
using BuildIndexPassPtr = std::shared_ptr<BuildIndexPass>;


@ -29,10 +29,15 @@ FaissFlatPass::Init() {
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
s = config.GetGpuResourceConfigSearchResources(search_gpus_);
if (!s.ok()) {
throw std::exception();
}
SetIdentity("FaissFlatPass");
AddGpuEnableListener();
AddGpuSearchThresholdListener();
AddGpuSearchResListener();
}
bool
@ -48,14 +53,17 @@ FaissFlatPass::Run(const TaskPtr& task) {
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
if (!gpu_enable_) {
SERVER_LOG_DEBUG << "FaissFlatPass: gpu disable, specify cpu to search!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else if (search_job->nq() < threshold_) {
SERVER_LOG_DEBUG << "FaissFlatPass: nq < gpu_search_threshold, specify cpu to search!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
auto best_device_id = count_ % search_gpus_.size();
SERVER_LOG_DEBUG << "FaissFlatPass: nq > gpu_search_threshold, specify gpu" << best_device_id << " to search!";
++count_;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpus[best_device_id]);
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;


@ -23,12 +23,13 @@
#include <unordered_map>
#include <vector>
#include "Pass.h"
#include "scheduler/optimizer/Pass.h"
#include "src/scheduler/optimizer/handler/GpuSearchResHandler.h"
namespace milvus {
namespace scheduler {
class FaissFlatPass : public Pass {
class FaissFlatPass : public Pass, public GpuSearchResHandler {
public:
FaissFlatPass() = default;
@ -40,9 +41,7 @@ class FaissFlatPass : public Pass {
Run(const TaskPtr& task) override;
private:
int64_t threshold_ = std::numeric_limits<int64_t>::max();
int64_t count_ = 0;
std::vector<int64_t> gpus;
};
using FaissFlatPassPtr = std::shared_ptr<FaissFlatPass>;


@ -29,10 +29,15 @@ FaissIVFFlatPass::Init() {
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
s = config.GetGpuResourceConfigSearchResources(search_gpus_);
if (!s.ok()) {
throw std::exception();
}
SetIdentity("FaissIVFFlatPass");
AddGpuEnableListener();
AddGpuSearchThresholdListener();
AddGpuSearchResListener();
#endif
}
@ -49,15 +54,18 @@ FaissIVFFlatPass::Run(const TaskPtr& task) {
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
if (!gpu_enable_) {
SERVER_LOG_DEBUG << "FaissIVFFlatPass: gpu disable, specify cpu to search!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else if (search_job->nq() < threshold_) {
SERVER_LOG_DEBUG << "FaissIVFFlatPass: nq < gpu_search_threshold, specify cpu to search!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
auto best_device_id = count_ % search_gpus_.size();
SERVER_LOG_DEBUG << "FaissIVFFlatPass: nq > gpu_search_threshold, specify gpu" << best_device_id
<< " to search!";
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpus[best_device_id]);
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;


@ -23,12 +23,13 @@
#include <unordered_map>
#include <vector>
#include "Pass.h"
#include "scheduler/optimizer/Pass.h"
#include "scheduler/optimizer/handler/GpuSearchResHandler.h"
namespace milvus {
namespace scheduler {
class FaissIVFFlatPass : public Pass {
class FaissIVFFlatPass : public Pass, public GpuSearchResHandler {
public:
FaissIVFFlatPass() = default;
@ -40,9 +41,7 @@ class FaissIVFFlatPass : public Pass {
Run(const TaskPtr& task) override;
private:
int64_t threshold_ = std::numeric_limits<int64_t>::max();
int64_t count_ = 0;
std::vector<int64_t> gpus;
};
using FaissIVFFlatPassPtr = std::shared_ptr<FaissIVFFlatPass>;


@ -31,10 +31,15 @@ FaissIVFPQPass::Init() {
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
s = config.GetGpuResourceConfigSearchResources(search_gpus_);
if (!s.ok()) {
throw std::exception();
}
SetIdentity("FaissIVFPQPass");
AddGpuEnableListener();
AddGpuSearchThresholdListener();
AddGpuSearchResListener();
#endif
}
@ -51,14 +56,17 @@ FaissIVFPQPass::Run(const TaskPtr& task) {
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
if (!gpu_enable_) {
SERVER_LOG_DEBUG << "FaissIVFPQPass: gpu disable, specify cpu to search!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else if (search_job->nq() < threshold_) {
SERVER_LOG_DEBUG << "FaissIVFPQPass: nq < gpu_search_threshold, specify cpu to search!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
auto best_device_id = count_ % search_gpus_.size();
SERVER_LOG_DEBUG << "FaissIVFPQPass: nq > gpu_search_threshold, specify gpu" << best_device_id << " to search!";
++count_;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpus[best_device_id]);
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;


@ -23,12 +23,13 @@
#include <unordered_map>
#include <vector>
#include "Pass.h"
#include "scheduler/optimizer/Pass.h"
#include "scheduler/optimizer/handler/GpuSearchResHandler.h"
namespace milvus {
namespace scheduler {
class FaissIVFPQPass : public Pass {
class FaissIVFPQPass : public Pass, public GpuSearchResHandler {
public:
FaissIVFPQPass() = default;
@ -40,9 +41,7 @@ class FaissIVFPQPass : public Pass {
Run(const TaskPtr& task) override;
private:
int64_t threshold_ = std::numeric_limits<int64_t>::max();
int64_t count_ = 0;
std::vector<int64_t> gpus;
};
using FaissIVFPQPassPtr = std::shared_ptr<FaissIVFPQPass>;


@ -29,7 +29,15 @@ FaissIVFSQ8HPass::Init() {
if (!s.ok()) {
threshold_ = std::numeric_limits<int64_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
s = config.GetGpuResourceConfigSearchResources(search_gpus_);
if (!s.ok()) {
throw std::exception();
}
SetIdentity("FaissIVFSQ8HPass");
AddGpuEnableListener();
AddGpuSearchThresholdListener();
AddGpuSearchResListener();
#endif
}
@ -47,15 +55,19 @@ FaissIVFSQ8HPass::Run(const TaskPtr& task) {
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (!gpu_enable_) {
SERVER_LOG_DEBUG << "FaissIVFSQ8HPass: gpu disable, specify cpu to search!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
}
if (search_job->nq() < threshold_) {
SERVER_LOG_DEBUG << "FaissIVFSQ8HPass: nq < gpu_search_threshold, specify cpu to search!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
auto best_device_id = count_ % search_gpus_.size();
SERVER_LOG_DEBUG << "FaissIVFSQ8HPass: nq > gpu_search_threshold, specify gpu" << best_device_id
<< " to search!";
++count_;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpus[best_device_id]);
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;


@ -23,12 +23,13 @@
#include <unordered_map>
#include <vector>
#include "Pass.h"
#include "scheduler/optimizer/Pass.h"
#include "scheduler/optimizer/handler/GpuSearchResHandler.h"
namespace milvus {
namespace scheduler {
class FaissIVFSQ8HPass : public Pass {
class FaissIVFSQ8HPass : public Pass, public GpuSearchResHandler {
public:
FaissIVFSQ8HPass() = default;
@ -40,9 +41,7 @@ class FaissIVFSQ8HPass : public Pass {
Run(const TaskPtr& task) override;
private:
int64_t threshold_ = std::numeric_limits<int64_t>::max();
int64_t count_ = 0;
std::vector<int64_t> gpus;
};
using FaissIVFSQ8HPassPtr = std::shared_ptr<FaissIVFSQ8HPass>;


@ -29,10 +29,15 @@ FaissIVFSQ8Pass::Init() {
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
s = config.GetGpuResourceConfigSearchResources(search_gpus_);
if (!s.ok()) {
throw std::exception();
}
SetIdentity("FaissIVFSQ8Pass");
AddGpuEnableListener();
AddGpuSearchThresholdListener();
AddGpuSearchResListener();
#endif
}
@ -49,15 +54,18 @@ FaissIVFSQ8Pass::Run(const TaskPtr& task) {
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
if (!gpu_enable_) {
SERVER_LOG_DEBUG << "FaissIVFSQ8Pass: gpu disable, specify cpu to search!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else if (search_job->nq() < threshold_) {
SERVER_LOG_DEBUG << "FaissIVFSQ8Pass: nq < gpu_search_threshold, specify cpu to search!";
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
auto best_device_id = count_ % search_gpus_.size();
SERVER_LOG_DEBUG << "FaissIVFSQ8Pass: nq > gpu_search_threshold, specify gpu" << best_device_id
<< " to search!";
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpus[best_device_id]);
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;


@ -23,12 +23,13 @@
#include <unordered_map>
#include <vector>
#include "Pass.h"
#include "scheduler/optimizer/Pass.h"
#include "scheduler/optimizer/handler/GpuSearchResHandler.h"
namespace milvus {
namespace scheduler {
class FaissIVFSQ8Pass : public Pass {
class FaissIVFSQ8Pass : public Pass, public GpuSearchResHandler {
public:
FaissIVFSQ8Pass() = default;
@ -40,9 +41,7 @@ class FaissIVFSQ8Pass : public Pass {
Run(const TaskPtr& task) override;
private:
int64_t threshold_ = std::numeric_limits<int64_t>::max();
int64_t count_ = 0;
std::vector<int64_t> gpus;
};
using FaissIVFSQ8PassPtr = std::shared_ptr<FaissIVFSQ8Pass>;


@ -0,0 +1,55 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#include "scheduler/optimizer/handler/GpuBuildResHandler.h"
#include <string>
#include <vector>
namespace milvus {
namespace scheduler {
GpuBuildResHandler::GpuBuildResHandler() {
server::Config& config = server::Config::GetInstance();
config.GetGpuResourceConfigBuildIndexResources(build_gpus_);
}
GpuBuildResHandler::~GpuBuildResHandler() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, identity_);
}
////////////////////////////////////////////////////////////////
void
GpuBuildResHandler::OnGpuBuildResChanged(const std::vector<int64_t>& gpus) {
build_gpus_ = gpus;
}
void
GpuBuildResHandler::AddGpuBuildResListener() {
server::Config& config = server::Config::GetInstance();
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
std::vector<int64_t> gpu_ids;
auto status = config.GetGpuResourceConfigSearchResources(gpu_ids);
if (status.ok()) {
OnGpuBuildResChanged(gpu_ids);
}
return status;
};
config.RegisterCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, identity_,
lambda);
}
} // namespace scheduler
} // namespace milvus
#endif


@ -0,0 +1,41 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#pragma once
#include "scheduler/optimizer/handler/GpuResourcesHandler.h"
#include <vector>
namespace milvus {
namespace scheduler {
class GpuBuildResHandler : virtual public GpuResourcesHandler {
public:
GpuBuildResHandler();
~GpuBuildResHandler();
public:
virtual void
OnGpuBuildResChanged(const std::vector<int64_t>& gpus);
protected:
void
AddGpuBuildResListener();
protected:
std::vector<int64_t> build_gpus_;
};
} // namespace scheduler
} // namespace milvus
#endif


@ -0,0 +1,59 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#include "scheduler/optimizer/handler/GpuResourcesHandler.h"
namespace milvus {
namespace scheduler {
GpuResourcesHandler::GpuResourcesHandler() {
server::Config& config = server::Config::GetInstance();
config.GetGpuResourceConfigEnable(gpu_enable_);
}
GpuResourcesHandler::~GpuResourcesHandler() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_ENABLE, identity_);
}
//////////////////////////////////////////////////////////////
void
GpuResourcesHandler::OnGpuEnableChanged(bool enable) {
gpu_enable_ = enable;
}
void
GpuResourcesHandler::SetIdentity(const std::string& identity) {
server::Config& config = server::Config::GetInstance();
config.GenUniqueIdentityID(identity, identity_);
}
void
GpuResourcesHandler::AddGpuEnableListener() {
server::Config& config = server::Config::GetInstance();
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
bool enable;
auto status = config.GetGpuResourceConfigEnable(enable);
if (status.ok()) {
OnGpuEnableChanged(enable);
}
return status;
};
config.RegisterCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_ENABLE, identity_, lambda);
}
} // namespace scheduler
} // namespace milvus
#endif


@ -0,0 +1,47 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#pragma once
#include <exception>
#include <limits>
#include <string>
#include "server/Config.h"
namespace milvus {
namespace scheduler {
class GpuResourcesHandler {
public:
GpuResourcesHandler();
~GpuResourcesHandler();
public:
virtual void
OnGpuEnableChanged(bool enable);
protected:
void
SetIdentity(const std::string& identity);
void
AddGpuEnableListener();
protected:
bool gpu_enable_ = true;
std::string identity_;
};
} // namespace scheduler
} // namespace milvus
#endif


@ -0,0 +1,88 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#include "scheduler/optimizer/handler/GpuSearchResHandler.h"
#include <string>
#include <vector>
#include "server/Config.h"
namespace milvus {
namespace scheduler {
GpuSearchResHandler::GpuSearchResHandler() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
config.GetGpuResourceConfigSearchResources(search_gpus_);
}
GpuSearchResHandler::~GpuSearchResHandler() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_ENGINE, server::CONFIG_ENGINE_GPU_SEARCH_THRESHOLD, identity_);
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_SEARCH_RESOURCES, identity_);
}
////////////////////////////////////////////////////////////////////////
void
GpuSearchResHandler::OnGpuSearchThresholdChanged(int64_t threshold) {
threshold_ = threshold;
}
void
GpuSearchResHandler::OnGpuSearchResChanged(const std::vector<int64_t>& gpus) {
search_gpus_ = gpus;
}
void
GpuSearchResHandler::AddGpuSearchThresholdListener() {
server::Config& config = server::Config::GetInstance();
server::ConfigCallBackF lambda_gpu_threshold = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
int64_t threshold;
auto status = config.GetEngineConfigGpuSearchThreshold(threshold);
if (status.ok()) {
OnGpuSearchThresholdChanged(threshold);
}
return status;
};
config.RegisterCallBack(server::CONFIG_ENGINE, server::CONFIG_ENGINE_GPU_SEARCH_THRESHOLD, identity_,
lambda_gpu_threshold);
}
void
GpuSearchResHandler::AddGpuSearchResListener() {
server::Config& config = server::Config::GetInstance();
server::ConfigCallBackF lambda_gpu_search_res = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
std::vector<int64_t> gpu_ids;
auto status = config.GetGpuResourceConfigSearchResources(gpu_ids);
if (status.ok()) {
OnGpuSearchResChanged(gpu_ids);
}
return status;
};
config.RegisterCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_SEARCH_RESOURCES, identity_,
lambda_gpu_search_res);
}
} // namespace scheduler
} // namespace milvus
#endif


@ -0,0 +1,49 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#pragma once
#include "scheduler/optimizer/handler/GpuResourcesHandler.h"
#include <limits>
#include <vector>
namespace milvus {
namespace scheduler {
class GpuSearchResHandler : virtual public GpuResourcesHandler {
public:
GpuSearchResHandler();
~GpuSearchResHandler();
public:
virtual void
OnGpuSearchThresholdChanged(int64_t threshold);
virtual void
OnGpuSearchResChanged(const std::vector<int64_t>& gpus);
protected:
void
AddGpuSearchThresholdListener();
void
AddGpuSearchResListener();
protected:
int64_t threshold_ = std::numeric_limits<int64_t>::max();
std::vector<int64_t> search_gpus_;
};
} // namespace scheduler
} // namespace milvus
#endif


@ -10,11 +10,14 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm>
#include <chrono>
#include <fstream>
#include <iostream>
#include <regex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
@ -26,6 +29,8 @@
#include "utils/StringHelpFunctions.h"
#include "utils/ValidationUtil.h"
#include <cache/CpuCacheMgr.h>
#include <cache/GpuCacheMgr.h>
#include <fiu-local.h>
namespace milvus {
@ -35,6 +40,41 @@ constexpr int64_t GB = 1UL << 30;
static const std::unordered_map<std::string, std::string> milvus_config_version_map({{"0.6.0", "0.1"}});
/////////////////////////////////////////////////////////////
Config::Config() {
auto empty_map = std::unordered_map<std::string, ConfigCallBackF>();
// cache config
std::string node_cpu_cache_capacity = std::string(CONFIG_CACHE) + "." + CONFIG_CACHE_CPU_CACHE_CAPACITY;
config_callback_[node_cpu_cache_capacity] = empty_map;
std::string node_insert_buffer_size = std::string(CONFIG_CACHE) + "." + CONFIG_CACHE_INSERT_BUFFER_SIZE;
config_callback_[node_insert_buffer_size] = empty_map;
std::string node_cache_insert_data = std::string(CONFIG_CACHE) + "." + CONFIG_CACHE_CACHE_INSERT_DATA;
config_callback_[node_cache_insert_data] = empty_map;
// engine config
std::string node_blas_threshold = std::string(CONFIG_ENGINE) + "." + CONFIG_ENGINE_USE_BLAS_THRESHOLD;
config_callback_[node_blas_threshold] = empty_map;
// gpu resources config
std::string node_gpu_search_threshold = std::string(CONFIG_ENGINE) + "." + CONFIG_ENGINE_GPU_SEARCH_THRESHOLD;
config_callback_[node_gpu_search_threshold] = empty_map;
std::string node_gpu_enable = std::string(CONFIG_GPU_RESOURCE) + "." + CONFIG_GPU_RESOURCE_ENABLE;
config_callback_[node_gpu_enable] = empty_map;
std::string node_gpu_cache_capacity = std::string(CONFIG_GPU_RESOURCE) + "." + CONFIG_GPU_RESOURCE_CACHE_CAPACITY;
config_callback_[node_gpu_cache_capacity] = empty_map;
std::string node_gpu_search_res = std::string(CONFIG_GPU_RESOURCE) + "." + CONFIG_GPU_RESOURCE_SEARCH_RESOURCES;
config_callback_[node_gpu_search_res] = empty_map;
std::string node_gpu_build_res = std::string(CONFIG_GPU_RESOURCE) + "." + CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES;
config_callback_[node_gpu_build_res] = empty_map;
}
Config&
Config::GetInstance() {
static Config config_inst;
@ -59,6 +99,9 @@ Config::LoadConfigFile(const std::string& filename) {
return s;
}
// store config file path
config_file_ = filename;
return Status::OK();
}
@ -257,53 +300,99 @@ Config::SetConfigCli(const std::string& parent_key, const std::string& child_key
std::string str = "Config node invalid: " + parent_key + CONFIG_NODE_DELIMITER + child_key;
return Status(SERVER_UNEXPECTED_ERROR, str);
}
auto status = Status::OK();
if (parent_key == CONFIG_SERVER) {
return Status(SERVER_UNSUPPORTED_ERROR, "Not support set server_config");
if (child_key == CONFIG_SERVER_ADDRESS) {
status = SetServerConfigAddress(value);
} else if (child_key == CONFIG_SERVER_DEPLOY_MODE) {
status = SetServerConfigDeployMode(value);
} else if (child_key == CONFIG_SERVER_PORT) {
status = SetServerConfigPort(value);
} else if (child_key == CONFIG_SERVER_TIME_ZONE) {
status = SetServerConfigTimeZone(value);
} else if (child_key == CONFIG_SERVER_WEB_PORT) {
status = SetServerConfigWebPort(value);
}
} else if (parent_key == CONFIG_DB) {
return Status(SERVER_UNSUPPORTED_ERROR, "Not support set db_config");
if (child_key == CONFIG_DB_BACKEND_URL) {
status = SetDBConfigBackendUrl(value);
}
} else if (parent_key == CONFIG_STORAGE) {
return Status(SERVER_UNSUPPORTED_ERROR, "Not support set storage_config");
if (child_key == CONFIG_STORAGE_PRIMARY_PATH) {
status = SetStorageConfigPrimaryPath(value);
} else if (child_key == CONFIG_STORAGE_SECONDARY_PATH) {
status = SetStorageConfigSecondaryPath(value);
} else if (child_key == CONFIG_STORAGE_S3_ENABLE) {
status = SetStorageConfigS3Enable(value);
} else if (child_key == CONFIG_STORAGE_S3_ADDRESS) {
status = SetStorageConfigS3Address(value);
} else if (child_key == CONFIG_STORAGE_S3_PORT) {
status = SetStorageConfigS3Port(value);
} else if (child_key == CONFIG_STORAGE_S3_ACCESS_KEY) {
status = SetStorageConfigS3AccessKey(value);
} else if (child_key == CONFIG_STORAGE_S3_SECRET_KEY) {
status = SetStorageConfigS3SecretKey(value);
} else if (child_key == CONFIG_STORAGE_S3_BUCKET) {
status = SetStorageConfigS3Bucket(value);
}
} else if (parent_key == CONFIG_METRIC) {
return Status(SERVER_UNSUPPORTED_ERROR, "Not support set metric_config");
if (child_key == CONFIG_METRIC_ENABLE_MONITOR) {
status = SetMetricConfigEnableMonitor(value);
} else if (child_key == CONFIG_METRIC_ADDRESS) {
status = SetMetricConfigAddress(value);
} else if (child_key == CONFIG_METRIC_PORT) {
status = SetMetricConfigPort(value);
}
} else if (parent_key == CONFIG_CACHE) {
if (child_key == CONFIG_CACHE_CPU_CACHE_CAPACITY) {
return SetCacheConfigCpuCacheCapacity(value);
status = SetCacheConfigCpuCacheCapacity(value);
} else if (child_key == CONFIG_CACHE_CPU_CACHE_THRESHOLD) {
return SetCacheConfigCpuCacheThreshold(value);
status = SetCacheConfigCpuCacheThreshold(value);
} else if (child_key == CONFIG_CACHE_CACHE_INSERT_DATA) {
return SetCacheConfigCacheInsertData(value);
status = SetCacheConfigCacheInsertData(value);
} else if (child_key == CONFIG_CACHE_INSERT_BUFFER_SIZE) {
return SetCacheConfigInsertBufferSize(value);
status = SetCacheConfigInsertBufferSize(value);
}
} else if (parent_key == CONFIG_ENGINE) {
if (child_key == CONFIG_ENGINE_USE_BLAS_THRESHOLD) {
return SetEngineConfigUseBlasThreshold(value);
status = SetEngineConfigUseBlasThreshold(value);
} else if (child_key == CONFIG_ENGINE_OMP_THREAD_NUM) {
return SetEngineConfigOmpThreadNum(value);
status = SetEngineConfigOmpThreadNum(value);
#ifdef MILVUS_GPU_VERSION
} else if (child_key == CONFIG_ENGINE_GPU_SEARCH_THRESHOLD) {
return SetEngineConfigGpuSearchThreshold(value);
status = SetEngineConfigGpuSearchThreshold(value);
#endif
}
#ifdef MILVUS_GPU_VERSION
} else if (parent_key == CONFIG_GPU_RESOURCE) {
if (child_key == CONFIG_GPU_RESOURCE_ENABLE) {
return SetGpuResourceConfigEnable(value);
status = SetGpuResourceConfigEnable(value);
} else if (child_key == CONFIG_GPU_RESOURCE_CACHE_CAPACITY) {
return SetGpuResourceConfigCacheCapacity(value);
status = SetGpuResourceConfigCacheCapacity(value);
} else if (child_key == CONFIG_GPU_RESOURCE_CACHE_THRESHOLD) {
return SetGpuResourceConfigCacheThreshold(value);
status = SetGpuResourceConfigCacheThreshold(value);
} else if (child_key == CONFIG_GPU_RESOURCE_SEARCH_RESOURCES) {
return SetGpuResourceConfigSearchResources(value);
status = SetGpuResourceConfigSearchResources(value);
} else if (child_key == CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES) {
return SetGpuResourceConfigBuildIndexResources(value);
status = SetGpuResourceConfigBuildIndexResources(value);
}
#endif
} else if (parent_key == CONFIG_TRACING) {
return Status(SERVER_UNSUPPORTED_ERROR, "Not support set tracing_config");
return Status(SERVER_UNSUPPORTED_ERROR, "Not support set tracing_config currently");
}
if (status.ok()) {
status = UpdateFileConfigFromMem(parent_key, child_key);
if (status.ok() && (parent_key == CONFIG_SERVER || parent_key == CONFIG_DB || parent_key == CONFIG_STORAGE ||
parent_key == CONFIG_METRIC || parent_key == CONFIG_TRACING)) {
restart_required_ = true;
}
}
return status;
}
//////////////////////////////////////////////////////////////
Status
Config::ProcessConfigCli(std::string& result, const std::string& cmd) {
std::vector<std::string> tokens;
@ -337,6 +426,164 @@ Config::ProcessConfigCli(std::string& result, const std::string& cmd) {
}
}
Status
Config::GenUniqueIdentityID(const std::string& identity, std::string& uid) {
std::vector<std::string> elements;
elements.push_back(identity);
// get current process id
int64_t pid = getpid();
elements.push_back(std::to_string(pid));
// get current thread id
std::stringstream ss;
ss << std::this_thread::get_id();
elements.push_back(ss.str());
// get current timestamp
auto time_now = std::chrono::system_clock::now();
auto duration_in_ms = std::chrono::duration_cast<std::chrono::microseconds>(time_now.time_since_epoch());
elements.push_back(std::to_string(duration_in_ms.count()));
StringHelpFunctions::MergeStringWithDelimeter(elements, "-", uid);
return Status::OK();
}
Status
Config::UpdateFileConfigFromMem(const std::string& parent_key, const std::string& child_key) {
if (access(config_file_.c_str(), F_OK | R_OK) != 0) {
return Status(SERVER_UNEXPECTED_ERROR, "Cannot find configure file: " + config_file_);
}
// Store original configure file
std::string ori_file = config_file_ + ".ori";
if (access(ori_file.c_str(), F_OK) != 0) {
std::fstream fin(config_file_, std::ios::in);
std::ofstream fout(ori_file);
if (!fin.is_open() || !fout.is_open()) {
return Status(SERVER_UNEXPECTED_ERROR, "Cannot open conf file. Store original conf file failed");
}
fout << fin.rdbuf();
fout.flush();
fout.close();
fin.close();
}
std::string value;
auto status = GetConfigValueInMem(parent_key, child_key, value);
if (!status.ok()) {
return status;
}
// convert value string to standard string stored in yaml file
std::string value_str;
if (child_key == CONFIG_CACHE_CACHE_INSERT_DATA || child_key == CONFIG_STORAGE_S3_ENABLE ||
child_key == CONFIG_METRIC_ENABLE_MONITOR || child_key == CONFIG_GPU_RESOURCE_ENABLE) {
value_str =
(value == "True" || value == "true" || value == "On" || value == "on" || value == "1") ? "true" : "false";
} else if (child_key == CONFIG_GPU_RESOURCE_SEARCH_RESOURCES ||
child_key == CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES) {
std::vector<std::string> vec;
StringHelpFunctions::SplitStringByDelimeter(value, ",", vec);
for (auto& s : vec) {
value_str += "\n - " + s;
}
} else {
value_str = value;
}
std::fstream conf_fin(config_file_, std::ios::in);
if (!conf_fin.is_open()) {
return Status(SERVER_UNEXPECTED_ERROR, "Cannot open conf file: " + config_file_);
}
bool parent_key_read = false;
std::string conf_str, line;
while (getline(conf_fin, line)) {
if (!parent_key_read) {
conf_str += line + "\n";
if (!(line.empty() || line.find_first_of('#') == 0 || line.find(parent_key) == std::string::npos))
parent_key_read = true;
continue;
}
if (line.find_first_of('#') == 0) {
status = Status(SERVER_UNEXPECTED_ERROR, "Cannot find child key: " + child_key);
break;
}
if (line.find(child_key) != std::string::npos) {
// may loss comments here, need to extract comments from line
conf_str += " " + child_key + ": " + value_str + "\n";
break;
}
conf_str += line + "\n";
}
// values of gpu resources are sequences, need to remove old here
std::regex reg("\\S*");
if (child_key == CONFIG_GPU_RESOURCE_SEARCH_RESOURCES || child_key == CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES) {
while (getline(conf_fin, line)) {
if (line.find("- gpu") != std::string::npos)
continue;
conf_str += line + "\n";
if (!line.empty() && line.size() > 2 && isalnum(line.at(2))) {
break;
}
}
}
if (status.ok()) {
while (getline(conf_fin, line)) {
conf_str += line + "\n";
}
conf_fin.close();
std::fstream fout(config_file_, std::ios::out | std::ios::trunc);
fout << conf_str;
fout.flush();
fout.close();
}
return status;
}
Status
Config::RegisterCallBack(const std::string& node, const std::string& sub_node, const std::string& key,
ConfigCallBackF& cb) {
std::string cb_node = node + "." + sub_node;
if (config_callback_.find(cb_node) == config_callback_.end()) {
return Status(SERVER_UNEXPECTED_ERROR, cb_node + " is not supported changed in mem");
}
auto& callback_map = config_callback_.at(cb_node);
callback_map[key] = cb;
return Status::OK();
}
Status
Config::CancelCallBack(const std::string& node, const std::string& sub_node, const std::string& key) {
if (config_callback_.empty() || key.empty()) {
return Status::OK();
}
std::string cb_node = node + "." + sub_node;
if (config_callback_.find(cb_node) == config_callback_.end()) {
return Status(SERVER_UNEXPECTED_ERROR, cb_node + " cannot found in callback map");
}
auto& cb_map = config_callback_.at(cb_node);
cb_map.erase(key);
return Status::OK();
}
////////////////////////////////////////////////////////////////////////////////
Status
Config::CheckConfigVersion(const std::string& value) {
@ -482,12 +729,37 @@ Config::CheckStorageConfigPrimaryPath(const std::string& value) {
if (value.empty()) {
return Status(SERVER_INVALID_ARGUMENT, "storage_config.db_path is empty.");
}
return Status::OK();
return ValidationUtil::ValidateStoragePath(value);
}
Status
Config::CheckStorageConfigSecondaryPath(const std::string& value) {
fiu_return_on("check_config_secondary_path_fail", Status(SERVER_INVALID_ARGUMENT, ""));
auto status = Status::OK();
if (value.empty()) {
return status;
}
std::vector<std::string> vec;
StringHelpFunctions::SplitStringByDelimeter(value, ",", vec);
std::unordered_set<std::string> path_set;
for (auto& path : vec) {
StringHelpFunctions::TrimStringBlank(path);
status = ValidationUtil::ValidateStoragePath(path);
if (!status.ok()) {
return status;
}
path_set.insert(path);
}
if (path_set.size() != vec.size()) {
return Status(SERVER_INVALID_ARGUMENT, "Path value is duplicated");
}
return Status::OK();
}
@ -950,6 +1222,32 @@ Config::GetConfigVersion(std::string& value) {
return CheckConfigVersion(value);
}
Status
Config::ExecCallBacks(const std::string& node, const std::string& sub_node, const std::string& value) {
auto status = Status::OK();
if (config_callback_.empty()) {
return Status(SERVER_UNEXPECTED_ERROR, "Callback map is empty. Cannot take effect in-service");
}
std::string cb_node = node + "." + sub_node;
if (config_callback_.find(cb_node) == config_callback_.end()) {
return Status(SERVER_UNEXPECTED_ERROR,
"Cannot find " + cb_node + " in callback map, cannot take effect in-service");
}
auto& cb_map = config_callback_.at(cb_node);
for (auto& cb_kv : cb_map) {
auto& cd = cb_kv.second;
status = cd(value);
if (!status.ok()) {
break;
}
}
return status;
}
/* server config */
Status
Config::GetServerConfigAddress(std::string& value) {
@ -1213,6 +1511,7 @@ Config::GetGpuResourceConfigSearchResources(std::vector<int64_t>& value) {
std::vector<std::string> res_vec;
server::StringHelpFunctions::SplitStringByDelimeter(str, CONFIG_GPU_RESOURCE_DELIMITER, res_vec);
CONFIG_CHECK(CheckGpuResourceConfigSearchResources(res_vec));
value.clear();
for (std::string& res : res_vec) {
value.push_back(std::stoll(res.substr(3)));
}
@ -1234,6 +1533,7 @@ Config::GetGpuResourceConfigBuildIndexResources(std::vector<int64_t>& value) {
std::vector<std::string> res_vec;
server::StringHelpFunctions::SplitStringByDelimeter(str, CONFIG_GPU_RESOURCE_DELIMITER, res_vec);
CONFIG_CHECK(CheckGpuResourceConfigBuildIndexResources(res_vec));
value.clear();
for (std::string& res : res_vec) {
value.push_back(std::stoll(res.substr(3)));
}
@ -1258,6 +1558,12 @@ Config::GetTracingConfigJsonConfigPath(std::string& value) {
return Status::OK();
}
Status
Config::GetServerRestartRequired(bool& required) {
required = restart_required_;
return Status::OK();
}
///////////////////////////////////////////////////////////////////////////////
/* server config */
Status
@ -1381,7 +1687,12 @@ Config::SetMetricConfigPort(const std::string& value) {
Status
Config::SetCacheConfigCpuCacheCapacity(const std::string& value) {
CONFIG_CHECK(CheckCacheConfigCpuCacheCapacity(value));
return SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY, value);
auto status = SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY, value);
if (status.ok()) {
cache::CpuCacheMgr::GetInstance()->SetCapacity(std::stol(value) << 30);
}
return status;
}
Status
@ -1393,20 +1704,36 @@ Config::SetCacheConfigCpuCacheThreshold(const std::string& value) {
Status
Config::SetCacheConfigInsertBufferSize(const std::string& value) {
CONFIG_CHECK(CheckCacheConfigInsertBufferSize(value));
return SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_INSERT_BUFFER_SIZE, value);
auto status = SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_INSERT_BUFFER_SIZE, value);
if (!status.ok()) {
return status;
}
return ExecCallBacks(CONFIG_CACHE, CONFIG_CACHE_INSERT_BUFFER_SIZE, value);
}
Status
Config::SetCacheConfigCacheInsertData(const std::string& value) {
CONFIG_CHECK(CheckCacheConfigCacheInsertData(value));
return SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CACHE_INSERT_DATA, value);
auto status = SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CACHE_INSERT_DATA, value);
if (!status.ok()) {
return status;
}
return ExecCallBacks(CONFIG_CACHE, CONFIG_CACHE_CACHE_INSERT_DATA, value);
}
/* engine config */
Status
Config::SetEngineConfigUseBlasThreshold(const std::string& value) {
CONFIG_CHECK(CheckEngineConfigUseBlasThreshold(value));
return SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_USE_BLAS_THRESHOLD, value);
auto status = SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_USE_BLAS_THRESHOLD, value);
if (!status.ok()) {
return status;
}
return ExecCallBacks(CONFIG_ENGINE, CONFIG_ENGINE_USE_BLAS_THRESHOLD, value);
}
Status
@ -1420,7 +1747,12 @@ Config::SetEngineConfigOmpThreadNum(const std::string& value) {
Status
Config::SetEngineConfigGpuSearchThreshold(const std::string& value) {
CONFIG_CHECK(CheckEngineConfigGpuSearchThreshold(value));
return SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_GPU_SEARCH_THRESHOLD, value);
auto status = SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_GPU_SEARCH_THRESHOLD, value);
if (!status.ok()) {
return status;
}
return ExecCallBacks(CONFIG_ENGINE, CONFIG_ENGINE_GPU_SEARCH_THRESHOLD, value);
}
#endif
@ -1431,13 +1763,31 @@ Config::SetEngineConfigGpuSearchThreshold(const std::string& value) {
Status
Config::SetGpuResourceConfigEnable(const std::string& value) {
CONFIG_CHECK(CheckGpuResourceConfigEnable(value));
return SetConfigValueInMem(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_ENABLE, value);
auto status = SetConfigValueInMem(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_ENABLE, value);
if (!status.ok()) {
return status;
}
return ExecCallBacks(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_ENABLE, value);
}
Status
Config::SetGpuResourceConfigCacheCapacity(const std::string& value) {
CONFIG_CHECK(CheckGpuResourceConfigCacheCapacity(value));
return SetConfigValueInMem(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_CACHE_CAPACITY, value);
auto status = SetConfigValueInMem(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_CACHE_CAPACITY, value);
if (!status.ok()) {
return status;
}
int64_t cap = std::stol(value);
std::vector<int64_t> gpus;
GetGpuResourceConfigSearchResources(gpus);
for (auto& g : gpus) {
cache::GpuCacheMgr::GetInstance(g)->SetCapacity(cap << 30);
}
return Status::OK();
}
Status
@ -1451,7 +1801,12 @@ Config::SetGpuResourceConfigSearchResources(const std::string& value) {
std::vector<std::string> res_vec;
server::StringHelpFunctions::SplitStringByDelimeter(value, CONFIG_GPU_RESOURCE_DELIMITER, res_vec);
CONFIG_CHECK(CheckGpuResourceConfigSearchResources(res_vec));
return SetConfigValueInMem(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_SEARCH_RESOURCES, value);
auto status = SetConfigValueInMem(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_SEARCH_RESOURCES, value);
if (!status.ok()) {
return status;
}
return ExecCallBacks(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_SEARCH_RESOURCES, value);
}
Status
@ -1459,7 +1814,13 @@ Config::SetGpuResourceConfigBuildIndexResources(const std::string& value) {
std::vector<std::string> res_vec;
server::StringHelpFunctions::SplitStringByDelimeter(value, CONFIG_GPU_RESOURCE_DELIMITER, res_vec);
CONFIG_CHECK(CheckGpuResourceConfigBuildIndexResources(res_vec));
return SetConfigValueInMem(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, value);
auto status = SetConfigValueInMem(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, value);
if (!status.ok()) {
return status;
}
return ExecCallBacks(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, value);
}
#endif


@ -11,6 +11,7 @@
#pragma once
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
@ -22,6 +23,8 @@
namespace milvus {
namespace server {
using ConfigCallBackF = std::function<Status(const std::string&)>;
#define CONFIG_CHECK(func) \
do { \
Status s = func; \
@ -129,6 +132,9 @@ static const char* CONFIG_TRACING = "tracing_config";
static const char* CONFIG_TRACING_JSON_CONFIG_PATH = "json_config_path";
class Config {
private:
Config();
public:
static Config&
GetInstance();
@ -143,6 +149,16 @@ class Config {
Status
ProcessConfigCli(std::string& result, const std::string& cmd);
Status
GenUniqueIdentityID(const std::string& identity, std::string& uid);
Status
RegisterCallBack(const std::string& node, const std::string& sub_node, const std::string& key,
ConfigCallBackF& callback);
Status
CancelCallBack(const std::string& node, const std::string& sub_node, const std::string& key);
private:
ConfigNode&
GetConfigRoot();
@ -159,6 +175,9 @@ class Config {
Status
SetConfigCli(const std::string& parent_key, const std::string& child_key, const std::string& value);
Status
UpdateFileConfigFromMem(const std::string& parent_key, const std::string& child_key);
///////////////////////////////////////////////////////////////////////////
Status
CheckConfigVersion(const std::string& value);
@ -250,6 +269,9 @@ class Config {
Status
GetConfigVersion(std::string& value);
Status
ExecCallBacks(const std::string& node, const std::string& sub_node, const std::string& value);
public:
/* server config */
Status
@ -336,6 +358,9 @@ class Config {
Status
GetTracingConfigJsonConfigPath(std::string& value);
Status
GetServerRestartRequired(bool& required);
public:
/* server config */
Status
@ -417,7 +442,10 @@ class Config {
#endif
private:
bool restart_required_ = false;
std::string config_file_;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>> config_map_;
std::unordered_map<std::string, std::unordered_map<std::string, ConfigCallBackF>> config_callback_;
std::mutex mutex_;
};


@ -110,6 +110,17 @@ DBWrapper::StartService() {
}
faiss::distance_compute_blas_threshold = use_blas_threshold;
server::ConfigCallBackF lambda = [](const std::string& value) -> Status {
Config& config = Config::GetInstance();
int64_t blas_threshold;
auto status = config.GetEngineConfigUseBlasThreshold(blas_threshold);
if (status.ok()) {
faiss::distance_compute_blas_threshold = blas_threshold;
}
return status;
};
config.RegisterCallBack(server::CONFIG_ENGINE, server::CONFIG_ENGINE_USE_BLAS_THRESHOLD, "DBWrapper", lambda);
// set archive config
engine::ArchiveConf::CriteriaT criterial;


@ -65,6 +65,8 @@ enum StatusCode : int {
PATH_PARAM_LOSS = 31,
QUERY_PARAM_LOSS = 32,
BODY_FIELD_LOSS = 33,
ILLEGAL_BODY = 34,
BODY_PARSE_FAIL = 35,
ILLEGAL_QUERY_PARAM = 36,
};


@ -51,10 +51,6 @@ class WebController : public oatpp::web::server::api::ApiController {
*/
#include OATPP_CODEGEN_BEGIN(ApiController)
ENDPOINT_INFO(root) {
info->summary = "Index.html page";
}
ADD_CORS(root)
ENDPOINT("GET", "/", root) {
@ -63,13 +59,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ENDPOINT_INFO(State) {
info->summary = "Server state";
info->description = "Check web server whether is ready.";
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_200, "application/json");
}
ADD_CORS(State)
ENDPOINT("GET", "/state", State) {
@ -78,12 +67,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return createDtoResponse(Status::CODE_200, StatusDto::createShared());
}
ENDPOINT_INFO(GetDevices) {
info->summary = "Obtain system devices info";
info->addResponse<DevicesDto::ObjectWrapper>(Status::CODE_200, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
}
ADD_CORS(GetDevices)
@ -115,13 +98,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return createResponse(Status::CODE_204, "No Content");
}
ENDPOINT_INFO(GetAdvancedConfig) {
info->summary = "Obtain cache config and enging config";
info->addResponse<AdvancedConfigDto::ObjectWrapper>(Status::CODE_200, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
}
ADD_CORS(GetAdvancedConfig)
ENDPOINT("GET", "/config/advanced", GetAdvancedConfig) {
@ -147,15 +123,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ENDPOINT_INFO(SetAdvancedConfig) {
info->summary = "Modify cache config and enging config";
info->addConsumes<AdvancedConfigDto::ObjectWrapper>("application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_200, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
}
ADD_CORS(SetAdvancedConfig)
ENDPOINT("PUT", "/config/advanced", SetAdvancedConfig, BODY_DTO(AdvancedConfigDto::ObjectWrapper, body)) {
@ -188,13 +155,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return createResponse(Status::CODE_204, "No Content");
}
ENDPOINT_INFO(GetGPUConfig) {
info->summary = "Obtain GPU resources config info";
info->addResponse<GPUConfigDto::ObjectWrapper>(Status::CODE_200, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
}
ADD_CORS(GetGPUConfig)
ENDPOINT("GET", "/config/gpu_resources", GetGPUConfig) {
@ -221,14 +181,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ENDPOINT_INFO(SetGPUConfig) {
info->summary = "Set GPU resources config";
info->addConsumes<GPUConfigDto::ObjectWrapper>("application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_200, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
}
ADD_CORS(SetGPUConfig)
ENDPOINT("PUT", "/config/gpu_resources", SetGPUConfig, BODY_DTO(GPUConfigDto::ObjectWrapper, body)) {
@ -261,15 +213,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return createResponse(Status::CODE_204, "No Content");
}
ENDPOINT_INFO(CreateTable) {
info->summary = "Create table";
info->addConsumes<TableRequestDto::ObjectWrapper>("application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_201, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
}
ADD_CORS(CreateTable)
ENDPOINT("POST", "/tables", CreateTable, BODY_DTO(TableRequestDto::ObjectWrapper, body)) {
@ -294,16 +237,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ENDPOINT_INFO(ShowTables) {
info->summary = "Show whole tables";
info->queryParams.add<Int64>("offset");
info->queryParams.add<Int64>("page_size");
info->addResponse<TableListFieldsDto::ObjectWrapper>(Status::CODE_200, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
}
ADD_CORS(ShowTables)
ENDPOINT("GET", "/tables", ShowTables, QUERIES(const QueryParams&, query_params)) {
@ -339,16 +272,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return createResponse(Status::CODE_204, "No Content");
}
ENDPOINT_INFO(GetTable) {
info->summary = "Get table";
info->pathParams.add<String>("table_name");
info->addResponse<TableFieldsDto::ObjectWrapper>(Status::CODE_200, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_404, "application/json");
}
ADD_CORS(GetTable)
ENDPOINT("GET", "/tables/{table_name}", GetTable,
@ -380,14 +303,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ENDPOINT_INFO(DropTable) {
info->summary = "Drop table";
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_204, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_404, "application/json");
}
ADD_CORS(DropTable)
ENDPOINT("DELETE", "/tables/{table_name}", DropTable, PATH(String, table_name)) {
@ -422,16 +337,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return createResponse(Status::CODE_204, "No Content");
}
ENDPOINT_INFO(CreateIndex) {
info->summary = "Create index";
info->addConsumes<IndexRequestDto::ObjectWrapper>("application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_201, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_404, "application/json");
}
ADD_CORS(CreateIndex)
ENDPOINT("POST", "/tables/{table_name}/indexes", CreateIndex,
@ -461,16 +366,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ENDPOINT_INFO(GetIndex) {
info->summary = "Describe index";
info->pathParams.add<String>("table_name");
info->addResponse<IndexDto::ObjectWrapper>(Status::CODE_200, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_404, "application/json");
}
ADD_CORS(GetIndex)
ENDPOINT("GET", "/tables/{table_name}/indexes", GetIndex, PATH(String, table_name)) {
@ -501,16 +396,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ENDPOINT_INFO(DropIndex) {
info->summary = "Drop index";
info->pathParams.add<String>("table_name");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_204, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_404, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
}
ADD_CORS(DropIndex)
ENDPOINT("DELETE", "/tables/{table_name}/indexes", DropIndex, PATH(String, table_name)) {
@ -545,18 +430,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return createResponse(Status::CODE_204, "No Content");
}
ENDPOINT_INFO(CreatePartition) {
info->summary = "Create partition";
info->pathParams.add<String>("table_name");
info->addConsumes<PartitionRequestDto::ObjectWrapper>("application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_201, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_404, "application/json");
}
ADD_CORS(CreatePartition)
ENDPOINT("POST", "/tables/{table_name}/partitions",
@ -585,22 +458,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ENDPOINT_INFO(ShowPartitions) {
info->summary = "Show partitions";
info->pathParams.add<String>("table_name");
info->queryParams.add<Int64>("offset");
info->queryParams.add<Int64>("page_size");
//
info->addResponse<PartitionListDto::ObjectWrapper>(Status::CODE_200, "application/json");
// Error occurred.
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
//
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_404, "application/json");
}
ADD_CORS(ShowPartitions)
ENDPOINT("GET", "/tables/{table_name}/partitions", ShowPartitions,
@ -638,17 +495,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return createResponse(Status::CODE_204, "No Content");
}
ENDPOINT_INFO(DropPartition) {
info->summary = "Drop partition";
info->pathParams.add<String>("table_name");
info->pathParams.add<String>("partition_tag");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_204, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_404, "application/json");
}
ADD_CORS(DropPartition)
ENDPOINT("DELETE", "/tables/{table_name}/partitions/{partition_tag}", DropPartition,
@ -684,18 +530,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return createResponse(Status::CODE_204, "No Content");
}
ENDPOINT_INFO(Insert) {
info->summary = "Insert vectors";
info->pathParams.add<String>("table_name");
info->addConsumes<InsertRequestDto::ObjectWrapper>("application/json");
info->addResponse<VectorIdsDto::ObjectWrapper>(Status::CODE_201, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_404, "application/json");
}
ADD_CORS(Insert)
ENDPOINT("POST", "/tables/{table_name}/vectors", Insert,
@ -725,18 +559,6 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ENDPOINT_INFO(Search) {
info->summary = "Search";
info->pathParams.add<String>("table_name");
info->addConsumes<SearchRequestDto::ObjectWrapper>("application/json");
info->addResponse<TopkResultsDto::ObjectWrapper>(Status::CODE_200, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_404, "application/json");
}
ADD_CORS(Search)
ENDPOINT("PUT", "/tables/{table_name}/vectors", Search,
@ -766,32 +588,23 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ENDPOINT_INFO(SystemMsg) {
info->summary = "Command";
ADD_CORS(SystemInfo)
info->pathParams.add<String>("cmd_str");
info->addResponse<CommandDto::ObjectWrapper>(Status::CODE_200, "application/json");
info->addResponse<StatusDto::ObjectWrapper>(Status::CODE_400, "application/json");
}
ADD_CORS(SystemMsg)
ENDPOINT("GET", "/system/{msg}", SystemMsg, PATH(String, msg), QUERIES(const QueryParams&, query_params)) {
ENDPOINT("GET", "/system/{msg}", SystemInfo, PATH(String, msg), QUERIES(const QueryParams&, query_params)) {
TimeRecorder tr(std::string(WEB_LOG_PREFIX) + "GET \'/system/" + msg->std_str() + "\'");
tr.RecordSection("Received request.");
auto cmd_dto = CommandDto::createShared();
auto info_dto = CommandDto::createShared();
WebRequestHandler handler = WebRequestHandler();
auto status_dto = handler.Cmd(msg, query_params, cmd_dto);
auto status_dto = handler.SystemInfo(msg, info_dto);
std::shared_ptr<OutgoingResponse> response;
switch (status_dto->code->getValue()) {
case StatusCode::SUCCESS:
response = createDtoResponse(Status::CODE_200, cmd_dto);
response = createDtoResponse(Status::CODE_200, info_dto);
break;
default:
return createDtoResponse(Status::CODE_400, status_dto);
response = createDtoResponse(Status::CODE_400, status_dto);
}
tr.ElapseFromBegin("Done. Status: code = " + std::to_string(status_dto->code->getValue())
@ -800,6 +613,32 @@ class WebController : public oatpp::web::server::api::ApiController {
return response;
}
ADD_CORS(SystemOp)
ENDPOINT("PUT", "/system/{Op}", SystemOp, PATH(String, Op), BODY_STRING(String, body_str)) {
TimeRecorder tr(std::string(WEB_LOG_PREFIX) + "PUT \'/system/" + Op->std_str() + "\'");
tr.RecordSection("Received request.");
WebRequestHandler handler = WebRequestHandler();
handler.RegisterRequestHandler(::milvus::server::RequestHandler());
String response_str;
auto status_dto = handler.SystemOp(Op, body_str, response_str);
std::shared_ptr<OutgoingResponse> response;
switch (status_dto->code->getValue()) {
case StatusCode::SUCCESS:
response = createResponse(Status::CODE_200, response_str);
break;
default:
response = createDtoResponse(Status::CODE_400, status_dto);
}
tr.ElapseFromBegin("Done. Status: code = " + std::to_string(status_dto->code->getValue())
+ ", reason = " + status_dto->message->std_str() + ". Total cost");
return response;
}
/**
* Finish ENDPOINTs generation ('ApiController' codegen)
*/
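The two /system routes above split the old command endpoint: GET /system/{msg} reports status, version, system info, or the full config, while PUT /system/{op} accepts a JSON body describing a task or a config change. Below is a minimal client-side sketch of the bodies the PUT route expects, not part of this change; it assumes nlohmann::json (as used elsewhere in this commit), and the table name and values are illustrative only.

#include <iostream>
#include "thirdparty/nlohmann/json.hpp"  // include path as used in WebRequestHandler.cpp; adjust to your tree

int main() {
    // Body for PUT /system/task: preload a table into memory.
    nlohmann::json task_body;
    task_body["load"]["table_name"] = "example_table";

    // Body for PUT /system/config: change one or more settings at runtime.
    nlohmann::json config_body;
    config_body["cache_config"]["cpu_cache_capacity"] = 2;
    config_body["engine_config"]["use_blas_threshold"] = 1100;

    std::cout << task_body.dump() << std::endl;    // {"load":{"table_name":"example_table"}}
    std::cout << config_body.dump() << std::endl;
    return 0;
}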

View File

@ -33,6 +33,7 @@ class PartitionListDto : public oatpp::data::mapping::type::Object {
DTO_INIT(PartitionListDto, Object)
DTO_FIELD(List<PartitionFieldsDto::ObjectWrapper>::ObjectWrapper, partitions);
DTO_FIELD(Int64, count) = 0;
};
#include OATPP_CODEGEN_END(DTO)

View File

@ -23,6 +23,7 @@
#include "server/web_impl/Types.h"
#include "server/web_impl/dto/PartitionDto.hpp"
#include "server/web_impl/utils/Util.h"
#include "thirdparty/nlohmann/json.hpp"
#include "utils/StringHelpFunctions.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
@ -581,6 +582,7 @@ WebRequestHandler::ShowPartitions(const OString& offset, const OString& page_siz
ASSIGN_RETURN_STATUS_DTO(status)
}
partition_list_dto->count = partitions.size();
partition_list_dto->partitions = partition_list_dto->partitions->createShared();
if (offset_value < partitions.size()) {
@ -740,48 +742,155 @@ WebRequestHandler::Search(const OString& table_name, const SearchRequestDto::Obj
}
StatusDto::ObjectWrapper
WebRequestHandler::Cmd(const OString& cmd, const OQueryParams& query_params, CommandDto::ObjectWrapper& cmd_dto) {
WebRequestHandler::SystemInfo(const OString& cmd, CommandDto::ObjectWrapper& cmd_dto) {
std::string info = cmd->std_str();
auto status = Status::OK();
// TODO: (yhz) now only support load table into memory, may remove in the future
if ("task" == info) {
auto action = query_params.get("action");
if (nullptr == action.get()) {
RETURN_STATUS_DTO(QUERY_PARAM_LOSS, "Query param \'action\' is required in url \'/system/task\'");
}
std::string action_str = action->std_str();
auto target = query_params.get("target");
if (nullptr == target.get()) {
RETURN_STATUS_DTO(QUERY_PARAM_LOSS, "Query param \'target\' is required in url \'/system/task\'");
}
std::string target_str = target->std_str();
if ("load" == action_str) {
status = request_handler_.PreloadTable(context_ptr_, target_str);
} else {
std::string error_msg = std::string("Unknown action value \'") + action_str + "\'";
RETURN_STATUS_DTO(ILLEGAL_QUERY_PARAM, error_msg.c_str());
}
ASSIGN_RETURN_STATUS_DTO(status)
}
if ("info" == info) {
info = "get_system_info";
}
std::string reply_str;
status = CommandLine(info, reply_str);
if (status.ok()) {
cmd_dto->reply = reply_str.c_str();
if ("config" == info) {
info = "get_config *";
status = CommandLine(info, reply_str);
if (status.ok()) {
try {
nlohmann::json j = nlohmann::json::parse(reply_str);
#ifdef MILVUS_GPU_VERSION
auto gpu_search_res = j["gpu_resource_config"]["search_resources"].get<std::string>();
std::vector<std::string> gpus;
StringHelpFunctions::SplitStringByDelimeter(gpu_search_res, ",", gpus);
j["gpu_resource_config"]["search_resources"] = gpus;
auto gpu_build_res = j["gpu_resource_config"]["build_index_resources"].get<std::string>();
gpus.clear();
StringHelpFunctions::SplitStringByDelimeter(gpu_build_res, ",", gpus);
j["gpu_resource_config"]["build_index_resources"] = gpus;
#endif
// check whether the server requires a restart to apply config changes
Config& config = Config::GetInstance();
bool required = false;
config.GetServerRestartRequired(required);
j["restart_required"] = required;
cmd_dto->reply = j.dump().c_str();
} catch (std::exception& e) {
RETURN_STATUS_DTO(UNEXPECTED_ERROR, e.what());
}
}
} else {
if ("info" == info) {
info = "get_system_info";
}
status = CommandLine(info, reply_str);
if (status.ok()) {
cmd_dto->reply = reply_str.c_str();
}
}
ASSIGN_RETURN_STATUS_DTO(status);
}
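For the "config" case, SystemInfo() returns the whole configuration as JSON: the comma-separated GPU resource strings are converted to arrays and a restart_required flag is appended. The following standalone sketch (not part of the commit) mirrors that post-processing step; Split stands in for StringHelpFunctions::SplitStringByDelimeter and the resource values are examples.

#include <sstream>
#include <string>
#include <vector>
#include "thirdparty/nlohmann/json.hpp"  // adjust include path to your tree

// Stand-in for StringHelpFunctions::SplitStringByDelimeter.
static std::vector<std::string> Split(const std::string& s, char delim) {
    std::vector<std::string> parts;
    std::stringstream ss(s);
    std::string item;
    while (std::getline(ss, item, delim)) {
        parts.push_back(item);
    }
    return parts;
}

int main() {
    nlohmann::json j;
    j["gpu_resource_config"]["search_resources"] = "gpu0,gpu1";

    // Replace the comma-separated string with a JSON array, as SystemInfo() does.
    std::string res = j["gpu_resource_config"]["search_resources"].get<std::string>();
    j["gpu_resource_config"]["search_resources"] = Split(res, ',');

    // Tell the client whether a restart is still needed to apply some settings.
    j["restart_required"] = false;

    // j: {"gpu_resource_config":{"search_resources":["gpu0","gpu1"]},"restart_required":false}
    return 0;
}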
StatusDto::ObjectWrapper
WebRequestHandler::SystemOp(const OString& op, const OString& body_str, OString& response_str) {
Status status = Status::OK();
if (nullptr == body_str.get() || body_str->getSize() == 0) {
RETURN_STATUS_DTO(BODY_FIELD_LOSS, "Payload is empty.");
}
try {
nlohmann::json j = nlohmann::json::parse(body_str->c_str());
if (op->equals("task")) {
if (j.contains("load")) {
auto table_name = j["load"]["table_name"];
if (!table_name.is_string()) {
RETURN_STATUS_DTO(ILLEGAL_BODY, "\"table_name\" must be a string")
}
status = request_handler_.PreloadTable(context_ptr_, table_name.get<std::string>());
}
if (j.contains("flush")) {
auto table_names = j["flush"]["table_names"];
if (!table_names.is_array()) {
RETURN_STATUS_DTO(ILLEGAL_BODY, "\"table_names\" must be a array")
}
std::vector<std::string> names;
for (auto& n : table_names) {
if (!n.is_string()) {
RETURN_STATUS_DTO(ILLEGAL_BODY, "item of \"table_names\" must be a string")
}
names.push_back(n.get<std::string>());
}
status = Status(SERVER_UNEXPECTED_ERROR, "Flush() is not implemented");
// status = request_handler_.Flush(context_ptr_, table_names);
}
if (j.contains("compact")) {
auto table_names = j["compact"]["table_names"];
if (!table_names.is_array()) {
RETURN_STATUS_DTO(ILLEGAL_BODY, "\"table_name\" must be a array")
}
std::vector<std::string> names;
for (auto& n : table_names) {
names.push_back(n.get<std::string>());
}
status = Status(SERVER_UNEXPECTED_ERROR, "Compact() is not implemented");
// status = request_handler_.Compact(context_ptr_, table_names);
}
} else if (op->equals("config")) {
if (!j.is_object()) {
RETURN_STATUS_DTO(ILLEGAL_BODY, "Error format")
}
std::vector<std::string> cmds;
for (auto& el : j.items()) {
auto evalue = el.value();
if (!evalue.is_object()) {
RETURN_STATUS_DTO(ILLEGAL_BODY, "Invalid payload format, the root value must be json map");
}
for (auto& iel : el.value().items()) {
auto ievalue = iel.value();
if (!(ievalue.is_string() || ievalue.is_number() || ievalue.is_boolean())) {
RETURN_STATUS_DTO(ILLEGAL_BODY, "Config value must be one of string, numeric or boolean")
}
std::ostringstream ss;
if (ievalue.is_string()) {
std::string vle = ievalue;
ss << "set_config " << el.key() << "." << iel.key() << " " << vle;
} else {
ss << "set_config " << el.key() << "." << iel.key() << " " << ievalue;
}
std::string cmd = ss.str();
cmds.push_back(cmd);
}
}
std::string reply;
for (auto& c : cmds) {
status = CommandLine(c, reply);
if (!status.ok()) {
ASSIGN_RETURN_STATUS_DTO(status)
}
}
}
} catch (nlohmann::detail::parse_error& e) {
std::string emsg = "json error: code=" + std::to_string(e.id) + ", reason=" + e.what();
RETURN_STATUS_DTO(ILLEGAL_BODY, emsg.c_str());
} catch (nlohmann::detail::type_error& e) {
std::string emsg = "json error: code=" + std::to_string(e.id) + ", reason=" + e.what();
RETURN_STATUS_DTO(ILLEGAL_BODY, emsg.c_str());
}
nlohmann::json j = nlohmann::json();
j["code"] = status.code();
j["message"] = status.message();
if (op->equals("config")) {
Config& config = Config::GetInstance();
bool required = false;
config.GetServerRestartRequired(required);
j["restart_required"] = required;
}
response_str = j.dump().c_str();
ASSIGN_RETURN_STATUS_DTO(status);
}
} // namespace web
} // namespace server
} // namespace milvus
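SystemOp() translates a nested config payload into the same set_config commands the CLI uses, so runtime updates go through one code path (Config::ProcessConfigCli via CommandLine). The sketch below (not part of the commit) shows that flattening in isolation, again assuming nlohmann::json; the payload keys and values are examples.

#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include "thirdparty/nlohmann/json.hpp"  // adjust include path to your tree

int main() {
    nlohmann::json body = nlohmann::json::parse(
        R"({"cache_config": {"cpu_cache_capacity": 2}, "metric_config": {"enable_monitor": true}})");

    // Flatten {section: {key: value}} into "set_config section.key value" commands.
    std::vector<std::string> cmds;
    for (auto& section : body.items()) {
        for (auto& item : section.value().items()) {
            std::ostringstream ss;
            ss << "set_config " << section.key() << "." << item.key() << " " << item.value();
            cmds.push_back(ss.str());
        }
    }

    for (auto& c : cmds) {
        std::cout << c << std::endl;
    }
    // set_config cache_config.cpu_cache_capacity 2
    // set_config metric_config.enable_monitor true
    return 0;
}

Note that streaming a JSON string through operator<< would include the surrounding quotes, which is why the handler above keeps a separate is_string() branch that appends the raw string value instead.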

View File

@ -146,7 +146,10 @@ class WebRequestHandler {
TopkResultsDto::ObjectWrapper& results_dto);
StatusDto::ObjectWrapper
Cmd(const OString& cmd, const OQueryParams& query_params, CommandDto::ObjectWrapper& cmd_dto);
SystemInfo(const OString& cmd, CommandDto::ObjectWrapper& cmd_dto);
StatusDto::ObjectWrapper
SystemOp(const OString& op, const OString& body_str, OString& response_str);
public:
WebRequestHandler&

View File

@ -395,5 +395,18 @@ ValidationUtil::ValidateDbURI(const std::string& uri) {
return (okay ? Status::OK() : Status(SERVER_INVALID_ARGUMENT, "Invalid db backend uri"));
}
Status
ValidationUtil::ValidateStoragePath(const std::string& path) {
// Validate whether the storage path is valid; only a well-formed absolute path passes.
// A valid path may only contain letters [a-zA-Z], digits [0-9], '-' and '_',
// and must start with '/'.
// Examples below are invalid:
// '/a//a', '/a--/a', '/-a/a', '/a@#/a', 'aaa/sfs'
std::string path_pattern = "^\\/(\\w+-?\\/?)+$";
std::regex regex(path_pattern);
return std::regex_match(path, regex) ? Status::OK() : Status(SERVER_INVALID_ARGUMENT, "Invalid file path");
}
} // namespace server
} // namespace milvus
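A quick way to see what the pattern accepts is to run it through std::regex directly; this small sketch (not part of the commit) just mirrors the check above on a few sample paths, and the full set of accepted/rejected cases is covered by the new unit test further down.

#include <iostream>
#include <regex>

int main() {
    const std::regex pattern("^\\/(\\w+-?\\/?)+$");
    for (const char* p : {"/tmp/milvus", "/tmp-/milvus", "/a--/a", "./tmp/milvus"}) {
        std::cout << p << " -> " << (std::regex_match(p, pattern) ? "valid" : "invalid") << std::endl;
    }
    // /tmp/milvus -> valid, /tmp-/milvus -> valid, /a--/a -> invalid, ./tmp/milvus -> invalid
    return 0;
}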

View File

@ -83,6 +83,9 @@ class ValidationUtil {
static Status
ValidateDbURI(const std::string& uri);
static Status
ValidateStoragePath(const std::string& path);
};
} // namespace server

View File

@ -42,6 +42,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/job scheduler_job_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/optimizer scheduler_optimizer_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/optimizer/handler scheduler_optimizer_handler_files)
set(scheduler_files
${scheduler_main_files}
${scheduler_action_files}
@ -50,6 +51,7 @@ set(scheduler_files
${scheduler_resource_files}
${scheduler_task_files}
${scheduler_optimizer_files}
${scheduler_optimizer_handler_files}
)
aux_source_directory(${MILVUS_THIRDPARTY_SRC}/easyloggingpp thirdparty_easyloggingpp_files)

View File

@ -9,9 +9,14 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <limits>
#include <fiu-control.h>
#include <fiu-local.h>
#include <gtest/gtest-death-test.h>
#include <gtest/gtest.h>
#include <cmath>
#include "config/YamlConfigMgr.h"
#include "server/Config.h"
#include "server/utils.h"
@ -19,9 +24,6 @@
#include "utils/StringHelpFunctions.h"
#include "utils/ValidationUtil.h"
#include <limits>
#include <fiu-local.h>
#include <fiu-control.h>
namespace {
@ -332,9 +334,14 @@ gen_set_command(const std::string& parent_node, const std::string& child_node, c
TEST_F(ConfigTest, SERVER_CONFIG_CLI_TEST) {
std::string config_path(CONFIG_PATH);
milvus::server::Config& config = milvus::server::Config::GetInstance();
milvus::Status s;
std::string conf_file = std::string(CONFIG_PATH) + VALID_CONFIG_FILE;
milvus::server::Config& config = milvus::server::Config::GetInstance();
auto status = config.LoadConfigFile(conf_file);
ASSERT_TRUE(status.ok()) << status.message();
std::string get_cmd, set_cmd;
std::string result, dummy;
@ -346,16 +353,16 @@ TEST_F(ConfigTest, SERVER_CONFIG_CLI_TEST) {
get_cmd = gen_get_command(ms::CONFIG_SERVER, ms::CONFIG_SERVER_ADDRESS);
set_cmd = gen_set_command(ms::CONFIG_SERVER, ms::CONFIG_SERVER_ADDRESS, server_addr);
s = config.ProcessConfigCli(dummy, set_cmd);
ASSERT_FALSE(s.ok());
ASSERT_TRUE(s.ok());
s = config.ProcessConfigCli(result, get_cmd);
ASSERT_TRUE(s.ok());
/* db config */
std::string db_backend_url = "bad_url";
std::string db_backend_url = "sqlite://milvus:zilliz@:/";
get_cmd = gen_get_command(ms::CONFIG_DB, ms::CONFIG_DB_BACKEND_URL);
set_cmd = gen_set_command(ms::CONFIG_DB, ms::CONFIG_DB_BACKEND_URL, db_backend_url);
s = config.ProcessConfigCli(dummy, set_cmd);
ASSERT_FALSE(s.ok());
ASSERT_TRUE(s.ok());
s = config.ProcessConfigCli(result, get_cmd);
ASSERT_TRUE(s.ok());
@ -364,7 +371,7 @@ TEST_F(ConfigTest, SERVER_CONFIG_CLI_TEST) {
get_cmd = gen_get_command(ms::CONFIG_METRIC, ms::CONFIG_METRIC_ENABLE_MONITOR);
set_cmd = gen_set_command(ms::CONFIG_METRIC, ms::CONFIG_METRIC_ENABLE_MONITOR, metric_enable_monitor);
s = config.ProcessConfigCli(dummy, set_cmd);
ASSERT_FALSE(s.ok());
ASSERT_TRUE(s.ok());
s = config.ProcessConfigCli(result, get_cmd);
ASSERT_TRUE(s.ok());
@ -373,7 +380,7 @@ TEST_F(ConfigTest, SERVER_CONFIG_CLI_TEST) {
get_cmd = gen_get_command(ms::CONFIG_STORAGE, ms::CONFIG_STORAGE_S3_ENABLE);
set_cmd = gen_set_command(ms::CONFIG_STORAGE, ms::CONFIG_STORAGE_S3_ENABLE, storage_s3_enable);
s = config.ProcessConfigCli(dummy, set_cmd);
ASSERT_FALSE(s.ok());
ASSERT_TRUE(s.ok());
s = config.ProcessConfigCli(result, get_cmd);
ASSERT_TRUE(s.ok());
@ -994,8 +1001,67 @@ TEST_F(ConfigTest, SERVER_CONFIG_OTHER_CONFIGS_FAIL_TEST) {
ASSERT_FALSE(s.ok());
#ifndef MILVUS_GPU_VERSION
s = config.ProcessConfigCli(dummy, gen_set_command(ms::CONFIG_TRACING,
ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, build_index_resources));
s = config.ProcessConfigCli(
dummy,
gen_set_command(ms::CONFIG_TRACING, ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, build_index_resources));
ASSERT_FALSE(s.ok());
#endif
}
TEST_F(ConfigTest, SERVER_CONFIG_UPDATE_TEST) {
std::string conf_file = std::string(CONFIG_PATH) + VALID_CONFIG_FILE;
milvus::server::Config& config = milvus::server::Config::GetInstance();
auto status = config.LoadConfigFile(conf_file);
ASSERT_TRUE(status.ok()) << status.message();
// verify that the new setting is persisted to the config file
status = config.SetCacheConfigInsertBufferSize("2");
ASSERT_TRUE(status.ok()) << status.message();
// test numeric config value
status = config.LoadConfigFile(conf_file);
ASSERT_TRUE(status.ok()) << status.message();
int64_t value;
status = config.GetCacheConfigInsertBufferSize(value);
ASSERT_TRUE(status.ok()) << status.message();
ASSERT_EQ(value, 2);
// test boolean config value
status = config.SetMetricConfigEnableMonitor("True");
ASSERT_TRUE(status.ok()) << status.message();
status = config.LoadConfigFile(conf_file);
ASSERT_TRUE(status.ok()) << status.message();
bool enable;
status = config.GetMetricConfigEnableMonitor(enable);
ASSERT_TRUE(status.ok()) << status.message();
ASSERT_EQ(true, enable);
// invalid path
status = config.SetStorageConfigPrimaryPath("/a--/a");
ASSERT_FALSE(status.ok());
// test path
status = config.SetStorageConfigPrimaryPath("/tmp/milvus_config_unittest");
ASSERT_TRUE(status.ok());
std::string path_value;
status = config.GetStorageConfigPrimaryPath(path_value);
ASSERT_TRUE(status.ok());
ASSERT_EQ(path_value, "/tmp/milvus_config_unittest");
#ifdef MILVUS_GPU_VERSION
status = config.SetGpuResourceConfigBuildIndexResources("gpu0");
ASSERT_TRUE(status.ok()) << status.message();
status = config.LoadConfigFile(conf_file);
ASSERT_TRUE(status.ok()) << status.message();
std::vector<int64_t> gpus;
status = config.GetGpuResourceConfigBuildIndexResources(gpus);
ASSERT_TRUE(status.ok()) << status.message();
ASSERT_EQ(1, gpus.size());
ASSERT_EQ(0, gpus[0]);
ASSERT_EQ(value, 2);
#endif
}

View File

@ -479,6 +479,21 @@ TEST(ValidationUtilTest, VALIDATE_DBURI_TEST) {
milvus::SERVER_SUCCESS);
}
TEST(ValidationUtilTest, VALIDATE_PATH_TEST) {
ASSERT_TRUE(milvus::server::ValidationUtil::ValidateStoragePath("/home/milvus").ok());
ASSERT_TRUE(milvus::server::ValidationUtil::ValidateStoragePath("/tmp/milvus").ok());
ASSERT_TRUE(milvus::server::ValidationUtil::ValidateStoragePath("/tmp/milvus/").ok());
ASSERT_TRUE(milvus::server::ValidationUtil::ValidateStoragePath("/_tmp/milvus12345").ok());
ASSERT_TRUE(milvus::server::ValidationUtil::ValidateStoragePath("/tmp-/milvus").ok());
ASSERT_FALSE(milvus::server::ValidationUtil::ValidateStoragePath("/-tmp/milvus").ok());
ASSERT_FALSE(milvus::server::ValidationUtil::ValidateStoragePath("/****tmp/milvus").ok());
ASSERT_FALSE(milvus::server::ValidationUtil::ValidateStoragePath("/tmp--/milvus").ok());
ASSERT_FALSE(milvus::server::ValidationUtil::ValidateStoragePath("./tmp/milvus").ok());
ASSERT_FALSE(milvus::server::ValidationUtil::ValidateStoragePath("/tmp space/milvus").ok());
ASSERT_FALSE(milvus::server::ValidationUtil::ValidateStoragePath("../tmp/milvus").ok());
ASSERT_FALSE(milvus::server::ValidationUtil::ValidateStoragePath("/tmp//milvus").ok());
}
TEST(UtilTest, ROLLOUTHANDLER_TEST) {
std::string dir1 = "/tmp/milvus_test";
std::string dir2 = "/tmp/milvus_test/log_test";

View File

@ -9,43 +9,40 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <gtest/gtest.h>
#include <opentracing/mocktracer/tracer.h>
#include <unistd.h>
#include <random>
#include <thread>
#include <boost/filesystem.hpp>
#include <thread>
#include <random>
#include <unistd.h>
#include <oatpp/web/client/HttpRequestExecutor.hpp>
#include <oatpp/network/client/SimpleTCPConnectionProvider.hpp>
#include <oatpp/core/macro/component.hpp>
#include <oatpp/web/client/ApiClient.hpp>
#include <gtest/gtest.h>
#include <oatpp-test/UnitTest.hpp>
#include "wrapper/VecIndex.h"
#include "server/Server.h"
#include "server/delivery/RequestScheduler.h"
#include "server/delivery/request/BaseRequest.h"
#include "server/delivery/RequestHandler.h"
#include "src/version.h"
#include "server/web_impl/handler/WebRequestHandler.h"
#include "server/web_impl/dto/TableDto.hpp"
#include "server/web_impl/dto/StatusDto.hpp"
#include "server/web_impl/dto/VectorDto.hpp"
#include "server/web_impl/dto/IndexDto.hpp"
#include "server/web_impl/component/AppComponent.hpp"
#include "server/web_impl/controller/WebController.hpp"
#include "server/web_impl/Types.h"
#include "server/web_impl/WebServer.h"
#include <oatpp/core/macro/component.hpp>
#include <oatpp/network/client/SimpleTCPConnectionProvider.hpp>
#include <oatpp/web/client/ApiClient.hpp>
#include <oatpp/web/client/HttpRequestExecutor.hpp>
#include <opentracing/mocktracer/tracer.h>
#include "scheduler/ResourceFactory.h"
#include "scheduler/SchedInst.h"
#include "server/Config.h"
#include "server/DBWrapper.h"
#include "server/delivery/RequestHandler.h"
#include "server/delivery/RequestScheduler.h"
#include "server/delivery/request/BaseRequest.h"
#include "server/Server.h"
#include "src/version.h"
#include "server/web_impl/Types.h"
#include "server/web_impl/WebServer.h"
#include "server/web_impl/component/AppComponent.hpp"
#include "server/web_impl/controller/WebController.hpp"
#include "server/web_impl/dto/IndexDto.hpp"
#include "server/web_impl/dto/StatusDto.hpp"
#include "server/web_impl/dto/TableDto.hpp"
#include "server/web_impl/dto/VectorDto.hpp"
#include "server/web_impl/handler/WebRequestHandler.h"
#include "utils/CommonUtil.h"
#include "wrapper/VecIndex.h"
#include "unittest/server/utils.h"
@ -63,7 +60,7 @@ using OChunkedBuffer = oatpp::data::stream::ChunkedBuffer;
using OOutputStream = oatpp::data::stream::BufferOutputStream;
using OFloat32 = milvus::server::web::OFloat32;
using OInt64 = milvus::server::web::OInt64;
template<class T>
template <class T>
using OList = milvus::server::web::OList<T>;
using StatusCode = milvus::server::web::StatusCode;
@ -389,6 +386,7 @@ TEST_F(WebHandlerTest, SEARCH) {
insert_request_dto->ids = insert_request_dto->ids->createShared();
auto ids_dto = milvus::server::web::VectorIdsDto::createShared();
auto status_dto = handler->Insert(table_name, insert_request_dto, ids_dto);
ASSERT_EQ(0, status_dto->code->getValue());
auto search_request_dto = milvus::server::web::SearchRequestDto::createShared();
search_request_dto->records = RandomRecordsDto(TABLE_DIM, 10);
@ -407,15 +405,14 @@ TEST_F(WebHandlerTest, CMD) {
auto cmd_dto = milvus::server::web::CommandDto::createShared();
cmd = "status";
OQueryParams query_params;
auto status_dto = handler->Cmd(cmd, query_params, cmd_dto);
auto status_dto = handler->SystemInfo(cmd, cmd_dto);
ASSERT_EQ(0, status_dto->code->getValue());
ASSERT_EQ("OK", cmd_dto->reply->std_str());
cmd = "version";
status_dto = handler->Cmd(cmd, query_params, cmd_dto);
status_dto = handler->SystemInfo(cmd, cmd_dto);
ASSERT_EQ(0, status_dto->code->getValue());
ASSERT_EQ("0.6.0", cmd_dto->reply->std_str());
ASSERT_EQ(MILVUS_VERSION, cmd_dto->reply->std_str());
}
///////////////////////////////////////////////////////////////////////////////////////
@ -424,47 +421,104 @@ namespace {
static const char* CONTROLLER_TEST_VALID_CONFIG_STR =
"# Default values are used when you make no changes to the following parameters.\n"
"\n"
"version: 0.1"
"version: 0.1\n"
"\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"# Server Config | Description | Type | Default "
" |\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"server_config:\n"
" address: 0.0.0.0 # milvus server ip address (IPv4)\n"
" port: 19530 # port range: 1025 ~ 65534\n"
" deploy_mode: single \n"
" address: 0.0.0.0\n"
" port: 19530\n"
" deploy_mode: single\n"
" time_zone: UTC+8\n"
" web_port: 19121\n"
"\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"# DataBase Config | Description | Type | Default "
" |\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"db_config:\n"
" backend_url: sqlite://:@:/ \n"
"\n"
" insert_buffer_size: 4 # GB, maximum insert buffer size allowed\n"
" preload_table: \n"
" backend_url: sqlite://:@:/\n"
" preload_table:\n"
"\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"# Storage Config | Description | Type | Default "
" |\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"storage_config:\n"
" primary_path: /tmp/milvus_web_controller_test # path used to store data and meta\n"
" secondary_path: # path used to store data only, split by semicolon\n"
" primary_path: /tmp/milvus\n"
" secondary_path:\n"
" s3_enable: false\n"
" s3_address: 127.0.0.1\n"
" s3_port: 9000\n"
" s3_access_key: minioadmin\n"
" s3_secret_key: minioadmin\n"
" s3_bucket: milvus-bucket\n"
"\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"# Metric Config | Description | Type | Default "
" |\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"metric_config:\n"
" enable_monitor: false # enable monitoring or not\n"
" enable_monitor: false\n"
" address: 127.0.0.1\n"
" port: 8080 # port prometheus uses to fetch metrics\n"
" port: 9091\n"
"\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"# Cache Config | Description | Type | Default "
" |\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"cache_config:\n"
" cpu_cache_capacity: 4 # GB, CPU memory used for cache\n"
" cpu_cache_threshold: 0.85 \n"
" cache_insert_data: false # whether to load inserted data into cache\n"
" cpu_cache_capacity: 4\n"
" insert_buffer_size: 1\n"
" cache_insert_data: false\n"
"\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"# Engine Config | Description | Type | Default "
" |\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"engine_config:\n"
" use_blas_threshold: 20 \n"
" use_blas_threshold: 1100\n"
#ifdef MILVUS_GPU_VERSION
" gpu_search_threshold: 1000\n"
"\n"
#ifdef MILVUS_GPU_VERSION
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"# GPU Resource Config | Description | Type | Default "
" |\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"gpu_resource_config:\n"
" enable: true # whether to enable GPU resources\n"
" cache_capacity: 4 # GB, size of GPU memory per card used for cache, must be a positive integer\n"
" search_resources: # define the GPU devices used for search computation, must be in format gpux\n"
" enable: true\n"
" cache_capacity: 1\n"
" search_resources:\n"
" - gpu0\n"
" build_index_resources: # define the GPU devices used for index building, must be in format gpux\n"
" build_index_resources:\n"
" - gpu0\n"
#endif
"\n";
#endif
"\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"# Tracing Config | Description | Type | Default "
" |\n"
"#----------------------+------------------------------------------------------------+------------+----------------"
"-+\n"
"tracing_config:\n"
" json_config_path:\n"
"";
static const char* CONTROLLER_TEST_TABLE_NAME = "controller_unit_test";
static const char* CONTROLLER_TEST_CONFIG_DIR = "/tmp/milvus_web_controller_test/";
@ -473,7 +527,7 @@ static const char* CONTROLLER_TEST_CONFIG_FILE = "config.yaml";
class TestClient : public oatpp::web::client::ApiClient {
public:
#include OATPP_CODEGEN_BEGIN(ApiClient)
API_CLIENT_INIT(TestClient)
API_CLIENT_INIT(TestClient)
API_CALL("GET", "/", root)
@ -513,8 +567,8 @@ class TestClient : public oatpp::web::client::ApiClient {
API_CALL("OPTIONS", "/tables/{table_name}/indexes", optionsIndexes, PATH(String, table_name, "table_name"))
API_CALL("POST", "/tables/{table_name}/indexes", createIndex,
PATH(String, table_name, "table_name"), BODY_DTO(milvus::server::web::IndexRequestDto::ObjectWrapper, body))
API_CALL("POST", "/tables/{table_name}/indexes", createIndex, PATH(String, table_name, "table_name"),
BODY_DTO(milvus::server::web::IndexRequestDto::ObjectWrapper, body))
API_CALL("GET", "/tables/{table_name}/indexes", getIndex, PATH(String, table_name, "table_name"))
@ -522,11 +576,11 @@ class TestClient : public oatpp::web::client::ApiClient {
API_CALL("OPTIONS", "/tables/{table_name}/partitions", optionsPartitions, PATH(String, table_name, "table_name"))
API_CALL("POST", "/tables/{table_name}/partitions", createPartition,
PATH(String, table_name, "table_name"), BODY_DTO(milvus::server::web::PartitionRequestDto::ObjectWrapper, body))
API_CALL("POST", "/tables/{table_name}/partitions", createPartition, PATH(String, table_name, "table_name"),
BODY_DTO(milvus::server::web::PartitionRequestDto::ObjectWrapper, body))
API_CALL("GET", "/tables/{table_name}/partitions", showPartitions,
PATH(String, table_name, "table_name"), QUERY(String, offset), QUERY(String, page_size))
API_CALL("GET", "/tables/{table_name}/partitions", showPartitions, PATH(String, table_name, "table_name"),
QUERY(String, offset), QUERY(String, page_size))
API_CALL("OPTIONS", "/tables/{table_name}/partitions/{partition_tag}", optionsParTag,
PATH(String, table_name, "table_name"), PATH(String, partition_tag, "partition_tag"))
@ -536,14 +590,16 @@ class TestClient : public oatpp::web::client::ApiClient {
API_CALL("OPTIONS", "/tables/{table_name}/vectors", optionsVectors, PATH(String, table_name, "table_name"))
API_CALL("POST", "/tables/{table_name}/vectors", insert,
PATH(String, table_name, "table_name"), BODY_DTO(milvus::server::web::InsertRequestDto::ObjectWrapper, body))
API_CALL("POST", "/tables/{table_name}/vectors", insert, PATH(String, table_name, "table_name"),
BODY_DTO(milvus::server::web::InsertRequestDto::ObjectWrapper, body))
API_CALL("PUT", "/tables/{table_name}/vectors", search,
PATH(String, table_name, "table_name"), BODY_DTO(milvus::server::web::SearchRequestDto::ObjectWrapper, body))
API_CALL("PUT", "/tables/{table_name}/vectors", search, PATH(String, table_name, "table_name"),
BODY_DTO(milvus::server::web::SearchRequestDto::ObjectWrapper, body))
API_CALL("GET", "/system/{msg}", cmd, PATH(String, cmd_str, "msg"), QUERY(String, action), QUERY(String, target))
API_CALL("PUT", "/system/{op}", exec, PATH(String, op, "op"), BODY_STRING(String, body))
#include OATPP_CODEGEN_END(ApiClient)
};
@ -551,10 +607,12 @@ class WebControllerTest : public testing::Test {
protected:
static void
SetUpTestCase() {
mkdir(CONTROLLER_TEST_CONFIG_DIR, S_IRWXU);
// Basic config
std::string config_path = std::string(CONTROLLER_TEST_CONFIG_DIR).append(CONTROLLER_TEST_CONFIG_FILE);
std::fstream fs(config_path.c_str(), std::ios_base::out);
fs << CONTROLLER_TEST_VALID_CONFIG_STR;
fs.flush();
fs.close();
milvus::server::Config& config = milvus::server::Config::GetInstance();
@ -616,6 +674,14 @@ class WebControllerTest : public testing::Test {
void
SetUp() override {
std::string config_path = std::string(CONTROLLER_TEST_CONFIG_DIR).append(CONTROLLER_TEST_CONFIG_FILE);
std::fstream fs(config_path.c_str(), std::ios_base::out);
fs << CONTROLLER_TEST_VALID_CONFIG_STR;
fs.close();
milvus::server::Config& config = milvus::server::Config::GetInstance();
config.LoadConfigFile(std::string(CONTROLLER_TEST_CONFIG_DIR) + CONTROLLER_TEST_CONFIG_FILE);
OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider);
OATPP_COMPONENT(std::shared_ptr<oatpp::data::mapping::ObjectMapper>, objectMapper);
object_mapper = objectMapper;
@ -627,8 +693,7 @@ class WebControllerTest : public testing::Test {
}
void
TearDown() override {
};
TearDown() override{};
protected:
std::shared_ptr<oatpp::data::mapping::ObjectMapper> object_mapper;
@ -636,7 +701,8 @@ class WebControllerTest : public testing::Test {
std::shared_ptr<TestClient> client_ptr;
protected:
void GenTable(const std::string& table_name, int64_t dim, int64_t index_file_size, int64_t metric_type) {
void
GenTable(const std::string& table_name, int64_t dim, int64_t index_file_size, int64_t metric_type) {
auto table_dto = milvus::server::web::TableRequestDto::createShared();
table_dto->table_name = OString(table_name.c_str());
table_dto->dimension = dim;
@ -647,7 +713,7 @@ class WebControllerTest : public testing::Test {
}
};
} // namespace
} // namespace
TEST_F(WebControllerTest, OPTIONS) {
auto response = client_ptr->root(conncetion_ptr);
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode());
@ -748,7 +814,7 @@ TEST_F(WebControllerTest, SHOW_TABLES) {
auto response = client_ptr->showTables("1", "1", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode());
auto result_dto = response->readBodyToDto<milvus::server::web::TableListFieldsDto>(object_mapper.get());
ASSERT_TRUE(result_dto->count->getValue() >= 0);
ASSERT_GE(result_dto->count->getValue(), 0);
// test query table empty
response = client_ptr->showTables("0", "0", conncetion_ptr);
@ -847,6 +913,35 @@ TEST_F(WebControllerTest, INSERT_IDS) {
ASSERT_EQ(OStatus::CODE_204.code, response->getStatusCode());
}
TEST_F(WebControllerTest, LOAD_TABLE) {
milvus::server::Config& config = milvus::server::Config::GetInstance();
auto status = config.SetCacheConfigInsertBufferSize("1");
ASSERT_TRUE(status.ok());
const OString table_name = "test_web_controller_table_load_test" + OString(RandomName().c_str());
GenTable(table_name, 64, 100, "L2");
// Insert 100 vectors into the table
auto insert_dto = milvus::server::web::InsertRequestDto::createShared();
insert_dto->ids = insert_dto->ids->createShared();
insert_dto->records = RandomRecordsDto(64, 100);
auto response = client_ptr->insert(table_name, insert_dto, conncetion_ptr);
ASSERT_EQ(OStatus::CODE_201.code, response->getStatusCode());
auto result_dto = response->readBodyToDto<milvus::server::web::VectorIdsDto>(object_mapper.get());
ASSERT_EQ(100, result_dto->ids->count());
sleep(2);
std::string request_str = "{\"load\": {\"table_name\": \"" + table_name->std_str() + "\"}}";
response = client_ptr->exec("task", request_str.c_str());
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode()) << response->readBodyToString()->c_str();
// test with a non-existent table
request_str = "{\"load\": {\"table_name\": \"OOOO124214\"}}";
response = client_ptr->exec("task", request_str.c_str());
ASSERT_NE(OStatus::CODE_200.code, response->getStatusCode());
}
TEST_F(WebControllerTest, INDEX) {
auto table_name = "test_insert_table_test" + OString(RandomName().c_str());
GenTable(table_name, 64, 100, "L2");
@ -921,6 +1016,10 @@ TEST_F(WebControllerTest, INDEX) {
ASSERT_EQ(OStatus::CODE_404.code, response->getStatusCode());
auto error_dto = response->readBodyToDto<milvus::server::web::StatusDto>(object_mapper.get());
ASSERT_EQ(milvus::server::web::StatusCode::TABLE_NOT_EXISTS, error_dto->code->getValue());
// drop index on a non-existent table
response = client_ptr->dropIndex("Table_name_non_existant_000000000000000000", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_404.code, response->getStatusCode()) << response->readBodyToString()->c_str();
}
TEST_F(WebControllerTest, PARTITION) {
@ -1002,7 +1101,7 @@ TEST_F(WebControllerTest, SEARCH) {
OQueryParams query_params;
auto insert_dto = milvus::server::web::InsertRequestDto::createShared();
insert_dto->ids = insert_dto->ids->createShared();
insert_dto->records = RandomRecordsDto(64, 200);// insert_dto->records->createShared();
insert_dto->records = RandomRecordsDto(64, 200); // insert_dto->records->createShared();
auto response = client_ptr->insert(table_name, insert_dto, conncetion_ptr);
ASSERT_EQ(OStatus::CODE_201.code, response->getStatusCode());
@ -1011,13 +1110,13 @@ TEST_F(WebControllerTest, SEARCH) {
sleep(4);
//Create partition and insert 200 vectors into it
// Create partition and insert 200 vectors into it
auto par_param = milvus::server::web::PartitionRequestDto::createShared();
par_param->partition_name = "partition" + OString(RandomName().c_str());
par_param->partition_tag = "tag" + OString(RandomName().c_str());
response = client_ptr->createPartition(table_name, par_param);
ASSERT_EQ(OStatus::CODE_201.code, response->getStatusCode())
<< "Error: " << response->getStatusDescription()->std_str();
<< "Error: " << response->getStatusDescription()->std_str();
insert_dto->tag = par_param->partition_tag;
response = client_ptr->insert(table_name, insert_dto, conncetion_ptr);
@ -1076,13 +1175,13 @@ TEST_F(WebControllerTest, SEARCH_BIN) {
sleep(4);
//Create partition and insert 200 vectors into it
// Create partition and insert 200 vectors into it
auto par_param = milvus::server::web::PartitionRequestDto::createShared();
par_param->partition_name = "partition" + OString(RandomName().c_str());
par_param->partition_tag = "tag" + OString(RandomName().c_str());
response = client_ptr->createPartition(table_name, par_param);
ASSERT_EQ(OStatus::CODE_201.code, response->getStatusCode())
<< "Error: " << response->getStatusDescription()->std_str();
<< "Error: " << response->getStatusDescription()->std_str();
insert_dto->tag = par_param->partition_tag;
response = client_ptr->insert(table_name, insert_dto, conncetion_ptr);
@ -1132,15 +1231,61 @@ TEST_F(WebControllerTest, CMD) {
response = client_ptr->cmd("info", "", "", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode());
GenTable("test_cmd", 16, 10, "L2");
response = client_ptr->cmd("task", "load", "test_cmd", conncetion_ptr);
response = client_ptr->cmd("info", "", "", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode());
// task without existing table
response = client_ptr->cmd("task", "load", "test_cmdXXXXXXXXXXXX", conncetion_ptr);
response = client_ptr->cmd("config", "", "", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode()) << response->readBodyToString()->c_str();
}
TEST_F(WebControllerTest, SYSTEM_OP_TEST) {
std::string config_path = std::string(CONTROLLER_TEST_CONFIG_DIR).append(CONTROLLER_TEST_CONFIG_FILE);
std::fstream fs(config_path.c_str(), std::ios_base::out);
fs << CONTROLLER_TEST_VALID_CONFIG_STR;
fs.flush();
milvus::server::Config& config = milvus::server::Config::GetInstance();
auto status = config.LoadConfigFile(config_path);
ASSERT_TRUE(status.ok()) << status.message();
/* test task load */
auto response =
client_ptr->exec("task", "{\"load\": {\"table_name\": \"milvus_non_existent_table_test\"}}", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_400.code, response->getStatusCode());
/* test flush */
response =
client_ptr->exec("task", "{\"flush\": {\"table_names\": \"milvus_non_existent_table_test\"}}", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_400.code, response->getStatusCode());
response = client_ptr->exec(
"task", "{\"flush\": {\"table_names\": \"[milvus_non_existent_table_test_for_flush]\"}}", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_400.code, response->getStatusCode());
response = client_ptr->exec("config", "{\"cache_config\": {\"cpu_cache_capacity\": 1}}", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode());
response = client_ptr->exec("config", "{\"cache_config\": {\"cpu_cache_capacity\": 10000}}", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_400.code, response->getStatusCode());
// test invalid body
response = client_ptr->exec("config", "{1}}", conncetion_ptr);
ASSERT_EQ(OStatus::CODE_400.code, response->getStatusCode());
fs.close();
}
TEST_F(WebControllerTest, ADVANCED_CONFIG) {
std::string config_path = std::string(CONTROLLER_TEST_CONFIG_DIR).append(CONTROLLER_TEST_CONFIG_FILE);
std::fstream fs(config_path.c_str(), std::ios_base::out);
fs << CONTROLLER_TEST_VALID_CONFIG_STR;
fs.flush();
fs.close();
milvus::server::Config& config = milvus::server::Config::GetInstance();
auto status = config.LoadConfigFile(config_path);
ASSERT_TRUE(status.ok()) << status.message();
auto response = client_ptr->getAdvanced(conncetion_ptr);
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode());
@ -1178,6 +1323,25 @@ TEST_F(WebControllerTest, ADVANCED_CONFIG) {
#ifdef MILVUS_GPU_VERSION
TEST_F(WebControllerTest, GPU_CONFIG) {
std::string config_path = std::string(CONTROLLER_TEST_CONFIG_DIR).append(CONTROLLER_TEST_CONFIG_FILE);
std::fstream fs(config_path.c_str(), std::ios_base::out);
fs << CONTROLLER_TEST_VALID_CONFIG_STR;
fs.flush();
fs.close();
milvus::server::Config& config = milvus::server::Config::GetInstance();
auto status = config.LoadConfigFile(config_path);
ASSERT_TRUE(status.ok()) << status.message();
status = config.SetGpuResourceConfigEnable("true");
ASSERT_TRUE(status.ok()) << status.message();
status = config.SetGpuResourceConfigCacheCapacity("1");
ASSERT_TRUE(status.ok()) << status.message();
status = config.SetGpuResourceConfigBuildIndexResources("gpu0");
ASSERT_TRUE(status.ok()) << status.message();
status = config.SetGpuResourceConfigSearchResources("gpu0");
ASSERT_TRUE(status.ok()) << status.message();
auto response = client_ptr->getGPUConfig(conncetion_ptr);
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode());
@ -1227,4 +1391,3 @@ TEST_F(WebControllerTest, DEVICES_CONFIG) {
auto response = WebControllerTest::client_ptr->getDevices(conncetion_ptr);
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode());
}