fix conflicts

Former-commit-id: 421fa7db0f800aec6fef28d55a6dfb80e9308759
pull/191/head
Yu Kun 2019-10-13 21:18:02 +08:00
commit b429953c0b
33 changed files with 736 additions and 251 deletions

View File

@ -2,10 +2,9 @@ timeout(time: 30, unit: 'MINUTES') {
try { try {
dir ("${PROJECT_NAME}_test") { dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]]) checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh 'python3 -m pip install -r requirements.txt' sh 'python3 -m pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com'
sh "pytest . --alluredir=\"test_out/dev/single/sqlite\" --level=1 --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.milvus-1.svc.cluster.local" sh "pytest . --alluredir=\"test_out/dev/single/sqlite\" --level=1 --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.milvus-1.svc.cluster.local"
} }
// mysql database backend test // mysql database backend test
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy" load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"

View File

@ -2,7 +2,7 @@ timeout(time: 60, unit: 'MINUTES') {
try { try {
dir ("${PROJECT_NAME}_test") { dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]]) checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh 'python3 -m pip install -r requirements.txt' sh 'python3 -m pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com'
sh "pytest . --alluredir=\"test_out/dev/single/sqlite\" --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.milvus-1.svc.cluster.local" sh "pytest . --alluredir=\"test_out/dev/single/sqlite\" --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.milvus-1.svc.cluster.local"
} }

View File

@ -37,6 +37,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-619 - Add optimizer class in scheduler - MS-619 - Add optimizer class in scheduler
- MS-614 - Preload table at startup - MS-614 - Preload table at startup
- MS-626 - Refactor DataObj to support cache any type data - MS-626 - Refactor DataObj to support cache any type data
- MS-648 - Improve unittest
## New Feature ## New Feature
- MS-627 - Integrate new index: IVFSQHybrid - MS-627 - Integrate new index: IVFSQHybrid

View File

@ -143,7 +143,7 @@ if(USE_JFROG_CACHE STREQUAL "ON")
if(NOT DEFINED JFROG_ARTFACTORY_URL) if(NOT DEFINED JFROG_ARTFACTORY_URL)
message(FATAL_ERROR "JFROG_ARTFACTORY_URL is not set") message(FATAL_ERROR "JFROG_ARTFACTORY_URL is not set")
endif() endif()
set(JFROG_ARTFACTORY_CACHE_URL "${JFROG_ARTFACTORY_URL}/generic-local/milvus/thirdparty/cache/${CMAKE_OS_NAME}/${MILVUS_BUILD_ARCH}/${BUILD_TYPE}") set(JFROG_ARTFACTORY_CACHE_URL "${JFROG_ARTFACTORY_URL}/milvus/thirdparty/cache/${CMAKE_OS_NAME}/${MILVUS_BUILD_ARCH}/${BUILD_TYPE}")
if(DEFINED ENV{JFROG_USER_NAME}) if(DEFINED ENV{JFROG_USER_NAME})
set(JFROG_USER_NAME "$ENV{JFROG_USER_NAME}") set(JFROG_USER_NAME "$ENV{JFROG_USER_NAME}")
endif() endif()

View File

@ -123,7 +123,7 @@ if(NOT DEFINED USE_JFROG_CACHE)
set(USE_JFROG_CACHE "OFF") set(USE_JFROG_CACHE "OFF")
endif() endif()
if(USE_JFROG_CACHE STREQUAL "ON") if(USE_JFROG_CACHE STREQUAL "ON")
set(JFROG_ARTFACTORY_CACHE_URL "${JFROG_ARTFACTORY_URL}/generic-local/milvus/thirdparty/cache/${CMAKE_OS_NAME}/${KNOWHERE_BUILD_ARCH}/${BUILD_TYPE}") set(JFROG_ARTFACTORY_CACHE_URL "${JFROG_ARTFACTORY_URL}/milvus/thirdparty/cache/${CMAKE_OS_NAME}/${KNOWHERE_BUILD_ARCH}/${BUILD_TYPE}")
set(THIRDPARTY_PACKAGE_CACHE "${THIRDPARTY_DIR}/cache") set(THIRDPARTY_PACKAGE_CACHE "${THIRDPARTY_DIR}/cache")
endif() endif()
@ -234,7 +234,7 @@ if(CUSTOMIZATION)
# set(FAISS_MD5 "57da9c4f599cc8fa4260488b1c96e1cc") # commit-id 6dbdf75987c34a2c853bd172ea0d384feea8358c branch-0.2.0 # set(FAISS_MD5 "57da9c4f599cc8fa4260488b1c96e1cc") # commit-id 6dbdf75987c34a2c853bd172ea0d384feea8358c branch-0.2.0
# set(FAISS_MD5 "21deb1c708490ca40ecb899122c01403") # commit-id 643e48f479637fd947e7b93fa4ca72b38ecc9a39 branch-0.2.0 # set(FAISS_MD5 "21deb1c708490ca40ecb899122c01403") # commit-id 643e48f479637fd947e7b93fa4ca72b38ecc9a39 branch-0.2.0
# set(FAISS_MD5 "072db398351cca6e88f52d743bbb9fa0") # commit-id 3a2344d04744166af41ef1a74449d68a315bfe17 branch-0.2.1 # set(FAISS_MD5 "072db398351cca6e88f52d743bbb9fa0") # commit-id 3a2344d04744166af41ef1a74449d68a315bfe17 branch-0.2.1
set(FAISS_MD5 "94988b7bdac4eb82a9575c702a3f2df3") # commit-id 1407526b31cad26f98ceca8dddaface8f18c4c19 branch-0.2.1 set(FAISS_MD5 "c89ea8e655f5cdf58f42486f13614714") # commit-id 9c28a1cbb88f41fa03b03d7204106201ad33276b branch-0.2.1
execute_process(COMMAND wget -q --method HEAD ${FAISS_SOURCE_URL} RESULT_VARIABLE return_code) execute_process(COMMAND wget -q --method HEAD ${FAISS_SOURCE_URL} RESULT_VARIABLE return_code)
message(STATUS "Check the remote cache file ${FAISS_SOURCE_URL}. return code = ${return_code}") message(STATUS "Check the remote cache file ${FAISS_SOURCE_URL}. return code = ${return_code}")

View File

@ -71,4 +71,13 @@ GPUIVFSQ::CopyGpuToCpu(const Config& config) {
return std::make_shared<IVFSQ>(new_index); return std::make_shared<IVFSQ>(new_index);
} }
void
GPUIVFSQ::search_impl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels, const Config& cfg) {
#ifdef CUSTOMIZATION
GPUIVF::search_impl(n, data, k, distances, labels, cfg);
#else
IVF::search_impl(n, data, k, distances, labels, cfg);
#endif
}
} // namespace knowhere } // namespace knowhere

View File

@ -38,6 +38,10 @@ class GPUIVFSQ : public GPUIVF {
VectorIndexPtr VectorIndexPtr
CopyGpuToCpu(const Config& config) override; CopyGpuToCpu(const Config& config) override;
protected:
void
search_impl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels, const Config& cfg) override;
}; };
} // namespace knowhere } // namespace knowhere

View File

@ -115,6 +115,19 @@ IVF::Search(const DatasetPtr& dataset, const Config& config) {
search_impl(rows, (float*)p_data, search_cfg->k, res_dis, res_ids, config); search_impl(rows, (float*)p_data, search_cfg->k, res_dis, res_ids, config);
// std::stringstream ss_res_id, ss_res_dist;
// for (int i = 0; i < 10; ++i) {
// printf("%llu", res_ids[i]);
// printf("\n");
// printf("%.6f", res_dis[i]);
// printf("\n");
// ss_res_id << res_ids[i] << " ";
// ss_res_dist << res_dis[i] << " ";
// }
// std::cout << std::endl << "after search: " << std::endl;
// std::cout << ss_res_id.str() << std::endl;
// std::cout << ss_res_dist.str() << std::endl << std::endl;
auto id_buf = MakeMutableBufferSmart((uint8_t*)res_ids, sizeof(int64_t) * elems); auto id_buf = MakeMutableBufferSmart((uint8_t*)res_ids, sizeof(int64_t) * elems);
auto dist_buf = MakeMutableBufferSmart((uint8_t*)res_dis, sizeof(float) * elems); auto dist_buf = MakeMutableBufferSmart((uint8_t*)res_dis, sizeof(float) * elems);

View File

@ -17,6 +17,7 @@
// under the License. // under the License.
#include "knowhere/index/vector_index/IndexIVFSQHybrid.h" #include "knowhere/index/vector_index/IndexIVFSQHybrid.h"
#include <utility>
#include "faiss/AutoTune.h" #include "faiss/AutoTune.h"
#include "faiss/gpu/GpuAutoTune.h" #include "faiss/gpu/GpuAutoTune.h"
#include "faiss/gpu/GpuIndexIVF.h" #include "faiss/gpu/GpuIndexIVF.h"
@ -79,20 +80,8 @@ IVFSQHybrid::CopyGpuToCpu(const Config& config) {
VectorIndexPtr VectorIndexPtr
IVFSQHybrid::CopyCpuToGpu(const int64_t& device_id, const Config& config) { IVFSQHybrid::CopyCpuToGpu(const int64_t& device_id, const Config& config) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)) { if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)) {
ResScope rs(res, device_id, false); auto p = CopyCpuToGpuWithQuantizer(device_id, config);
faiss::gpu::GpuClonerOptions option; return p.first;
option.allInGpu = true;
faiss::IndexComposition index_composition;
index_composition.index = index_.get();
index_composition.quantizer = nullptr;
index_composition.mode = 0; // copy all
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, &index_composition, &option);
std::shared_ptr<faiss::Index> device_index;
device_index.reset(gpu_index);
return std::make_shared<IVFSQHybrid>(device_index, device_id, res);
} else { } else {
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource"); KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource");
} }
@ -180,7 +169,7 @@ IVFSQHybrid::UnsetQuantizer() {
ivf_index->quantizer = nullptr; ivf_index->quantizer = nullptr;
} }
void VectorIndexPtr
IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) { IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
auto quantizer_conf = std::dynamic_pointer_cast<QuantizerCfg>(conf); auto quantizer_conf = std::dynamic_pointer_cast<QuantizerCfg>(conf);
if (quantizer_conf != nullptr) { if (quantizer_conf != nullptr) {
@ -188,9 +177,10 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
KNOWHERE_THROW_MSG("mode only support 2 in this func"); KNOWHERE_THROW_MSG("mode only support 2 in this func");
} }
} }
if (quantizer_conf->gpu_id != gpu_id_) { // if (quantizer_conf->gpu_id != gpu_id_) {
KNOWHERE_THROW_MSG("quantizer and data must on the same gpu card"); // KNOWHERE_THROW_MSG("quantizer and data must on the same gpu card");
} // }
gpu_id_ = quantizer_conf->gpu_id;
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) { if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
ResScope rs(res, gpu_id_, false); ResScope rs(res, gpu_id_, false);
@ -207,8 +197,37 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
index_composition->mode = quantizer_conf->mode; // only 2 index_composition->mode = quantizer_conf->mode; // only 2
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index_composition, &option); auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index_composition, &option);
index_.reset(gpu_index); std::shared_ptr<faiss::Index> new_idx;
gpu_mode = 2; // all in gpu new_idx.reset(gpu_index);
auto sq_idx = std::make_shared<IVFSQHybrid>(new_idx, gpu_id_, res);
return sq_idx;
} else {
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource");
}
}
std::pair<VectorIndexPtr, QuantizerPtr>
IVFSQHybrid::CopyCpuToGpuWithQuantizer(const int64_t& device_id, const Config& config) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)) {
ResScope rs(res, device_id, false);
faiss::gpu::GpuClonerOptions option;
option.allInGpu = true;
faiss::IndexComposition index_composition;
index_composition.index = index_.get();
index_composition.quantizer = nullptr;
index_composition.mode = 0; // copy all
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, &index_composition, &option);
std::shared_ptr<faiss::Index> device_index;
device_index.reset(gpu_index);
auto new_idx = std::make_shared<IVFSQHybrid>(device_index, device_id, res);
auto q = std::make_shared<FaissIVFQuantizer>();
q->quantizer = index_composition.quantizer;
q->size = index_composition.quantizer->d * index_composition.quantizer->getNumVecs() * sizeof(float);
return std::make_pair(new_idx, q);
} else { } else {
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource"); KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource");
} }

View File

@ -19,6 +19,7 @@
#include <faiss/index_io.h> #include <faiss/index_io.h>
#include <memory> #include <memory>
#include <utility>
#include "IndexGPUIVFSQ.h" #include "IndexGPUIVFSQ.h"
#include "Quantizer.h" #include "Quantizer.h"
@ -60,9 +61,12 @@ class IVFSQHybrid : public GPUIVFSQ {
void void
UnsetQuantizer(); UnsetQuantizer();
void VectorIndexPtr
LoadData(const knowhere::QuantizerPtr& q, const Config& conf); LoadData(const knowhere::QuantizerPtr& q, const Config& conf);
std::pair<VectorIndexPtr, QuantizerPtr>
CopyCpuToGpuWithQuantizer(const int64_t& device_id, const Config& config);
IndexModelPtr IndexModelPtr
Train(const DatasetPtr& dataset, const Config& config) override; Train(const DatasetPtr& dataset, const Config& config) override;

View File

@ -154,8 +154,8 @@ class IVFTest : public DataGen, public TestWithParam<::std::tuple<std::string, P
INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest, INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
Values(std::make_tuple("IVF", ParameterType::ivf), Values(std::make_tuple("IVF", ParameterType::ivf),
std::make_tuple("GPUIVF", ParameterType::ivf), std::make_tuple("GPUIVF", ParameterType::ivf),
// std::make_tuple("IVFPQ", ParameterType::ivfpq), std::make_tuple("IVFPQ", ParameterType::ivfpq),
// std::make_tuple("GPUIVFPQ", ParameterType::ivfpq), std::make_tuple("GPUIVFPQ", ParameterType::ivfpq),
std::make_tuple("IVFSQ", ParameterType::ivfsq), std::make_tuple("IVFSQ", ParameterType::ivfsq),
#ifdef CUSTOMIZATION #ifdef CUSTOMIZATION
std::make_tuple("IVFSQHybrid", ParameterType::ivfsq), std::make_tuple("IVFSQHybrid", ParameterType::ivfsq),
@ -240,6 +240,7 @@ TEST_P(IVFTest, hybrid) {
auto result = hybrid_1_idx->Search(query_dataset, conf); auto result = hybrid_1_idx->Search(query_dataset, conf);
AssertAnns(result, nq, conf->k); AssertAnns(result, nq, conf->k);
PrintResult(result, nq, k); PrintResult(result, nq, k);
hybrid_1_idx->UnsetQuantizer();
} }
{ {
@ -253,9 +254,9 @@ TEST_P(IVFTest, hybrid) {
quantizer_conf->gpu_id = device_id; quantizer_conf->gpu_id = device_id;
auto q = hybrid_2_idx->LoadQuantizer(quantizer_conf); auto q = hybrid_2_idx->LoadQuantizer(quantizer_conf);
quantizer_conf->mode = 2; quantizer_conf->mode = 2;
hybrid_2_idx->LoadData(q, quantizer_conf); auto gpu_idx = hybrid_2_idx->LoadData(q, quantizer_conf);
auto result = hybrid_2_idx->Search(query_dataset, conf); auto result = gpu_idx->Search(query_dataset, conf);
AssertAnns(result, nq, conf->k); AssertAnns(result, nq, conf->k);
PrintResult(result, nq, k); PrintResult(result, nq, k);
} }
@ -438,6 +439,7 @@ TEST_P(IVFTest, clone_test) {
} }
} }
#ifdef CUSTOMIZATION
TEST_P(IVFTest, seal_test) { TEST_P(IVFTest, seal_test) {
// FaissGpuResourceMgr::GetInstance().InitDevice(device_id); // FaissGpuResourceMgr::GetInstance().InitDevice(device_id);
@ -472,6 +474,7 @@ TEST_P(IVFTest, seal_test) {
auto with_seal = tc.RecordSection("With seal"); auto with_seal = tc.RecordSection("With seal");
ASSERT_GE(without_seal, with_seal); ASSERT_GE(without_seal, with_seal);
} }
#endif
class GPURESTEST : public DataGen, public ::testing::Test { class GPURESTEST : public DataGen, public ::testing::Test {
protected: protected:
@ -637,7 +640,7 @@ TEST_F(GPURESTEST, copyandsearch) {
// search and copy at the same time // search and copy at the same time
printf("==================\n"); printf("==================\n");
index_type = "GPUIVFSQ"; index_type = "GPUIVF";
index_ = IndexFactory(index_type); index_ = IndexFactory(index_type);
auto conf = std::make_shared<knowhere::IVFSQCfg>(); auto conf = std::make_shared<knowhere::IVFSQCfg>();
@ -699,7 +702,7 @@ TEST_F(GPURESTEST, copyandsearch) {
} }
TEST_F(GPURESTEST, TrainAndSearch) { TEST_F(GPURESTEST, TrainAndSearch) {
index_type = "GPUIVFSQ"; index_type = "GPUIVF";
index_ = IndexFactory(index_type); index_ = IndexFactory(index_type);
auto conf = std::make_shared<knowhere::IVFSQCfg>(); auto conf = std::make_shared<knowhere::IVFSQCfg>();

View File

@ -36,6 +36,7 @@ class KDTTest : public DataGen, public ::testing::Test {
protected: protected:
void void
SetUp() override { SetUp() override {
Generate(96, 1000, 10);
index_ = std::make_shared<knowhere::CPUKDTRNG>(); index_ = std::make_shared<knowhere::CPUKDTRNG>();
auto tempconf = std::make_shared<knowhere::KDTCfg>(); auto tempconf = std::make_shared<knowhere::KDTCfg>();

View File

@ -38,17 +38,17 @@ class NSGInterfaceTest : public DataGen, public ::testing::Test {
SetUp() override { SetUp() override {
// Init_with_default(); // Init_with_default();
knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(DEVICE_ID, 1024 * 1024 * 200, 1024 * 1024 * 600, 2); knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(DEVICE_ID, 1024 * 1024 * 200, 1024 * 1024 * 600, 2);
Generate(256, 1000000, 1); Generate(256, 1000000 / 100, 1);
index_ = std::make_shared<knowhere::NSG>(); index_ = std::make_shared<knowhere::NSG>();
auto tmp_conf = std::make_shared<knowhere::NSGCfg>(); auto tmp_conf = std::make_shared<knowhere::NSGCfg>();
tmp_conf->gpu_id = DEVICE_ID; tmp_conf->gpu_id = DEVICE_ID;
tmp_conf->knng = 100; tmp_conf->knng = 20;
tmp_conf->nprobe = 32; tmp_conf->nprobe = 8;
tmp_conf->nlist = 16384; tmp_conf->nlist = 163;
tmp_conf->search_length = 60; tmp_conf->search_length = 40;
tmp_conf->out_degree = 70; tmp_conf->out_degree = 30;
tmp_conf->candidate_pool_size = 500; tmp_conf->candidate_pool_size = 100;
tmp_conf->metric_type = knowhere::METRICTYPE::L2; tmp_conf->metric_type = knowhere::METRICTYPE::L2;
train_conf = tmp_conf; train_conf = tmp_conf;

View File

@ -65,7 +65,7 @@ class ExecutionEngine {
Load(bool to_cache = true) = 0; Load(bool to_cache = true) = 0;
virtual Status virtual Status
CopyToGpu(uint64_t device_id) = 0; CopyToGpu(uint64_t device_id, bool hybrid) = 0;
virtual Status virtual Status
CopyToIndexFileToGpu(uint64_t device_id) = 0; CopyToIndexFileToGpu(uint64_t device_id) = 0;
@ -80,7 +80,8 @@ class ExecutionEngine {
Merge(const std::string& location) = 0; Merge(const std::string& location) = 0;
virtual Status virtual Status
Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const = 0; Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels,
bool hybrid) const = 0;
virtual std::shared_ptr<ExecutionEngine> virtual std::shared_ptr<ExecutionEngine>
BuildIndex(const std::string& location, EngineType engine_type) = 0; BuildIndex(const std::string& location, EngineType engine_type) = 0;

View File

@ -31,6 +31,7 @@
#include "wrapper/ConfAdapter.h" #include "wrapper/ConfAdapter.h"
#include "wrapper/ConfAdapterMgr.h" #include "wrapper/ConfAdapterMgr.h"
#include <src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h>
#include <src/scheduler/Utils.h> #include <src/scheduler/Utils.h>
#include <stdexcept> #include <stdexcept>
#include <utility> #include <utility>
@ -245,7 +246,31 @@ ExecutionEngineImpl::Load(bool to_cache) {
} }
Status Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
if (hybrid) {
auto key = location_ + ".quantizer";
auto quantizer =
std::static_pointer_cast<CachedQuantizer>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(key));
auto conf = std::make_shared<knowhere::QuantizerCfg>();
conf->gpu_id = device_id;
if (quantizer) {
// cache hit
conf->mode = 2;
auto new_index = index_->LoadData(quantizer->Data(), conf);
index_ = new_index;
} else {
auto pair = index_->CopyToGpuWithQuantizer(device_id);
index_ = pair.first;
// cache
auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
}
return Status::OK();
}
auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_)); auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_));
bool already_in_cache = (index != nullptr); bool already_in_cache = (index != nullptr);
if (already_in_cache) { if (already_in_cache) {
@ -389,8 +414,8 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t
} }
Status Status
ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels,
int64_t* labels) const { bool hybrid) const {
if (index_ == nullptr) { if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search"; ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
return Status(DB_ERROR, "index is null"); return Status(DB_ERROR, "index is null");
@ -406,11 +431,15 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr
auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType()); auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType());
auto conf = adapter->MatchSearch(temp_conf, index_->GetType()); auto conf = adapter->MatchSearch(temp_conf, index_->GetType());
HybridLoad(); if (hybrid) {
HybridLoad();
}
auto status = index_->Search(n, data, distances, labels, conf); auto status = index_->Search(n, data, distances, labels, conf);
HybridUnset(); if (hybrid) {
HybridUnset();
}
if (!status.ok()) { if (!status.ok()) {
ENGINE_LOG_ERROR << "Search error"; ENGINE_LOG_ERROR << "Search error";

View File

@ -56,7 +56,7 @@ class ExecutionEngineImpl : public ExecutionEngine {
Load(bool to_cache) override; Load(bool to_cache) override;
Status Status
CopyToGpu(uint64_t device_id) override; CopyToGpu(uint64_t device_id, bool hybrid = false) override;
Status Status
CopyToIndexFileToGpu(uint64_t device_id) override; CopyToIndexFileToGpu(uint64_t device_id) override;
@ -71,7 +71,8 @@ class ExecutionEngineImpl : public ExecutionEngine {
Merge(const std::string& location) override; Merge(const std::string& location) override;
Status Status
Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const override; Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels,
bool hybrid = false) const override;
ExecutionEnginePtr ExecutionEnginePtr
BuildIndex(const std::string& location, EngineType engine_type) override; BuildIndex(const std::string& location, EngineType engine_type) override;

View File

@ -19,9 +19,11 @@
#include "SchedInst.h" #include "SchedInst.h"
#include "TaskCreator.h" #include "TaskCreator.h"
#include "optimizer/Optimizer.h" #include "optimizer/Optimizer.h"
#include "scheduler/Algorithm.h"
#include "scheduler/optimizer/Optimizer.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "task/Task.h" #include "task/Task.h"
#include <src/scheduler/optimizer/Optimizer.h>
#include <utility> #include <utility>
namespace milvus { namespace milvus {
@ -73,6 +75,10 @@ JobMgr::worker_function() {
OptimizerInst::GetInstance()->Run(task); OptimizerInst::GetInstance()->Run(task);
} }
for (auto& task : tasks) {
calculate_path(task);
}
// disk resources NEVER be empty. // disk resources NEVER be empty.
if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
for (auto& task : tasks) { for (auto& task : tasks) {
@ -87,5 +93,23 @@ JobMgr::build_task(const JobPtr& job) {
return TaskCreator::Create(job); return TaskCreator::Create(job);
} }
void
JobMgr::calculate_path(const TaskPtr& task) {
if (task->type_ != TaskType::SearchTask) {
return;
}
if (task->label()->Type() != TaskLabelType::SPECIFIED_RESOURCE) {
return;
}
std::vector<std::string> path;
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
auto src = res_mgr_->GetDiskResources()[0];
auto dest = spec_label->resource();
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
task->path() = Path(path, path.size() - 1);
}
} // namespace scheduler } // namespace scheduler
} // namespace milvus } // namespace milvus

View File

@ -52,9 +52,12 @@ class JobMgr {
void void
worker_function(); worker_function();
std::vector<TaskPtr> static std::vector<TaskPtr>
build_task(const JobPtr& job); build_task(const JobPtr& job);
void
calculate_path(const TaskPtr& task);
private: private:
bool running_ = false; bool running_ = false;
std::queue<JobPtr> queue_; std::queue<JobPtr> queue_;

View File

@ -22,6 +22,7 @@
#include "ResourceMgr.h" #include "ResourceMgr.h"
#include "Scheduler.h" #include "Scheduler.h"
#include "optimizer/HybridPass.h" #include "optimizer/HybridPass.h"
#include "optimizer/LargeSQ8HPass.h"
#include "optimizer/Optimizer.h" #include "optimizer/Optimizer.h"
#include <memory> #include <memory>
@ -92,9 +93,9 @@ class OptimizerInst {
if (instance == nullptr) { if (instance == nullptr) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (instance == nullptr) { if (instance == nullptr) {
HybridPassPtr pass_ptr = std::make_shared<HybridPass>();
std::vector<PassPtr> pass_list; std::vector<PassPtr> pass_list;
pass_list.push_back(pass_ptr); pass_list.push_back(std::make_shared<LargeSQ8HPass>());
pass_list.push_back(std::make_shared<HybridPass>());
instance = std::make_shared<Optimizer>(pass_list); instance = std::make_shared<Optimizer>(pass_list);
} }
} }

View File

@ -145,37 +145,39 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
transport_costs.push_back(transport_cost); transport_costs.push_back(transport_cost);
paths.emplace_back(path); paths.emplace_back(path);
} }
if (task->job_.lock()->type() == JobType::SEARCH) { // if (task->job_.lock()->type() == JobType::SEARCH) {
auto label = task->label(); // auto label = task->label();
auto spec_label = std::static_pointer_cast<SpecResLabel>(label); // auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
if (spec_label->resource().lock()->type() == ResourceType::CPU) { // if (spec_label->resource().lock()->type() == ResourceType::CPU) {
std::vector<std::string> spec_path; // std::vector<std::string> spec_path;
spec_path.push_back(spec_label->resource().lock()->name()); // spec_path.push_back(spec_label->resource().lock()->name());
spec_path.push_back(resource->name()); // spec_path.push_back(resource->name());
task->path() = Path(spec_path, spec_path.size() - 1); // task->path() = Path(spec_path, spec_path.size() - 1);
} else { // } else {
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost // // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max(); // uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0; // uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) { // for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) { // if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i; // min_cost_idx = i;
break; // break;
} // }
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + // uint64_t cost = compute_resources[i]->TaskAvgCost() *
transport_costs[i]; // compute_resources[i]->NumOfTaskToExec() +
if (min_cost > cost) { // transport_costs[i];
min_cost = cost; // if (min_cost > cost) {
min_cost_idx = i; // min_cost = cost;
} // min_cost_idx = i;
} // }
// }
// step 3: set path in task //
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); // // step 3: set path in task
task->path() = task_path; // Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
} // task->path() = task_path;
// }
} else if (task->job_.lock()->type() == JobType::BUILD) { //
// } else
if (task->job_.lock()->type() == JobType::BUILD) {
// step2: Read device id in config // step2: Read device id in config
// get build index gpu resource // get build index gpu resource
server::Config& config = server::Config::GetInstance(); server::Config& config = server::Config::GetInstance();
@ -201,12 +203,13 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
} }
if (resource->name() == task->path().Last()) { if (resource->name() == task->path().Last()) {
resource->WakeupLoader(); resource->WakeupExecutor();
} else { } else {
auto next_res_name = task->path().Next(); auto next_res_name = task->path().Next();
auto next_res = res_mgr.lock()->GetResource(next_res_name); auto next_res = res_mgr.lock()->GetResource(next_res_name);
event->task_table_item_->Move(); if (event->task_table_item_->Move()) {
next_res->task_table().Put(task); next_res->task_table().Put(task);
}
} }
} }

View File

@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/LargeSQ8HPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
bool
LargeSQ8HPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8H) {
return false;
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
if (search_job->nq() < 100) {
return false;
}
std::vector<uint64_t> gpus = scheduler::get_gpu_pool();
std::vector<int64_t> all_free_mem;
for (auto& gpu : gpus) {
auto cache = cache::GpuCacheMgr::GetInstance(gpu);
auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
all_free_mem.push_back(free_mem);
}
auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
auto best_index = std::distance(all_free_mem.begin(), max_e);
auto best_device_id = gpus[best_index];
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
if (not res_ptr) {
SERVER_LOG_ERROR << "GpuResource " << best_device_id << " invalid.";
// TODO: throw critical error and exit
return false;
}
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
return true;
}
} // namespace scheduler
} // namespace milvus

View File

@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "Pass.h"
namespace milvus {
namespace scheduler {
class LargeSQ8HPass : public Pass {
public:
LargeSQ8HPass() = default;
public:
bool
Run(const TaskPtr& task) override;
};
using LargeSQ8HPassPtr = std::shared_ptr<LargeSQ8HPass>;
} // namespace scheduler
} // namespace milvus

View File

@ -20,6 +20,7 @@
#include "scheduler/Utils.h" #include "scheduler/Utils.h"
#include <iostream> #include <iostream>
#include <limits>
#include <utility> #include <utility>
namespace milvus { namespace milvus {
@ -112,18 +113,18 @@ Resource::pick_task_load() {
TaskTableItemPtr TaskTableItemPtr
Resource::pick_task_execute() { Resource::pick_task_execute() {
// auto indexes = task_table_.PickToExecute(3);
auto indexes = task_table_.PickToExecute(std::numeric_limits<uint64_t>::max()); auto indexes = task_table_.PickToExecute(std::numeric_limits<uint64_t>::max());
for (auto index : indexes) { for (auto index : indexes) {
// try to set one task executing, then return // try to set one task executing, then return
// if (task_table_.Execute(index)) if (task_table_[index]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE) {
// return task_table_.Get(index); if (task_table_[index]->task->path().Last() != name()) {
if (task_table_.Get(index)->task->path().Current() continue;
== task_table_.Get(index)->task->path().Last()) {
if (task_table_.Execute(index)) {
return task_table_.Get(index);
} }
} }
if (task_table_.Execute(index)) {
return task_table_.Get(index);
}
// else try next // else try next
} }
return nullptr; return nullptr;

View File

@ -22,6 +22,7 @@
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
#include <src/scheduler/SchedInst.h>
#include <algorithm> #include <algorithm>
#include <string> #include <string>
#include <thread> #include <thread>
@ -33,8 +34,6 @@ namespace scheduler {
static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000; static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000;
static constexpr size_t PARALLEL_REDUCE_BATCH = 1000; static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;
std::mutex XSearchTask::merge_mutex_;
// TODO(wxyu): remove unused code // TODO(wxyu): remove unused code
// bool // bool
// NeedParallelReduce(uint64_t nq, uint64_t topk) { // NeedParallelReduce(uint64_t nq, uint64_t topk) {
@ -121,7 +120,11 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
stat = index_engine_->Load(); stat = index_engine_->Load();
type_str = "DISK2CPU"; type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) { } else if (type == LoadType::CPU2GPU) {
stat = index_engine_->CopyToGpu(device_id); bool hybrid = false;
if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H) {
hybrid = true;
}
stat = index_engine_->CopyToGpu(device_id, hybrid);
type_str = "CPU2GPU"; type_str = "CPU2GPU";
} else if (type == LoadType::GPU2CPU) { } else if (type == LoadType::GPU2CPU) {
stat = index_engine_->CopyToCpu(); stat = index_engine_->CopyToCpu();
@ -204,14 +207,20 @@ XSearchTask::Execute() {
try { try {
// step 2: search // step 2: search
index_engine_->Search(nq, vectors, topk, nprobe, output_distance.data(), output_ids.data()); bool hybrid = false;
if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H &&
ResMgrInst::GetInstance()->GetResource(path().Last())->type() == ResourceType::CPU) {
hybrid = true;
}
index_engine_->Search(nq, vectors, topk, nprobe, output_distance.data(), output_ids.data(), hybrid);
double span = rc.RecordSection(hdr + ", do search"); double span = rc.RecordSection(hdr + ", do search");
// search_job->AccumSearchCost(span); // search_job->AccumSearchCost(span);
// step 3: pick up topk result // step 3: pick up topk result
auto spec_k = index_engine_->Count() < topk ? index_engine_->Count() : topk; auto spec_k = index_engine_->Count() < topk ? index_engine_->Count() : topk;
XSearchTask::TopkResult(output_ids, output_distance, spec_k, nq, topk, metric_l2, search_job->GetResult()); XSearchTask::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk, metric_l2,
search_job->GetResult());
span = rc.RecordSection(hdr + ", reduce topk"); span = rc.RecordSection(hdr + ", reduce topk");
// search_job->AccumReduceCost(span); // search_job->AccumReduceCost(span);
@ -220,7 +229,7 @@ XSearchTask::Execute() {
// search_job->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed // search_job->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed
} }
// step 5: notify to send result to client // step 4: notify to send result to client
search_job->SearchDone(index_id_); search_job->SearchDone(index_id_);
} }
@ -230,36 +239,37 @@ XSearchTask::Execute() {
index_engine_ = nullptr; index_engine_ = nullptr;
} }
Status void
XSearchTask::TopkResult(const std::vector<int64_t>& input_ids, const std::vector<float>& input_distance, XSearchTask::MergeTopkToResultSet(const std::vector<int64_t>& input_ids, const std::vector<float>& input_distance,
uint64_t input_k, uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result) { uint64_t input_k, uint64_t nq, uint64_t topk, bool ascending,
scheduler::ResultSet result_buf; scheduler::ResultSet& result) {
if (result.empty()) { if (result.empty()) {
result_buf.resize(nq, scheduler::Id2DistVec(input_k, scheduler::IdDistPair(-1, 0.0))); result.resize(nq);
for (auto i = 0; i < nq; ++i) { }
auto& result_buf_i = result_buf[i];
for (uint64_t i = 0; i < nq; i++) {
scheduler::Id2DistVec result_buf;
auto& result_i = result[i];
if (result[i].empty()) {
result_buf.resize(input_k, scheduler::IdDistPair(-1, 0.0));
uint64_t input_k_multi_i = input_k * i; uint64_t input_k_multi_i = input_k * i;
for (auto k = 0; k < input_k; ++k) { for (auto k = 0; k < input_k; ++k) {
uint64_t idx = input_k_multi_i + k; uint64_t idx = input_k_multi_i + k;
auto& result_buf_item = result_buf_i[k]; auto& result_buf_item = result_buf[k];
result_buf_item.first = input_ids[idx]; result_buf_item.first = input_ids[idx];
result_buf_item.second = input_distance[idx]; result_buf_item.second = input_distance[idx];
} }
} } else {
} else { size_t tar_size = result_i.size();
size_t tar_size = result[0].size(); uint64_t output_k = std::min(topk, input_k + tar_size);
uint64_t output_k = std::min(topk, input_k + tar_size); result_buf.resize(output_k, scheduler::IdDistPair(-1, 0.0));
result_buf.resize(nq, scheduler::Id2DistVec(output_k, scheduler::IdDistPair(-1, 0.0)));
for (auto i = 0; i < nq; ++i) {
size_t buf_k = 0, src_k = 0, tar_k = 0; size_t buf_k = 0, src_k = 0, tar_k = 0;
uint64_t src_idx; uint64_t src_idx;
auto& result_i = result[i];
auto& result_buf_i = result_buf[i];
uint64_t input_k_multi_i = input_k * i; uint64_t input_k_multi_i = input_k * i;
while (buf_k < output_k && src_k < input_k && tar_k < tar_size) { while (buf_k < output_k && src_k < input_k && tar_k < tar_size) {
src_idx = input_k_multi_i + src_k; src_idx = input_k_multi_i + src_k;
auto& result_buf_item = result_buf_i[buf_k]; auto& result_buf_item = result_buf[buf_k];
auto& result_item = result_i[tar_k]; auto& result_item = result_i[tar_k];
if ((ascending && input_distance[src_idx] < result_item.second) || if ((ascending && input_distance[src_idx] < result_item.second) ||
(!ascending && input_distance[src_idx] > result_item.second)) { (!ascending && input_distance[src_idx] > result_item.second)) {
@ -273,11 +283,11 @@ XSearchTask::TopkResult(const std::vector<int64_t>& input_ids, const std::vector
buf_k++; buf_k++;
} }
if (buf_k < topk) { if (buf_k < output_k) {
if (src_k < input_k) { if (src_k < input_k) {
while (buf_k < output_k && src_k < input_k) { while (buf_k < output_k && src_k < input_k) {
src_idx = input_k_multi_i + src_k; src_idx = input_k_multi_i + src_k;
auto& result_buf_item = result_buf_i[buf_k]; auto& result_buf_item = result_buf[buf_k];
result_buf_item.first = input_ids[src_idx]; result_buf_item.first = input_ids[src_idx];
result_buf_item.second = input_distance[src_idx]; result_buf_item.second = input_distance[src_idx];
src_k++; src_k++;
@ -285,18 +295,79 @@ XSearchTask::TopkResult(const std::vector<int64_t>& input_ids, const std::vector
} }
} else { } else {
while (buf_k < output_k && tar_k < tar_size) { while (buf_k < output_k && tar_k < tar_size) {
result_buf_i[buf_k] = result_i[tar_k]; result_buf[buf_k] = result_i[tar_k];
tar_k++; tar_k++;
buf_k++; buf_k++;
} }
} }
} }
} }
result_i.swap(result_buf);
}
}
void
XSearchTask::MergeTopkArray(std::vector<int64_t>& tar_ids, std::vector<float>& tar_distance, uint64_t& tar_input_k,
const std::vector<int64_t>& src_ids, const std::vector<float>& src_distance,
uint64_t src_input_k, uint64_t nq, uint64_t topk, bool ascending) {
if (src_ids.empty() || src_distance.empty()) {
return;
} }
result.swap(result_buf); std::vector<int64_t> id_buf(nq * topk, -1);
std::vector<float> dist_buf(nq * topk, 0.0);
return Status::OK(); uint64_t output_k = std::min(topk, tar_input_k + src_input_k);
uint64_t buf_k, src_k, tar_k;
uint64_t src_idx, tar_idx, buf_idx;
uint64_t src_input_k_multi_i, tar_input_k_multi_i, buf_k_multi_i;
for (uint64_t i = 0; i < nq; i++) {
src_input_k_multi_i = src_input_k * i;
tar_input_k_multi_i = tar_input_k * i;
buf_k_multi_i = output_k * i;
buf_k = src_k = tar_k = 0;
while (buf_k < output_k && src_k < src_input_k && tar_k < tar_input_k) {
src_idx = src_input_k_multi_i + src_k;
tar_idx = tar_input_k_multi_i + tar_k;
buf_idx = buf_k_multi_i + buf_k;
if ((ascending && src_distance[src_idx] < tar_distance[tar_idx]) ||
(!ascending && src_distance[src_idx] > tar_distance[tar_idx])) {
id_buf[buf_idx] = src_ids[src_idx];
dist_buf[buf_idx] = src_distance[src_idx];
src_k++;
} else {
id_buf[buf_idx] = tar_ids[tar_idx];
dist_buf[buf_idx] = tar_distance[tar_idx];
tar_k++;
}
buf_k++;
}
if (buf_k < output_k) {
if (src_k < src_input_k) {
while (buf_k < output_k && src_k < src_input_k) {
src_idx = src_input_k_multi_i + src_k;
id_buf[buf_idx] = src_ids[src_idx];
dist_buf[buf_idx] = src_distance[src_idx];
src_k++;
buf_k++;
}
} else {
while (buf_k < output_k && tar_k < tar_input_k) {
id_buf[buf_idx] = tar_ids[tar_idx];
dist_buf[buf_idx] = tar_distance[tar_idx];
tar_k++;
buf_k++;
}
}
}
}
tar_ids.swap(id_buf);
tar_distance.swap(dist_buf);
tar_input_k = output_k;
} }
} // namespace scheduler } // namespace scheduler

View File

@ -38,9 +38,14 @@ class XSearchTask : public Task {
Execute() override; Execute() override;
public: public:
static Status static void
TopkResult(const std::vector<int64_t>& input_ids, const std::vector<float>& input_distance, uint64_t input_k, MergeTopkToResultSet(const std::vector<int64_t>& input_ids, const std::vector<float>& input_distance,
uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result); uint64_t input_k, uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result);
static void
MergeTopkArray(std::vector<int64_t>& tar_ids, std::vector<float>& tar_distance, uint64_t& tar_input_k,
const std::vector<int64_t>& src_ids, const std::vector<float>& src_distance, uint64_t src_input_k,
uint64_t nq, uint64_t topk, bool ascending);
public: public:
TableFileSchemaPtr file_; TableFileSchemaPtr file_;
@ -49,8 +54,6 @@ class XSearchTask : public Task {
int index_type_ = 0; int index_type_ = 0;
ExecutionEnginePtr index_engine_ = nullptr; ExecutionEnginePtr index_engine_ = nullptr;
bool metric_l2 = true; bool metric_l2 = true;
static std::mutex merge_mutex_;
}; };
} // namespace scheduler } // namespace scheduler

View File

@ -315,24 +315,40 @@ IVFHybridIndex::UnsetQuantizer() {
return Status::OK(); return Status::OK();
} }
Status VecIndexPtr
IVFHybridIndex::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) { IVFHybridIndex::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
try { try {
// TODO(linxj): Hardcode here // TODO(linxj): Hardcode here
if (auto new_idx = std::dynamic_pointer_cast<knowhere::IVFSQHybrid>(index_)) { if (auto new_idx = std::dynamic_pointer_cast<knowhere::IVFSQHybrid>(index_)) {
new_idx->LoadData(q, conf); return std::make_shared<IVFHybridIndex>(new_idx->LoadData(q, conf), type);
} else { } else {
WRAPPER_LOG_ERROR << "Hybrid mode not support for index type: " << int(type); WRAPPER_LOG_ERROR << "Hybrid mode not support for index type: " << int(type);
return Status(KNOWHERE_ERROR, "not support");
} }
} catch (knowhere::KnowhereException& e) { } catch (knowhere::KnowhereException& e) {
WRAPPER_LOG_ERROR << e.what(); WRAPPER_LOG_ERROR << e.what();
return Status(KNOWHERE_UNEXPECTED_ERROR, e.what());
} catch (std::exception& e) { } catch (std::exception& e) {
WRAPPER_LOG_ERROR << e.what(); WRAPPER_LOG_ERROR << e.what();
return Status(KNOWHERE_ERROR, e.what());
} }
return Status::OK(); return nullptr;
}
std::pair<VecIndexPtr, knowhere::QuantizerPtr>
IVFHybridIndex::CopyToGpuWithQuantizer(const int64_t& device_id, const Config& cfg) {
try {
// TODO(linxj): Hardcode here
if (auto hybrid_idx = std::dynamic_pointer_cast<knowhere::IVFSQHybrid>(index_)) {
auto pair = hybrid_idx->CopyCpuToGpuWithQuantizer(device_id, cfg);
auto new_idx = std::make_shared<IVFHybridIndex>(pair.first, type);
return std::make_pair(new_idx, pair.second);
} else {
WRAPPER_LOG_ERROR << "Hybrid mode not support for index type: " << int(type);
}
} catch (knowhere::KnowhereException& e) {
WRAPPER_LOG_ERROR << e.what();
} catch (std::exception& e) {
WRAPPER_LOG_ERROR << e.what();
}
return std::make_pair(nullptr, nullptr);
} }
} // namespace engine } // namespace engine

View File

@ -105,8 +105,10 @@ class IVFHybridIndex : public IVFMixIndex {
Status Status
UnsetQuantizer() override; UnsetQuantizer() override;
std::pair<VecIndexPtr, knowhere::QuantizerPtr>
CopyToGpuWithQuantizer(const int64_t& device_id, const Config& cfg) override;
Status VecIndexPtr
LoadData(const knowhere::QuantizerPtr& q, const Config& conf) override; LoadData(const knowhere::QuantizerPtr& q, const Config& conf) override;
}; };

View File

@ -19,6 +19,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility>
#include "cache/DataObj.h" #include "cache/DataObj.h"
#include "knowhere/common/BinarySet.h" #include "knowhere/common/BinarySet.h"
@ -103,9 +104,9 @@ class VecIndex : public cache::DataObj {
return nullptr; return nullptr;
} }
virtual Status virtual VecIndexPtr
LoadData(const knowhere::QuantizerPtr& q, const Config& conf) { LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
return Status::OK(); return nullptr;
} }
virtual Status virtual Status
@ -117,6 +118,11 @@ class VecIndex : public cache::DataObj {
UnsetQuantizer() { UnsetQuantizer() {
return Status::OK(); return Status::OK();
} }
virtual std::pair<VecIndexPtr, knowhere::QuantizerPtr>
CopyToGpuWithQuantizer(const int64_t& device_id, const Config& cfg = Config()) {
return std::make_pair(nullptr, nullptr);
}
//////////////// ////////////////
private: private:
int64_t size_ = 0; int64_t size_ = 0;

View File

@ -297,6 +297,7 @@ TEST_F(DBTest, SEARCH_TEST) {
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
} }
#ifdef CUSTOMIZATION
//test FAISS_IVFSQ8H optimizer //test FAISS_IVFSQ8H optimizer
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H;
db_->CreateIndex(TABLE_NAME, index); // wait until build index finish db_->CreateIndex(TABLE_NAME, index); // wait until build index finish
@ -314,9 +315,7 @@ TEST_F(DBTest, SEARCH_TEST) {
stat = db_->Query(TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, results); stat = db_->Query(TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, results);
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
} }
#endif
// TODO(lxj): add groundTruth assert
} }
TEST_F(DBTest, PRELOADTABLE_TEST) { TEST_F(DBTest, PRELOADTABLE_TEST) {

View File

@ -21,26 +21,51 @@
#include "scheduler/task/SearchTask.h" #include "scheduler/task/SearchTask.h"
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
#include "utils/ThreadPool.h"
namespace { namespace {
namespace ms = milvus::scheduler; namespace ms = milvus::scheduler;
void void
BuildResult(uint64_t nq, BuildResult(std::vector<int64_t>& output_ids,
std::vector<float>& output_distance,
uint64_t topk, uint64_t topk,
bool ascending, uint64_t nq,
std::vector<int64_t>& output_ids, bool ascending) {
std::vector<float>& output_distence) {
output_ids.clear(); output_ids.clear();
output_ids.resize(nq * topk); output_ids.resize(nq * topk);
output_distence.clear(); output_distance.clear();
output_distence.resize(nq * topk); output_distance.resize(nq * topk);
for (uint64_t i = 0; i < nq; i++) { for (uint64_t i = 0; i < nq; i++) {
for (uint64_t j = 0; j < topk; j++) { for (uint64_t j = 0; j < topk; j++) {
output_ids[i * topk + j] = (int64_t)(drand48() * 100000); output_ids[i * topk + j] = (int64_t)(drand48() * 100000);
output_distence[i * topk + j] = ascending ? (j + drand48()) : ((topk - j) + drand48()); output_distance[i * topk + j] = ascending ? (j + drand48()) : ((topk - j) + drand48());
}
}
}
void
CopyResult(std::vector<int64_t>& output_ids,
std::vector<float>& output_distance,
uint64_t output_topk,
std::vector<int64_t>& input_ids,
std::vector<float>& input_distance,
uint64_t input_topk,
uint64_t nq) {
ASSERT_TRUE(input_ids.size() >= nq * input_topk);
ASSERT_TRUE(input_distance.size() >= nq * input_topk);
ASSERT_TRUE(output_topk <= input_topk);
output_ids.clear();
output_ids.resize(nq * output_topk);
output_distance.clear();
output_distance.resize(nq * output_topk);
for (uint64_t i = 0; i < nq; i++) {
for (uint64_t j = 0; j < output_topk; j++) {
output_ids[i * output_topk + j] = input_ids[i * input_topk + j];
output_distance[i * output_topk + j] = input_distance[i * input_topk + j];
} }
} }
} }
@ -50,8 +75,8 @@ CheckTopkResult(const std::vector<int64_t>& input_ids_1,
const std::vector<float>& input_distance_1, const std::vector<float>& input_distance_1,
const std::vector<int64_t>& input_ids_2, const std::vector<int64_t>& input_ids_2,
const std::vector<float>& input_distance_2, const std::vector<float>& input_distance_2,
uint64_t nq,
uint64_t topk, uint64_t topk,
uint64_t nq,
bool ascending, bool ascending,
const milvus::scheduler::ResultSet& result) { const milvus::scheduler::ResultSet& result) {
ASSERT_EQ(result.size(), nq); ASSERT_EQ(result.size(), nq);
@ -91,43 +116,36 @@ TEST(DBSearchTest, TOPK_TEST) {
bool ascending; bool ascending;
std::vector<int64_t> ids1, ids2; std::vector<int64_t> ids1, ids2;
std::vector<float> dist1, dist2; std::vector<float> dist1, dist2;
milvus::scheduler::ResultSet result; ms::ResultSet result;
milvus::Status status;
/* test1, id1/dist1 valid, id2/dist2 empty */ /* test1, id1/dist1 valid, id2/dist2 empty */
ascending = true; ascending = true;
BuildResult(NQ, TOP_K, ascending, ids1, dist1); BuildResult(ids1, dist1, TOP_K, NQ, ascending);
status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result); ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result);
ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result);
/* test2, id1/dist1 valid, id2/dist2 valid */ /* test2, id1/dist1 valid, id2/dist2 valid */
BuildResult(NQ, TOP_K, ascending, ids2, dist2); BuildResult(ids2, dist2, TOP_K, NQ, ascending);
status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result);
ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result);
/* test3, id1/dist1 small topk */ /* test3, id1/dist1 small topk */
ids1.clear(); ids1.clear();
dist1.clear(); dist1.clear();
result.clear(); result.clear();
BuildResult(NQ, TOP_K / 2, ascending, ids1, dist1); BuildResult(ids1, dist1, TOP_K/2, NQ, ascending);
status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K / 2, NQ, TOP_K, ascending, result); ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result);
ASSERT_TRUE(status.ok()); ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result);
status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
ASSERT_TRUE(status.ok());
CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result);
/* test4, id1/dist1 small topk, id2/dist2 small topk */ /* test4, id1/dist1 small topk, id2/dist2 small topk */
ids2.clear(); ids2.clear();
dist2.clear(); dist2.clear();
result.clear(); result.clear();
BuildResult(NQ, TOP_K / 3, ascending, ids2, dist2); BuildResult(ids2, dist2, TOP_K/3, NQ, ascending);
status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K / 2, NQ, TOP_K, ascending, result); ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result);
ASSERT_TRUE(status.ok()); ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K/3, NQ, TOP_K, ascending, result);
status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K / 3, NQ, TOP_K, ascending, result); CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
ASSERT_TRUE(status.ok());
CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result);
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
ascending = false; ascending = false;
@ -138,71 +156,199 @@ TEST(DBSearchTest, TOPK_TEST) {
result.clear(); result.clear();
/* test1, id1/dist1 valid, id2/dist2 empty */ /* test1, id1/dist1 valid, id2/dist2 empty */
BuildResult(NQ, TOP_K, ascending, ids1, dist1); BuildResult(ids1, dist1, TOP_K, NQ, ascending);
status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result); ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result);
ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result);
/* test2, id1/dist1 valid, id2/dist2 valid */ /* test2, id1/dist1 valid, id2/dist2 valid */
BuildResult(NQ, TOP_K, ascending, ids2, dist2); BuildResult(ids2, dist2, TOP_K, NQ, ascending);
status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result);
ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result);
/* test3, id1/dist1 small topk */ /* test3, id1/dist1 small topk */
ids1.clear(); ids1.clear();
dist1.clear(); dist1.clear();
result.clear(); result.clear();
BuildResult(NQ, TOP_K / 2, ascending, ids1, dist1); BuildResult(ids1, dist1, TOP_K/2, NQ, ascending);
status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K / 2, NQ, TOP_K, ascending, result); ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result);
ASSERT_TRUE(status.ok()); ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result);
status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
ASSERT_TRUE(status.ok());
CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result);
/* test4, id1/dist1 small topk, id2/dist2 small topk */ /* test4, id1/dist1 small topk, id2/dist2 small topk */
ids2.clear(); ids2.clear();
dist2.clear(); dist2.clear();
result.clear(); result.clear();
BuildResult(NQ, TOP_K / 3, ascending, ids2, dist2); BuildResult(ids2, dist2, TOP_K/3, NQ, ascending);
status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K / 2, NQ, TOP_K, ascending, result); ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result);
ASSERT_TRUE(status.ok()); ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K/3, NQ, TOP_K, ascending, result);
status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K / 3, NQ, TOP_K, ascending, result); CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
ASSERT_TRUE(status.ok());
CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result);
} }
TEST(DBSearchTest, REDUCE_PERF_TEST) { TEST(DBSearchTest, REDUCE_PERF_TEST) {
int32_t nq = 100;
int32_t top_k = 1000;
int32_t index_file_num = 478; /* sift1B dataset, index files num */ int32_t index_file_num = 478; /* sift1B dataset, index files num */
bool ascending = true; bool ascending = true;
std::vector<int32_t> thread_vec = {4, 8, 11};
std::vector<int32_t> nq_vec = {1, 10, 100, 1000};
std::vector<int32_t> topk_vec = {1, 4, 16, 64, 256, 1024};
int32_t NQ = nq_vec[nq_vec.size()-1];
int32_t TOPK = topk_vec[topk_vec.size()-1];
std::vector<std::vector<int64_t>> id_vec;
std::vector<std::vector<float>> dist_vec;
std::vector<int64_t> input_ids; std::vector<int64_t> input_ids;
std::vector<float> input_distance; std::vector<float> input_distance;
milvus::scheduler::ResultSet final_result; int32_t i, k, step;
milvus::Status status;
double span, reduce_cost = 0.0; /* generate testing data */
milvus::TimeRecorder rc(""); for (i = 0; i < index_file_num; i++) {
BuildResult(input_ids, input_distance, TOPK, NQ, ascending);
for (int32_t i = 0; i < index_file_num; i++) { id_vec.push_back(input_ids);
BuildResult(nq, top_k, ascending, input_ids, input_distance); dist_vec.push_back(input_distance);
}
rc.RecordSection("do search for context: " + std::to_string(i));
for (int32_t max_thread_num : thread_vec) {
// pick up topk result milvus::ThreadPool threadPool(max_thread_num);
status = milvus::scheduler::XSearchTask::TopkResult(input_ids, std::list<std::future<void>> threads_list;
input_distance,
top_k, for (int32_t nq : nq_vec) {
nq, for (int32_t top_k : topk_vec) {
top_k, ms::ResultSet final_result, final_result_2, final_result_3;
ascending,
final_result); std::vector<std::vector<int64_t>> id_vec_1(index_file_num);
ASSERT_TRUE(status.ok()); std::vector<std::vector<float>> dist_vec_1(index_file_num);
ASSERT_EQ(final_result.size(), nq); for (i = 0; i < index_file_num; i++) {
CopyResult(id_vec_1[i], dist_vec_1[i], top_k, id_vec[i], dist_vec[i], TOPK, nq);
span = rc.RecordSection("reduce topk for context: " + std::to_string(i)); }
reduce_cost += span;
std::string str1 = "Method-1 " + std::to_string(max_thread_num) + " " +
std::to_string(nq) + " " + std::to_string(top_k);
milvus::TimeRecorder rc1(str1);
///////////////////////////////////////////////////////////////////////////////////////
/* method-1 */
for (i = 0; i < index_file_num; i++) {
ms::XSearchTask::MergeTopkToResultSet(id_vec_1[i],
dist_vec_1[i],
top_k,
nq,
top_k,
ascending,
final_result);
ASSERT_EQ(final_result.size(), nq);
}
rc1.RecordSection("reduce done");
///////////////////////////////////////////////////////////////////////////////////////
/* method-2 */
std::vector<std::vector<int64_t>> id_vec_2(index_file_num);
std::vector<std::vector<float>> dist_vec_2(index_file_num);
std::vector<uint64_t> k_vec_2(index_file_num);
for (i = 0; i < index_file_num; i++) {
CopyResult(id_vec_2[i], dist_vec_2[i], top_k, id_vec[i], dist_vec[i], TOPK, nq);
k_vec_2[i] = top_k;
}
std::string str2 = "Method-2 " + std::to_string(max_thread_num) + " " +
std::to_string(nq) + " " + std::to_string(top_k);
milvus::TimeRecorder rc2(str2);
for (step = 1; step < index_file_num; step *= 2) {
for (i = 0; i + step < index_file_num; i += step * 2) {
ms::XSearchTask::MergeTopkArray(id_vec_2[i], dist_vec_2[i], k_vec_2[i],
id_vec_2[i + step], dist_vec_2[i + step], k_vec_2[i + step],
nq, top_k, ascending);
}
}
ms::XSearchTask::MergeTopkToResultSet(id_vec_2[0],
dist_vec_2[0],
k_vec_2[0],
nq,
top_k,
ascending,
final_result_2);
ASSERT_EQ(final_result_2.size(), nq);
rc2.RecordSection("reduce done");
for (i = 0; i < nq; i++) {
ASSERT_EQ(final_result[i].size(), final_result_2[i].size());
for (k = 0; k < final_result[i].size(); k++) {
if (final_result[i][k].first != final_result_2[i][k].first) {
std::cout << i << " " << k << std::endl;
}
ASSERT_EQ(final_result[i][k].first, final_result_2[i][k].first);
ASSERT_EQ(final_result[i][k].second, final_result_2[i][k].second);
}
}
///////////////////////////////////////////////////////////////////////////////////////
/* method-3 parallel */
std::vector<std::vector<int64_t>> id_vec_3(index_file_num);
std::vector<std::vector<float>> dist_vec_3(index_file_num);
std::vector<uint64_t> k_vec_3(index_file_num);
for (i = 0; i < index_file_num; i++) {
CopyResult(id_vec_3[i], dist_vec_3[i], top_k, id_vec[i], dist_vec[i], TOPK, nq);
k_vec_3[i] = top_k;
}
std::string str3 = "Method-3 " + std::to_string(max_thread_num) + " " +
std::to_string(nq) + " " + std::to_string(top_k);
milvus::TimeRecorder rc3(str3);
for (step = 1; step < index_file_num; step *= 2) {
for (i = 0; i + step < index_file_num; i += step * 2) {
threads_list.push_back(
threadPool.enqueue(ms::XSearchTask::MergeTopkArray,
std::ref(id_vec_3[i]),
std::ref(dist_vec_3[i]),
std::ref(k_vec_3[i]),
std::ref(id_vec_3[i + step]),
std::ref(dist_vec_3[i + step]),
std::ref(k_vec_3[i + step]),
nq,
top_k,
ascending));
}
while (threads_list.size() > 0) {
int nready = 0;
for (auto it = threads_list.begin(); it != threads_list.end(); it = it) {
auto &p = *it;
std::chrono::milliseconds span(0);
if (p.wait_for(span) == std::future_status::ready) {
threads_list.erase(it++);
++nready;
} else {
++it;
}
}
if (nready == 0) {
std::this_thread::yield();
}
}
}
ms::XSearchTask::MergeTopkToResultSet(id_vec_3[0],
dist_vec_3[0],
k_vec_3[0],
nq,
top_k,
ascending,
final_result_3);
ASSERT_EQ(final_result_3.size(), nq);
rc3.RecordSection("reduce done");
for (i = 0; i < nq; i++) {
ASSERT_EQ(final_result[i].size(), final_result_3[i].size());
for (k = 0; k < final_result[i].size(); k++) {
ASSERT_EQ(final_result[i][k].first, final_result_3[i][k].first);
ASSERT_EQ(final_result[i][k].second, final_result_3[i][k].second);
}
}
}
}
} }
std::cout << "total reduce time: " << reduce_cost / 1000 << " ms" << std::endl;
} }

View File

@ -165,7 +165,9 @@ class ResourceMgrAdvanceTest : public testing::Test {
SetUp() override { SetUp() override {
mgr1_ = std::make_shared<ResourceMgr>(); mgr1_ = std::make_shared<ResourceMgr>();
disk_res = std::make_shared<DiskResource>("disk", 0, true, false); disk_res = std::make_shared<DiskResource>("disk", 0, true, false);
cpu_res = std::make_shared<CpuResource>("cpu", 0, true, true);
mgr1_->Add(ResourcePtr(disk_res)); mgr1_->Add(ResourcePtr(disk_res));
mgr1_->Add(ResourcePtr(cpu_res));
mgr1_->Start(); mgr1_->Start();
} }
@ -176,6 +178,7 @@ class ResourceMgrAdvanceTest : public testing::Test {
ResourceMgrPtr mgr1_; ResourceMgrPtr mgr1_;
ResourcePtr disk_res; ResourcePtr disk_res;
ResourcePtr cpu_res;
}; };
TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) { TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) {

View File

@ -28,18 +28,17 @@
#include "utils/Error.h" #include "utils/Error.h"
#include "wrapper/VecIndex.h" #include "wrapper/VecIndex.h"
namespace milvus { namespace milvus {
namespace scheduler { namespace scheduler {
class MockVecIndex : public engine::VecIndex { class MockVecIndex : public engine::VecIndex {
public: public:
virtual Status BuildAll(const int64_t &nb, virtual Status BuildAll(const int64_t& nb,
const float *xb, const float* xb,
const int64_t *ids, const int64_t* ids,
const engine::Config &cfg, const engine::Config& cfg,
const int64_t &nt = 0, const int64_t& nt = 0,
const float *xt = nullptr) { const float* xt = nullptr) {
} }
engine::VecIndexPtr Clone() override { engine::VecIndexPtr Clone() override {
@ -54,23 +53,23 @@ class MockVecIndex : public engine::VecIndex {
return engine::IndexType::INVALID; return engine::IndexType::INVALID;
} }
virtual Status Add(const int64_t &nb, virtual Status Add(const int64_t& nb,
const float *xb, const float* xb,
const int64_t *ids, const int64_t* ids,
const engine::Config &cfg = engine::Config()) { const engine::Config& cfg = engine::Config()) {
} }
virtual Status Search(const int64_t &nq, virtual Status Search(const int64_t& nq,
const float *xq, const float* xq,
float *dist, float* dist,
int64_t *ids, int64_t* ids,
const engine::Config &cfg = engine::Config()) { const engine::Config& cfg = engine::Config()) {
} }
engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override { engine::VecIndexPtr CopyToGpu(const int64_t& device_id, const engine::Config& cfg) override {
} }
engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override { engine::VecIndexPtr CopyToCpu(const engine::Config& cfg) override {
} }
virtual int64_t Dimension() { virtual int64_t Dimension() {
@ -86,7 +85,7 @@ class MockVecIndex : public engine::VecIndex {
return binset; return binset;
} }
virtual Status Load(const knowhere::BinarySet &index_binary) { virtual Status Load(const knowhere::BinarySet& index_binary) {
} }
public: public:
@ -102,11 +101,13 @@ class SchedulerTest : public testing::Test {
cache::GpuCacheMgr::GetInstance(0)->SetCapacity(cache_cap); cache::GpuCacheMgr::GetInstance(0)->SetCapacity(cache_cap);
cache::GpuCacheMgr::GetInstance(1)->SetCapacity(cache_cap); cache::GpuCacheMgr::GetInstance(1)->SetCapacity(cache_cap);
ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false); ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0); ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1); ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);
res_mgr_ = std::make_shared<ResourceMgr>(); res_mgr_ = std::make_shared<ResourceMgr>();
disk_resource_ = res_mgr_->Add(std::move(disk));
cpu_resource_ = res_mgr_->Add(std::move(cpu)); cpu_resource_ = res_mgr_->Add(std::move(cpu));
gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0)); gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1)); gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));
@ -127,6 +128,7 @@ class SchedulerTest : public testing::Test {
res_mgr_->Stop(); res_mgr_->Stop();
} }
ResourceWPtr disk_resource_;
ResourceWPtr cpu_resource_; ResourceWPtr cpu_resource_;
ResourceWPtr gpu_resource_0_; ResourceWPtr gpu_resource_0_;
ResourceWPtr gpu_resource_1_; ResourceWPtr gpu_resource_1_;
@ -137,7 +139,7 @@ class SchedulerTest : public testing::Test {
void void
insert_dummy_index_into_gpu_cache(uint64_t device_id) { insert_dummy_index_into_gpu_cache(uint64_t device_id) {
MockVecIndex *mock_index = new MockVecIndex(); MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000; mock_index->ntotal_ = 1000;
engine::VecIndexPtr index(mock_index); engine::VecIndexPtr index(mock_index);
@ -224,6 +226,7 @@ class SchedulerTest2 : public testing::Test {
TearDown() override { TearDown() override {
scheduler_->Stop(); scheduler_->Stop();
res_mgr_->Stop(); res_mgr_->Stop();
res_mgr_->Clear();
} }
ResourceWPtr disk_; ResourceWPtr disk_;
@ -237,22 +240,22 @@ class SchedulerTest2 : public testing::Test {
std::shared_ptr<Scheduler> scheduler_; std::shared_ptr<Scheduler> scheduler_;
}; };
TEST_F(SchedulerTest2, SPECIFIED_RESOURCE_TEST) { //TEST_F(SchedulerTest2, SPECIFIED_RESOURCE_TEST) {
const uint64_t NUM = 10; // const uint64_t NUM = 2;
std::vector<std::shared_ptr<TestTask>> tasks; // std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>(); // TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
dummy->location_ = "location"; // dummy->location_ = "location";
//
for (uint64_t i = 0; i < NUM; ++i) { // for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>(); // auto label = std::make_shared<DefaultLabel>();
std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy, label); // std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy, label);
task->label() = std::make_shared<SpecResLabel>(disk_); // task->label() = std::make_shared<SpecResLabel>(disk_);
tasks.push_back(task); // tasks.push_back(task);
disk_.lock()->task_table().Put(task); // disk_.lock()->task_table().Put(task);
} // }
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); // ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
} //}
} // namespace scheduler } // namespace scheduler
} // namespace milvus } // namespace milvus

View File

@ -188,7 +188,7 @@ INSTANTIATE_TEST_CASE_P(WrapperParam, KnowhereWrapperTest,
10, 10,
10), 10),
std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_CPU, "Default", DIM, NB, 10, 10), std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_CPU, "Default", DIM, NB, 10, 10),
std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_GPU, "Default", DIM, NB, 10, 10), // std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_GPU, "Default", DIM, NB, 10, 10),
std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_MIX, "Default", DIM, NB, 10, 10), std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_MIX, "Default", DIM, NB, 10, 10),
// std::make_tuple(IndexType::NSG_MIX, "Default", 128, 250000, 10, 10), // std::make_tuple(IndexType::NSG_MIX, "Default", 128, 250000, 10, 10),
// std::make_tuple(IndexType::SPTAG_KDT_RNT_CPU, "Default", 128, 250000, 10, 10), // std::make_tuple(IndexType::SPTAG_KDT_RNT_CPU, "Default", 128, 250000, 10, 10),