diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..297cf0e592 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "cpp/thirdparty/knowhere"] + path = cpp/thirdparty/knowhere + url = git@192.168.1.105:xiaojun.lin/knowhere.git + branch = develop diff --git a/cpp/.gitignore b/cpp/.gitignore index 03149cde32..6c2602d341 100644 --- a/cpp/.gitignore +++ b/cpp/.gitignore @@ -7,4 +7,4 @@ lcov_out/ base.info output.info output_new.info -server.info \ No newline at end of file +server.info diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 642921e1fc..5aca601465 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -21,6 +21,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-235 - Some test cases random fail - MS-236 - Add MySQLMetaImpl::HasNonIndexFiles - MS-257 - Update bzip2 download url +- MS-288 - Update compile scripts ## Improvement - MS-156 - Add unittest for merge result functions @@ -37,11 +38,14 @@ Please mark all change in change log and use the ticket from JIRA. 
- MS-260 - Refine log - MS-249 - Check machine hardware during initialize - MS-261 - Update faiss version to 1.5.3 and add BUILD_FAISS_WITH_MKL as an option +- MS-266 - Improve topk reduce time by using multi-threads +- MS-275 - Avoid sqlite logic error exception - MS-278 - add IndexStatsHelper ## New Feature - MS-180 - Add new mem manager - MS-195 - Add nlist and use_blas_threshold conf +- MS-137 - Integrate knowhere ## Task diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 39b7558fda..528168d23a 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -109,6 +109,14 @@ include(ThirdPartyPackages) include_directories(${MILVUS_SOURCE_DIR}) link_directories(${MILVUS_BINARY_DIR}) +if (NOT DEFINED KNOWHERE_BUILD_DIR) + message(FATAL_ERROR "You must set environment variable KNOWHERE_BUILD_DIR") +endif() +message(STATUS "Build with ${KNOWHERE_BUILD_DIR}") +include_directories(${KNOWHERE_BUILD_DIR}/include) +include_directories(${KNOWHERE_BUILD_DIR}/include/SPTAG/AnnService) +link_directories(${KNOWHERE_BUILD_DIR}/lib) + ## Following should be check set(MILVUS_ENGINE_INCLUDE ${PROJECT_SOURCE_DIR}/include) diff --git a/cpp/README.md b/cpp/README.md index b46515a1cf..50db004a9d 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -21,6 +21,9 @@ cmake_build/src/milvus_server is the server cmake_build/src/libmilvus_engine.a is the static library + git submodule init + git submodule update + cd [sourcecode path]/cpp ./build.sh -t Debug ./build.sh -t Release @@ -53,10 +56,10 @@ If you encounter the following error when building: ### Launch server Set config in cpp/conf/server_config.yaml -Add milvus/bin/lib to LD_LIBRARY_PATH +Add milvus/lib to LD_LIBRARY_PATH ``` -export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/path/to/milvus/bin/lib +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/path/to/milvus/lib ``` Then launch server with config: diff --git a/cpp/build.sh b/cpp/build.sh index 359f91358a..38be7913df 100755 --- a/cpp/build.sh +++ b/cpp/build.sh @@ -75,6 +75,12 @@ if [[ ! 
-d cmake_build ]]; then MAKE_CLEAN="ON" fi +# Build Knowhere +KNOWHERE_BUILD_DIR="`pwd`/thirdparty/knowhere_build" +pushd `pwd`/thirdparty/knowhere +./build.sh -t Release -p ${KNOWHERE_BUILD_DIR} +popd + cd cmake_build CUDA_COMPILER=/usr/local/cuda/bin/nvcc @@ -90,6 +96,7 @@ if [[ ${MAKE_CLEAN} == "ON" ]]; then -DMILVUS_ENABLE_PROFILING=${PROFILING} \ -DBUILD_FAISS_WITH_MKL=${BUILD_FAISS_WITH_MKL} \ -DMILVUS_ENABLE_THRIFT=${MILVUS_ENABLE_THRIFT} \ + -DKNOWHERE_BUILD_DIR=${KNOWHERE_BUILD_DIR} \ $@ ../" echo ${CMAKE_CMD} diff --git a/cpp/cmake/DefineOptions.cmake b/cpp/cmake/DefineOptions.cmake index cc9792391d..af89dccb4b 100644 --- a/cpp/cmake/DefineOptions.cmake +++ b/cpp/cmake/DefineOptions.cmake @@ -66,20 +66,17 @@ define_option(MILVUS_WITH_BZ2 "Build with BZ2 compression" ON) define_option(MILVUS_WITH_EASYLOGGINGPP "Build with Easylogging++ library" ON) -define_option(MILVUS_WITH_FAISS "Build with FAISS library" ON) +define_option(MILVUS_WITH_FAISS "Build with FAISS library" OFF) -define_option(MILVUS_WITH_FAISS_GPU_VERSION "Build with FAISS GPU version" ON) +define_option(MILVUS_WITH_FAISS_GPU_VERSION "Build with FAISS GPU version" OFF) -#define_option_string(MILVUS_FAISS_GPU_ARCH "Specifying which GPU architectures to build against" -# "-gencode=arch=compute_35,code=compute_35 -gencode=arch=compute_52,code=compute_52 -gencode=arch=compute_60,code=compute_60 -gencode=arch=compute_61,code=compute_61") - -define_option(MILVUS_WITH_LAPACK "Build with LAPACK library" ON) +define_option(MILVUS_WITH_LAPACK "Build with LAPACK library" OFF) define_option(MILVUS_WITH_LZ4 "Build with lz4 compression" ON) define_option(MILVUS_WITH_JSONCONS "Build with JSONCONS" OFF) -define_option(MILVUS_WITH_OPENBLAS "Build with OpenBLAS library" ON) +define_option(MILVUS_WITH_OPENBLAS "Build with OpenBLAS library" OFF) define_option(MILVUS_WITH_PROMETHEUS "Build with PROMETHEUS library" ON) @@ -99,6 +96,8 @@ define_option(MILVUS_WITH_YAMLCPP "Build with yaml-cpp library" ON) 
define_option(MILVUS_WITH_ZLIB "Build with zlib compression" ON) +define_option(MILVUS_WITH_KNOWHERE "Build with Knowhere" OFF) + if(CMAKE_VERSION VERSION_LESS 3.7) set(MILVUS_WITH_ZSTD_DEFAULT OFF) else() diff --git a/cpp/cmake/ThirdPartyPackages.cmake b/cpp/cmake/ThirdPartyPackages.cmake index e460f6eb5f..046273a0b8 100644 --- a/cpp/cmake/ThirdPartyPackages.cmake +++ b/cpp/cmake/ThirdPartyPackages.cmake @@ -22,6 +22,7 @@ set(MILVUS_THIRDPARTY_DEPENDENCIES Easylogging++ FAISS GTest + Knowhere JSONCONS LAPACK Lz4 @@ -62,6 +63,8 @@ macro(build_dependency DEPENDENCY_NAME) build_gtest() elseif("${DEPENDENCY_NAME}" STREQUAL "LAPACK") build_lapack() + elseif("${DEPENDENCY_NAME}" STREQUAL "Knowhere") + build_knowhere() elseif("${DEPENDENCY_NAME}" STREQUAL "Lz4") build_lz4() elseif ("${DEPENDENCY_NAME}" STREQUAL "MySQLPP") @@ -234,6 +237,12 @@ else() set(FAISS_SOURCE_URL "https://github.com/facebookresearch/faiss/archive/${FAISS_VERSION}.tar.gz") endif() +if(DEFINED ENV{MILVUS_KNOWHERE_URL}) + set(KNOWHERE_SOURCE_URL "$ENV{MILVUS_KNOWHERE_URL}") +else() + set(KNOWHERE_SOURCE_URL "${CMAKE_SOURCE_DIR}/thirdparty/knowhere") +endif() + if (DEFINED ENV{MILVUS_GTEST_URL}) set(GTEST_SOURCE_URL "$ENV{MILVUS_GTEST_URL}") else () @@ -574,6 +583,55 @@ if(MILVUS_WITH_BZ2) include_directories(SYSTEM "${BZIP2_INCLUDE_DIR}") endif() +# ---------------------------------------------------------------------- +# Knowhere + +macro(build_knowhere) + message(STATUS "Building knowhere from source") + set(KNOWHERE_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/knowhere_ep-prefix/src/knowhere_ep") + set(KNOWHERE_INCLUDE_DIR "${KNOWHERE_PREFIX}/include") + set(KNOWHERE_STATIC_LIB + "${KNOWHERE_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}knowhere${CMAKE_STATIC_LIBRARY_SUFFIX}") + + set(KNOWHERE_CMAKE_ARGS + ${EP_COMMON_CMAKE_ARGS} + "-DCMAKE_INSTALL_PREFIX=${KNOWHERE_PREFIX}" + -DCMAKE_INSTALL_LIBDIR=lib + "-DCMAKE_CUDA_COMPILER=${CMAKE_CUDA_COMPILER}" + "-DCUDA_TOOLKIT_ROOT_DIR=${CUDA_TOOLKIT_ROOT_DIR}" + 
-DCMAKE_BUILD_TYPE=Release) + + externalproject_add(knowhere_ep + URL + ${KNOWHERE_SOURCE_URL} + ${EP_LOG_OPTIONS} + CMAKE_ARGS + ${KNOWHERE_CMAKE_ARGS} + BUILD_COMMAND + ${MAKE} + ${MAKE_BUILD_ARGS} + BUILD_BYPRODUCTS + ${KNOWHERE_STATIC_LIB}) + + file(MAKE_DIRECTORY "${KNOWHERE_INCLUDE_DIR}") + add_library(knowhere STATIC IMPORTED) + set_target_properties( + knowhere + PROPERTIES IMPORTED_LOCATION "${KNOWHERE_STATIC_LIB}" + INTERFACE_INCLUDE_DIRECTORIES "${KNOWHERE_INCLUDE_DIR}") + + add_dependencies(knowhere knowhere_ep) +endmacro() + +if(MILVUS_WITH_KNOWHERE) + resolve_dependency(Knowhere) + + get_target_property(KNOWHERE_INCLUDE_DIR knowhere INTERFACE_INCLUDE_DIRECTORIES) + link_directories(SYSTEM "${KNOWHERE_PREFIX}/lib") + include_directories(SYSTEM "${KNOWHERE_INCLUDE_DIR}") + include_directories(SYSTEM "${KNOWHERE_INCLUDE_DIR}/SPTAG/AnnService") +endif() + # ---------------------------------------------------------------------- # Easylogging++ diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index ff5cfc6cd5..63090c62c7 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -2,7 +2,7 @@ server_config: address: 0.0.0.0 port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534 gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1 - mode: single # milvus deployment type: single, cluster + mode: single # milvus deployment type: single, cluster, read_only db_config: db_path: @MILVUS_DB_PATH@ # milvus data storage path diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index f7f91e46d4..e7f24fe8f1 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -12,8 +12,9 @@ aux_source_directory(server/grpc_impl grpcserver_files) aux_source_directory(server/thrift_impl thriftserver_files) aux_source_directory(utils utils_files) aux_source_directory(db db_files) -aux_source_directory(wrapper wrapper_files) +#aux_source_directory(wrapper wrapper_files) 
aux_source_directory(metrics metrics_files) +aux_source_directory(wrapper/knowhere knowhere_files) aux_source_directory(db/scheduler scheduler_files) aux_source_directory(db/scheduler/context scheduler_context_files) @@ -55,6 +56,7 @@ set(engine_files ${db_scheduler_files} ${wrapper_files} ${metrics_files} + ${knowhere_files} ) set(get_sys_info_files @@ -72,6 +74,13 @@ include_directories(grpc/gen-status) include_directories(grpc/gen-milvus) set(third_party_libs + knowhere + SPTAGLibStatic + arrow + jemalloc_pic + faiss + openblas + lapack easyloggingpp sqlite thrift @@ -79,7 +88,6 @@ set(third_party_libs grpc++ grpcpp_channelz yaml-cpp - faiss prometheus-cpp-push prometheus-cpp-pull prometheus-cpp-core @@ -91,6 +99,8 @@ set(third_party_libs snappy zlib zstd + cudart + cublas mysqlpp ${CUDA_TOOLKIT_ROOT_DIR}/lib64/stubs/libnvidia-ml.so cudart @@ -101,7 +111,6 @@ if (MEGASEARCH_WITH_ARROW STREQUAL "ON") endif() if(${BUILD_FAISS_WITH_MKL} STREQUAL "ON") set(third_party_libs ${third_party_libs} - ${MKL_LIBS} ${MKL_LIBS}) else() set(third_party_libs ${third_party_libs} @@ -121,8 +130,6 @@ if (GPU_VERSION STREQUAL "ON") pthread libgomp.a libgfortran.a - cudart - cublas ${CUDA_TOOLKIT_ROOT_DIR}/lib64/stubs/libnvidia-ml.so ) else() @@ -182,6 +189,17 @@ set(server_libs metrics ) +set(knowhere_libs + knowhere + SPTAGLibStatic + arrow + jemalloc_pic + faiss + openblas + lapack + tbb + ) + if (MILVUS_ENABLE_THRIFT STREQUAL "ON") add_executable(milvus_thrift_server ${config_files} @@ -205,16 +223,17 @@ endif() if (ENABLE_LICENSE STREQUAL "ON") if(MILVUS_ENABLE_THRIFT STREQUAL "ON") - target_link_libraries(milvus_thrift_server ${server_libs} license_check ${third_party_libs}) + target_link_libraries(milvus_thrift_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs}) else() - target_link_libraries(milvus_grpc_server ${server_libs} license_check ${third_party_libs}) + target_link_libraries(milvus_grpc_server ${server_libs} license_check ${knowhere_libs} 
${third_party_libs}) endif() else () if(MILVUS_ENABLE_THRIFT STREQUAL "ON") - target_link_libraries(milvus_thrift_server ${server_libs} ${third_party_libs}) + target_link_libraries(milvus_thrift_server ${server_libs} ${knowhere_libs} ${third_party_libs}) else() - target_link_libraries(milvus_grpc_server ${server_libs} ${third_party_libs}) + target_link_libraries(milvus_grpc_server ${server_libs} ${knowhere_libs} ${third_party_libs}) endif() + endif() if (ENABLE_LICENSE STREQUAL "ON") @@ -235,6 +254,8 @@ else() endif() install(FILES + ${KNOWHERE_BUILD_DIR}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}tbb${CMAKE_SHARED_LIBRARY_SUFFIX} + ${KNOWHERE_BUILD_DIR}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}tbb${CMAKE_SHARED_LIBRARY_SUFFIX}.2 ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX} ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3 ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3.2.4 diff --git a/cpp/src/cache/DataObj.h b/cpp/src/cache/DataObj.h index 995711c6ab..341df34174 100644 --- a/cpp/src/cache/DataObj.h +++ b/cpp/src/cache/DataObj.h @@ -37,7 +37,7 @@ public: return size_; } - return index_->ntotal*(index_->dim*4); + return index_->Count() * index_->Dimension() * sizeof(float); } private: diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 62ae231d02..940cfb612f 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -198,18 +198,25 @@ Status DBImpl::Query(const std::string& table_id, const std::vector ids.push_back(std::stoul(id, &sz)); } - meta::TableFilesSchema files_array; - auto status = meta_ptr_->GetTableFiles(table_id, ids, files_array); + meta::DatePartionedTableFilesSchema files_array; + auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array); if (!status.ok()) { return status; } - 
if(files_array.empty()) { + meta::TableFilesSchema file_id_array; + for (auto &day_files : files_array) { + for (auto &file : day_files.second) { + file_id_array.push_back(file); + } + } + + if(file_id_array.empty()) { return Status::Error("Invalid file id"); } cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query - status = QueryAsync(table_id, files_array, k, nq, vectors, dates, results); + status = QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results); cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query return status; } @@ -357,7 +364,7 @@ void DBImpl::StartCompactionTask() { Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) { - ENGINE_LOG_DEBUG << "Merge files for table" << table_id; + ENGINE_LOG_DEBUG << "Merge files for table " << table_id; meta::TableFileSchema table_file; table_file.table_id_ = table_id; @@ -424,13 +431,15 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) { bool has_merge = false; for (auto& kv : raw_files) { auto files = kv.second; - if (files.size() <= options_.merge_trigger_number) { + if (files.size() < options_.merge_trigger_number) { + ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action"; continue; } has_merge = true; MergeFiles(table_id, kv.first, kv.second); if (shutting_down_.load(std::memory_order_acquire)){ + ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table " << table_id; break; } } @@ -448,6 +457,11 @@ void DBImpl::BackgroundCompaction(std::set table_ids) { ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString(); continue;//let other table get chance to merge } + + if (shutting_down_.load(std::memory_order_acquire)){ + ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action"; + break; + } } meta_ptr_->Archive(); @@ -581,6 +595,11 @@ Status DBImpl::BuildIndexByTable(const 
std::string& table_id) { return status; } ENGINE_LOG_DEBUG << "Sync building index for " << file.id_ << " passed"; + + if (shutting_down_.load(std::memory_order_acquire)){ + ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action for table " << table_id; + break; + } } return status; @@ -601,6 +620,7 @@ void DBImpl::BackgroundBuildIndex() { } if (shutting_down_.load(std::memory_order_acquire)){ + ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; break; } } diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 56f741c4dc..ba9846e98a 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -109,7 +109,7 @@ Status DBMetaImpl::Initialize() { auto ret = boost::filesystem::create_directory(options_.path); if (!ret) { ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path; - return Status::DBTransactionError("Failed to create db directory", options_.path); + return Status::InvalidDBPath("Failed to create db directory", options_.path); } } @@ -147,6 +147,9 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id, } } + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + ConnectorPtr->update_all( set( c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE @@ -167,6 +170,9 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + if (table_schema.table_id_ == "") { NextTableId(table_schema.table_id_); } else { @@ -190,6 +196,7 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { auto id = ConnectorPtr->insert(table_schema); table_schema.id_ = id; } catch (...) 
{ + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Add Table Error"); } @@ -206,6 +213,9 @@ Status DBMetaImpl::DeleteTable(const std::string& table_id) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + //soft delete table auto tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::files_cnt_, @@ -238,6 +248,9 @@ Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + //soft delete table files ConnectorPtr->update_all( set( @@ -383,6 +396,9 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) { file_schema.updated_time_ = file_schema.created_on_; file_schema.engine_type_ = table_schema.engine_type_; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto id = ConnectorPtr->insert(file_schema); file_schema.id_ = id; @@ -544,6 +560,79 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id, return Status::OK(); } +Status DBMetaImpl::FilesToSearch(const std::string &table_id, + const std::vector &ids, + const DatesT &partition, + DatePartionedTableFilesSchema &files) { + files.clear(); + MetricCollector metric; + + try { + auto select_columns = columns(&TableFileSchema::id_, + &TableFileSchema::table_id_, + &TableFileSchema::file_id_, + &TableFileSchema::file_type_, + &TableFileSchema::size_, + &TableFileSchema::date_, + &TableFileSchema::engine_type_); + + auto match_tableid = c(&TableFileSchema::table_id_) == table_id; + auto is_raw = c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW; + auto is_toindex = c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX; + auto is_index = 
c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX; + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { return status; } + + decltype(ConnectorPtr->select(select_columns)) result; + if (partition.empty() && ids.empty()) { + auto filter = where(match_tableid and (is_raw or is_toindex or is_index)); + result = ConnectorPtr->select(select_columns, filter); + } + else if (partition.empty() && !ids.empty()) { + auto match_fileid = in(&TableFileSchema::id_, ids); + auto filter = where(match_tableid and match_fileid and (is_raw or is_toindex or is_index)); + result = ConnectorPtr->select(select_columns, filter); + } + else if (!partition.empty() && ids.empty()) { + auto match_date = in(&TableFileSchema::date_, partition); + auto filter = where(match_tableid and match_date and (is_raw or is_toindex or is_index)); + result = ConnectorPtr->select(select_columns, filter); + } + else if (!partition.empty() && !ids.empty()) { + auto match_fileid = in(&TableFileSchema::id_, ids); + auto match_date = in(&TableFileSchema::date_, partition); + auto filter = where(match_tableid and match_fileid and match_date and (is_raw or is_toindex or is_index)); + result = ConnectorPtr->select(select_columns, filter); + } + + TableFileSchema table_file; + for (auto &file : result) { + table_file.id_ = std::get<0>(file); + table_file.table_id_ = std::get<1>(file); + table_file.file_id_ = std::get<2>(file); + table_file.file_type_ = std::get<3>(file); + table_file.size_ = std::get<4>(file); + table_file.date_ = std::get<5>(file); + table_file.engine_type_ = std::get<6>(file); + table_file.dimension_ = table_schema.dimension_; + utils::GetTableFilePath(options_, table_file); + auto dateItr = files.find(table_file.date_); + if (dateItr == files.end()) { + files[table_file.date_] = TableFilesSchema(); + } + files[table_file.date_].push_back(table_file); + } + + } catch (std::exception &e) { + return 
HandleException("Encounter exception when iterate index files", e); + } + + return Status::OK(); +} + Status DBMetaImpl::FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) { files.clear(); @@ -649,6 +738,9 @@ Status DBMetaImpl::Archive() { long usecs = limit * D_SEC * US_PS; long now = utils::GetMicroSecTimeStamp(); try { + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + ConnectorPtr->update_all( set( c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE @@ -710,6 +802,9 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto commited = ConnectorPtr->transaction([&]() mutable { auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::size_), @@ -748,6 +843,7 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) { }); if (!commited) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Update table file error"); } @@ -763,6 +859,9 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto tables = ConnectorPtr->select(columns(&TableSchema::state_), where(c(&TableSchema::table_id_) == file_schema.table_id_)); @@ -784,6 +883,11 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { Status DBMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) { try { + MetricCollector metric; + + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + ConnectorPtr->update_all( set( c(&TableFileSchema::file_type_) = (int) 
TableFileSchema::TO_INDEX @@ -803,6 +907,9 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + std::map has_tables; for (auto &file : files) { if(has_tables.find(file.table_id_) != has_tables.end()) { @@ -831,6 +938,7 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) { }); if (!commited) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Update table files error"); } @@ -845,6 +953,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_, @@ -873,6 +984,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { }); if (!commited) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Clean files error"); } @@ -883,6 +995,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_), where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE)); @@ -897,6 +1012,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { }); if (!commited) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Clean files error"); } @@ -909,6 +1025,11 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { Status DBMetaImpl::CleanUp() { try { + MetricCollector metric; + + //multi-threads call sqlite update may get 
exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto files = ConnectorPtr->select(columns(&TableFileSchema::id_), where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW)); @@ -921,6 +1042,7 @@ Status DBMetaImpl::CleanUp() { }); if (!commited) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Clean files error"); } diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index 6187ad7eae..7f2a87fcb4 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -8,6 +8,7 @@ #include "Meta.h" #include "Options.h" +#include namespace zilliz { namespace milvus { @@ -62,6 +63,11 @@ class DBMetaImpl : public Meta { Status FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) override; + Status FilesToSearch(const std::string &table_id, + const std::vector &ids, + const DatesT &partition, + DatePartionedTableFilesSchema &files) override; + Status FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) override; @@ -94,6 +100,8 @@ class DBMetaImpl : public Meta { Status Initialize(); const DBMetaOptions options_; + + std::mutex meta_mutex_; }; // DBMetaImpl } // namespace meta diff --git a/cpp/src/db/EngineFactory.cpp b/cpp/src/db/EngineFactory.cpp index 2ca4b24aa1..32e10beb8d 100644 --- a/cpp/src/db/EngineFactory.cpp +++ b/cpp/src/db/EngineFactory.cpp @@ -4,14 +4,15 @@ * Proprietary and confidential. 
******************************************************************************/ #include "EngineFactory.h" -#include "FaissExecutionEngine.h" +//#include "FaissExecutionEngine.h" +#include "ExecutionEngineImpl.h" #include "Log.h" - namespace zilliz { namespace milvus { namespace engine { +#if 0 ExecutionEnginePtr EngineFactory::Build(uint16_t dimension, const std::string &location, @@ -26,7 +27,7 @@ EngineFactory::Build(uint16_t dimension, break; } - case EngineType::FAISS_IVFFLAT: { + case EngineType::FAISS_IVFFLAT_GPU: { execution_engine_ptr = ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, BUILD_INDEX_TYPE_IVF, "IDMap,Flat")); break; @@ -47,6 +48,25 @@ EngineFactory::Build(uint16_t dimension, execution_engine_ptr->Init(); return execution_engine_ptr; } +#else +ExecutionEnginePtr +EngineFactory::Build(uint16_t dimension, + const std::string &location, + EngineType type) { + + if(type == EngineType::INVALID) { + ENGINE_LOG_ERROR << "Unsupported engine type"; + return nullptr; + } + + ENGINE_LOG_DEBUG << "EngineFactory EngineTypee: " << int(type); + ExecutionEnginePtr execution_engine_ptr = + std::make_shared(dimension, location, type); + + execution_engine_ptr->Init(); + return execution_engine_ptr; +} +#endif } } diff --git a/cpp/src/db/ExecutionEngineImpl.cpp b/cpp/src/db/ExecutionEngineImpl.cpp new file mode 100644 index 0000000000..f878018dcd --- /dev/null +++ b/cpp/src/db/ExecutionEngineImpl.cpp @@ -0,0 +1,241 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. 
+ ******************************************************************************/ +#include + +#include +#include +#include "Log.h" +#include "utils/CommonUtil.h" + +#include "src/cache/CpuCacheMgr.h" +#include "ExecutionEngineImpl.h" +#include "wrapper/knowhere/vec_index.h" +#include "wrapper/knowhere/vec_impl.h" +#include "knowhere/common/exception.h" +#include "Exception.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +namespace { +std::string GetMetricType() { + server::ServerConfig &config = server::ServerConfig::GetInstance(); + server::ConfigNode engine_config = config.GetConfig(server::CONFIG_ENGINE); + return engine_config.GetValue(server::CONFIG_METRICTYPE, "L2"); +} +} + +ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension, + const std::string &location, + EngineType type) + : location_(location), dim(dimension), build_type(type) { + current_type = EngineType::FAISS_IDMAP; + + index_ = CreatetVecIndex(EngineType::FAISS_IDMAP); + if (!index_) throw Exception("Create Empty VecIndex"); + + Config build_cfg; + build_cfg["dim"] = dimension; + build_cfg["metric_type"] = GetMetricType(); + AutoGenParams(index_->GetType(), 0, build_cfg); + auto ec = std::static_pointer_cast(index_)->Build(build_cfg); + if (ec != server::KNOWHERE_SUCCESS) { throw Exception("Build index error"); } +} + +ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index, + const std::string &location, + EngineType type) + : index_(std::move(index)), location_(location), build_type(type) { + current_type = type; +} + +VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) { + std::shared_ptr index; + switch (type) { + case EngineType::FAISS_IDMAP: { + index = GetVecIndexFactory(IndexType::FAISS_IDMAP); + break; + } + case EngineType::FAISS_IVFFLAT: { + index = GetVecIndexFactory(IndexType::FAISS_IVFFLAT_MIX); + break; + } + case EngineType::FAISS_IVFSQ8: { + index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_MIX); + break; + } + default: { + 
ENGINE_LOG_ERROR << "Invalid engine type"; + return nullptr; + } + } + return index; +} + +Status ExecutionEngineImpl::AddWithIds(long n, const float *xdata, const long *xids) { + auto ec = index_->Add(n, xdata, xids); + if (ec != server::KNOWHERE_SUCCESS) { + return Status::Error("Add error"); + } + return Status::OK(); +} + +size_t ExecutionEngineImpl::Count() const { + return index_->Count(); +} + +size_t ExecutionEngineImpl::Size() const { + return (size_t) (Count() * Dimension()) * sizeof(float); +} + +size_t ExecutionEngineImpl::Dimension() const { + return index_->Dimension(); +} + +size_t ExecutionEngineImpl::PhysicalSize() const { + return server::CommonUtil::GetFileSize(location_); +} + +Status ExecutionEngineImpl::Serialize() { + auto ec = write_index(index_, location_); + if (ec != server::KNOWHERE_SUCCESS) { + return Status::Error("Serialize: write to disk error"); + } + return Status::OK(); +} + +Status ExecutionEngineImpl::Load(bool to_cache) { + index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_); + bool already_in_cache = (index_ != nullptr); + auto start_time = METRICS_NOW_TIME; + if (!index_) { + try { + index_ = read_index(location_); + ENGINE_LOG_DEBUG << "Disk io from: " << location_; + } catch (knowhere::KnowhereException &e) { + ENGINE_LOG_ERROR << e.what(); + return Status::Error(e.what()); + } catch (std::exception &e) { + return Status::Error(e.what()); + } + } + + if (!already_in_cache && to_cache) { + Cache(); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + + server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time); + double physical_size = PhysicalSize(); + + server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size); + server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size / double(total_time)); + } + return Status::OK(); +} + +Status ExecutionEngineImpl::Merge(const std::string 
&location) { + if (location == location_) { + return Status::Error("Cannot Merge Self"); + } + ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_; + + auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location); + if (!to_merge) { + try { + to_merge = read_index(location); + } catch (knowhere::KnowhereException &e) { + ENGINE_LOG_ERROR << e.what(); + return Status::Error(e.what()); + } catch (std::exception &e) { + return Status::Error(e.what()); + } + } + + if (auto file_index = std::dynamic_pointer_cast(to_merge)) { + auto ec = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds()); + if (ec != server::KNOWHERE_SUCCESS) { + ENGINE_LOG_ERROR << "Merge: Add Error"; + return Status::Error("Merge: Add Error"); + } + return Status::OK(); + } else { + return Status::Error("file index type is not idmap"); + } +} + +ExecutionEnginePtr +ExecutionEngineImpl::BuildIndex(const std::string &location) { + ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_; + + auto from_index = std::dynamic_pointer_cast(index_); + auto to_index = CreatetVecIndex(build_type); + if (!to_index) { + throw Exception("Create Empty VecIndex"); + } + + Config build_cfg; + build_cfg["dim"] = Dimension(); + build_cfg["metric_type"] = GetMetricType(); + build_cfg["gpu_id"] = gpu_num; + build_cfg["nlist"] = nlist_; + AutoGenParams(to_index->GetType(), Count(), build_cfg); + + auto ec = to_index->BuildAll(Count(), + from_index->GetRawVectors(), + from_index->GetRawIds(), + build_cfg); + if (ec != server::KNOWHERE_SUCCESS) { throw Exception("Build index error"); } + + return std::make_shared(to_index, location, build_type); +} + +Status ExecutionEngineImpl::Search(long n, + const float *data, + long k, + float *distances, + long *labels) const { + ENGINE_LOG_DEBUG << "Search Params: [k] " << k << " [nprobe] " << nprobe_; + auto ec = index_->Search(n, data, distances, labels, Config::object{{"k", 
k}, {"nprobe", nprobe_}}); + if (ec != server::KNOWHERE_SUCCESS) { + ENGINE_LOG_ERROR << "Search error"; + return Status::Error("Search: Search Error"); + } + return Status::OK(); +} + +Status ExecutionEngineImpl::Cache() { + zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, index_); + + return Status::OK(); +} + +// TODO(linxj): remove. +Status ExecutionEngineImpl::Init() { + using namespace zilliz::milvus::server; + ServerConfig &config = ServerConfig::GetInstance(); + ConfigNode server_config = config.GetConfig(CONFIG_SERVER); + gpu_num = server_config.GetInt32Value("gpu_index", 0); + + switch (build_type) { + case EngineType::FAISS_IVFSQ8: + case EngineType::FAISS_IVFFLAT: { + ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE); + nprobe_ = engine_config.GetInt32Value(CONFIG_NPROBE, 1); + nlist_ = engine_config.GetInt32Value(CONFIG_NLIST, 16384); + break; + } + } + + return Status::OK(); +} + + +} // namespace engine +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/db/ExecutionEngineImpl.h b/cpp/src/db/ExecutionEngineImpl.h new file mode 100644 index 0000000000..12579d7c5d --- /dev/null +++ b/cpp/src/db/ExecutionEngineImpl.h @@ -0,0 +1,80 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. 
+ ******************************************************************************/ +#pragma once + +#include "ExecutionEngine.h" +#include "wrapper/knowhere/vec_index.h" + +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + + +class ExecutionEngineImpl : public ExecutionEngine { + public: + + ExecutionEngineImpl(uint16_t dimension, + const std::string &location, + EngineType type); + + ExecutionEngineImpl(VecIndexPtr index, + const std::string &location, + EngineType type); + + Status AddWithIds(long n, const float *xdata, const long *xids) override; + + size_t Count() const override; + + size_t Size() const override; + + size_t Dimension() const override; + + size_t PhysicalSize() const override; + + Status Serialize() override; + + Status Load(bool to_cache) override; + + Status Merge(const std::string &location) override; + + Status Search(long n, + const float *data, + long k, + float *distances, + long *labels) const override; + + ExecutionEnginePtr BuildIndex(const std::string &) override; + + Status Cache() override; + + Status Init() override; + + private: + VecIndexPtr CreatetVecIndex(EngineType type); + + VecIndexPtr Load(const std::string &location); + + protected: + VecIndexPtr index_ = nullptr; + EngineType build_type; + EngineType current_type; + + int64_t dim; + std::string location_; + + size_t nprobe_ = 0; + size_t nlist_ = 0; + int64_t gpu_num = 0; +}; + + +} // namespace engine +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/db/Factories.cpp b/cpp/src/db/Factories.cpp index 442dca2974..22d4760b9b 100644 --- a/cpp/src/db/Factories.cpp +++ b/cpp/src/db/Factories.cpp @@ -77,10 +77,10 @@ std::shared_ptr DBMetaImplFactory::Build(const DBMetaOptions& metaOp std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); if (dialect.find("mysql") != std::string::npos) { ENGINE_LOG_INFO << "Using MySQL"; - return std::make_shared(meta::MySQLMetaImpl(metaOptions, mode)); + return 
std::make_shared(metaOptions, mode); } else if (dialect.find("sqlite") != std::string::npos) { ENGINE_LOG_INFO << "Using SQLite"; - return std::make_shared(meta::DBMetaImpl(metaOptions)); + return std::make_shared(metaOptions); } else { ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect; throw InvalidArgumentException("URI dialect is not mysql / sqlite"); diff --git a/cpp/src/db/FaissExecutionEngine.cpp b/cpp/src/db/FaissExecutionEngine.cpp index 94f1299477..25e1160ee5 100644 --- a/cpp/src/db/FaissExecutionEngine.cpp +++ b/cpp/src/db/FaissExecutionEngine.cpp @@ -3,6 +3,7 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ +#if 0 #include "FaissExecutionEngine.h" #include "Log.h" #include "utils/CommonUtil.h" @@ -218,3 +219,4 @@ Status FaissExecutionEngine::Init() { } // namespace engine } // namespace milvus } // namespace zilliz +#endif diff --git a/cpp/src/db/FaissExecutionEngine.h b/cpp/src/db/FaissExecutionEngine.h index 512743ca36..d566c1c9eb 100644 --- a/cpp/src/db/FaissExecutionEngine.h +++ b/cpp/src/db/FaissExecutionEngine.h @@ -5,6 +5,7 @@ ******************************************************************************/ #pragma once +#if 0 #include "ExecutionEngine.h" #include "faiss/Index.h" @@ -94,3 +95,4 @@ class FaissExecutionEngine : public ExecutionEngine { } // namespace engine } // namespace milvus } // namespace zilliz +#endif diff --git a/cpp/src/db/MemManager.h b/cpp/src/db/MemManager.h index 4dc20581df..607333a3d8 100644 --- a/cpp/src/db/MemManager.h +++ b/cpp/src/db/MemManager.h @@ -16,7 +16,7 @@ #include #include #include -#include + namespace zilliz { diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index 5275605611..7e826f6335 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -65,6 +65,9 @@ class Meta { virtual Status FilesToSearch(const std::string &table_id, const DatesT 
&partition, DatePartionedTableFilesSchema &files) = 0; + virtual Status + FilesToSearch(const std::string &table_id, const std::vector &ids, const DatesT &partition, DatePartionedTableFilesSchema &files) = 0; + virtual Status FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0; diff --git a/cpp/src/db/MySQLMetaImpl.cpp b/cpp/src/db/MySQLMetaImpl.cpp index 14879d81fe..12bfc55c1b 100644 --- a/cpp/src/db/MySQLMetaImpl.cpp +++ b/cpp/src/db/MySQLMetaImpl.cpp @@ -965,6 +965,117 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, return Status::OK(); } +Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, + const std::vector &ids, + const DatesT &partition, + DatePartionedTableFilesSchema &files) { + + + files.clear(); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + + if (connectionPtr == nullptr) { + return Status::Error("Failed to connect to database server"); + } + + Query filesToSearchQuery = connectionPtr->query(); + filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id; + + if (!partition.empty()) { + std::stringstream partitionListSS; + for (auto &date : partition) { + partitionListSS << std::to_string(date) << ", "; + } + std::string partitionListStr = partitionListSS.str(); + + partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", " + filesToSearchQuery << " AND " << "date IN (" << partitionListStr << ")"; + } + + if (!ids.empty()) { + std::stringstream idSS; + for (auto &id : ids) { + idSS << "id = " << std::to_string(id) << " OR "; + } + std::string idStr = idSS.str(); + idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR " + + filesToSearchQuery << " AND " << "(" << idStr << ")"; + + } + // End + filesToSearchQuery << " AND " << + "(file_type = " << 
std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + + ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str(); + + res = filesToSearchQuery.store(); + } //Scoped Connection + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + TableFileSchema table_file; + for (auto &resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id_str; + resRow["table_id"].to_string(table_id_str); + table_file.table_id_ = table_id_str; + + table_file.engine_type_ = resRow["engine_type"]; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + table_file.dimension_ = table_schema.dimension_; + + utils::GetTableFilePath(options_, table_file); + + auto dateItr = files.find(table_file.date_); + if (dateItr == files.end()) { + files[table_file.date_] = TableFilesSchema(); + } + + files[table_file.date_].push_back(table_file); + } + } catch (const BadQuery &er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what()); + } catch (const Exception &er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what()); + } + + return Status::OK(); +} + Status MySQLMetaImpl::FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) { diff --git 
a/cpp/src/db/MySQLMetaImpl.h b/cpp/src/db/MySQLMetaImpl.h index 87bc1783c7..7822b99f64 100644 --- a/cpp/src/db/MySQLMetaImpl.h +++ b/cpp/src/db/MySQLMetaImpl.h @@ -53,6 +53,11 @@ class MySQLMetaImpl : public Meta { const DatesT &partition, DatePartionedTableFilesSchema &files) override; + Status FilesToSearch(const std::string &table_id, + const std::vector &ids, + const DatesT &partition, + DatePartionedTableFilesSchema &files) override; + Status FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) override; diff --git a/cpp/src/db/scheduler/task/SearchTask.cpp b/cpp/src/db/scheduler/task/SearchTask.cpp index e696faaed0..79baeeafe9 100644 --- a/cpp/src/db/scheduler/task/SearchTask.cpp +++ b/cpp/src/db/scheduler/task/SearchTask.cpp @@ -22,7 +22,7 @@ static constexpr size_t PARALLEL_REDUCE_BATCH = 1000; bool NeedParallelReduce(uint64_t nq, uint64_t topk) { server::ServerConfig &config = server::ServerConfig::GetInstance(); server::ConfigNode& db_config = config.GetConfig(server::CONFIG_DB); - bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, true); + bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false); if(!need_parallel) { return false; } diff --git a/cpp/src/server/thrift_impl/RequestTask.cpp b/cpp/src/server/thrift_impl/RequestTask.cpp index 1d20f55bc6..29f1af86c1 100644 --- a/cpp/src/server/thrift_impl/RequestTask.cpp +++ b/cpp/src/server/thrift_impl/RequestTask.cpp @@ -310,7 +310,6 @@ ServerError HasTableTask::OnExecute() { if(res != SERVER_SUCCESS) { return SetError(res, "Invalid table name: " + table_name_); } - //step 2: check table existence engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table_); if(!stat.ok()) { diff --git a/cpp/src/utils/Error.h b/cpp/src/utils/Error.h index 3b5433e76f..1fdb8fc939 100644 --- a/cpp/src/utils/Error.h +++ b/cpp/src/utils/Error.h @@ -55,6 +55,12 @@ constexpr ServerError SERVER_LICENSE_VALIDATION_FAIL = 
ToGlobalServerErrorCode(5 constexpr ServerError DB_META_TRANSACTION_FAILED = ToGlobalServerErrorCode(1000); +using KnowhereError = int32_t; +constexpr KnowhereError KNOWHERE_SUCCESS = 0; +constexpr KnowhereError KNOWHERE_ERROR = ToGlobalServerErrorCode(1); +constexpr KnowhereError KNOWHERE_INVALID_ARGUMENT = ToGlobalServerErrorCode(2); +constexpr KnowhereError KNOWHERE_UNEXPECTED_ERROR = ToGlobalServerErrorCode(3); + class ServerException : public std::exception { public: ServerException(ServerError error_code, diff --git a/cpp/src/wrapper/Index.cpp b/cpp/src/wrapper/Index.cpp index 18e20d830a..4b10c1e686 100644 --- a/cpp/src/wrapper/Index.cpp +++ b/cpp/src/wrapper/Index.cpp @@ -4,6 +4,7 @@ // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// +#if 0 // TODO: maybe support static search #ifdef GPU_VERSION #include "faiss/gpu/GpuAutoTune.h" @@ -80,3 +81,4 @@ Index_ptr read_index(const std::string &file_name) { } } } +#endif diff --git a/cpp/src/wrapper/Index.h b/cpp/src/wrapper/Index.h index ba157348d4..9841416a6c 100644 --- a/cpp/src/wrapper/Index.h +++ b/cpp/src/wrapper/Index.h @@ -6,23 +6,29 @@ #pragma once -#include -#include -#include -#include -#include - +//#include +//#include +//#include +//#include +//#include +// #include "faiss/AutoTune.h" #include "faiss/index_io.h" +// +//#include "Operand.h" + +#include "knowhere/vec_index.h" -#include "Operand.h" namespace zilliz { namespace milvus { namespace engine { -class Index; -using Index_ptr = std::shared_ptr; +using Index_ptr = VecIndexPtr; + +#if 0 +//class Index; +//using Index_ptr = std::shared_ptr; class Index { typedef long idx_t; @@ -75,6 +81,7 @@ private: void write_index(const Index_ptr &index, const std::string &file_name); extern Index_ptr read_index(const std::string &file_name); +#endif } diff --git a/cpp/src/wrapper/IndexBuilder.cpp b/cpp/src/wrapper/IndexBuilder.cpp index 62781751e2..095341ecc7 100644 --- 
a/cpp/src/wrapper/IndexBuilder.cpp +++ b/cpp/src/wrapper/IndexBuilder.cpp @@ -4,6 +4,7 @@ // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// +#if 0 #include "mutex" @@ -131,10 +132,8 @@ Index_ptr BgCpuBuilder::build_all(const long &nb, const float *xb, const long *i return std::make_shared(index); } -// TODO: Be Factory pattern later IndexBuilderPtr GetIndexBuilder(const Operand_ptr &opd) { if (opd->index_type == "IDMap") { - // TODO: fix hardcode IndexBuilderPtr index = nullptr; return std::make_shared(opd); } @@ -145,3 +144,4 @@ IndexBuilderPtr GetIndexBuilder(const Operand_ptr &opd) { } } } +#endif diff --git a/cpp/src/wrapper/IndexBuilder.h b/cpp/src/wrapper/IndexBuilder.h index 8752063560..4cb6de814b 100644 --- a/cpp/src/wrapper/IndexBuilder.h +++ b/cpp/src/wrapper/IndexBuilder.h @@ -4,6 +4,7 @@ // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// +#if 0 #pragma once #include "faiss/Index.h" @@ -64,3 +65,4 @@ extern IndexBuilderPtr GetIndexBuilder(const Operand_ptr &opd); } } } +#endif diff --git a/cpp/src/wrapper/Operand.cpp b/cpp/src/wrapper/Operand.cpp index 49d007ba2a..8bc708eb72 100644 --- a/cpp/src/wrapper/Operand.cpp +++ b/cpp/src/wrapper/Operand.cpp @@ -1,10 +1,11 @@ + //////////////////////////////////////////////////////////////////////////////// // Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved // Unauthorized copying of this file, via any medium is strictly prohibited. // Proprietary and confidential. 
//////////////////////////////////////////////////////////////////////////////// -#include "src/server/ServerConfig.h" +#if 0 #include "Operand.h" @@ -119,3 +120,4 @@ Operand_ptr str_to_operand(const std::string &input) { } } } +#endif diff --git a/cpp/src/wrapper/Operand.h b/cpp/src/wrapper/Operand.h index 85a0eb8080..0e675f6a1b 100644 --- a/cpp/src/wrapper/Operand.h +++ b/cpp/src/wrapper/Operand.h @@ -4,6 +4,7 @@ // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// +#if 0 #pragma once #include @@ -42,3 +43,4 @@ extern Operand_ptr str_to_operand(const std::string &input); } } } +#endif diff --git a/cpp/src/wrapper/knowhere/data_transfer.cpp b/cpp/src/wrapper/knowhere/data_transfer.cpp new file mode 100644 index 0000000000..583a44ee29 --- /dev/null +++ b/cpp/src/wrapper/knowhere/data_transfer.cpp @@ -0,0 +1,48 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. 
+////////////////////////////////////////////////////////////////////////////////
+
+#include "data_transfer.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+using namespace zilliz::knowhere;
+
+// Packs `nb` float vectors of width `dim` plus one id per vector into a
+// knowhere dataset: one tensor column named "data" with shape {nb, dim} and
+// one array column named "id".
+//
+// NOTE(review): the buffers are passed to ConstructFloatTensor /
+// ConstructInt64Array by pointer; presumably they are wrapped rather than
+// copied, in which case `xb` and `ids` must outlive the dataset - confirm.
+// NOTE(review): template arguments (std::vector<...>, std::make_shared<...>)
+// are missing from this copy of the patch; restore them from upstream.
+DatasetPtr
+GenDatasetWithIds(const int64_t &nb, const int64_t &dim, const float *xb, const long *ids) {
+    std::vector shape{nb, dim};
+    auto tensor = ConstructFloatTensor((uint8_t *) xb, nb * dim * sizeof(float), shape);
+    std::vector tensors{tensor};
+    std::vector tensor_fields{ConstructFloatField("data")};
+    auto tensor_schema = std::make_shared(tensor_fields);
+
+    // `ids` is `const long *` but is sized with sizeof(int64_t); this assumes
+    // a 64-bit `long`.
+    auto id_array = ConstructInt64Array((uint8_t *) ids, nb * sizeof(int64_t));
+    std::vector arrays{id_array};
+    std::vector array_fields{ConstructInt64Field("id")};
+    // The array schema has to describe the id column, so it is built from
+    // array_fields (not from the float tensor's field list).
+    auto array_schema = std::make_shared(array_fields);
+
+    auto dataset = std::make_shared(std::move(arrays), array_schema,
+                                    std::move(tensors), tensor_schema);
+    return dataset;
+}
+
+// Packs `nb` float vectors of width `dim` into a knowhere dataset that holds
+// only the "data" tensor column (no ids); used for query vectors.
+DatasetPtr
+GenDataset(const int64_t &nb, const int64_t &dim, const float *xb) {
+    std::vector shape{nb, dim};
+    auto tensor = ConstructFloatTensor((uint8_t *) xb, nb * dim * sizeof(float), shape);
+    std::vector tensors{tensor};
+    std::vector tensor_fields{ConstructFloatField("data")};
+    auto tensor_schema = std::make_shared(tensor_fields);
+
+    auto dataset = std::make_shared(std::move(tensors), tensor_schema);
+    return dataset;
+}
+
+}
+}
+}
diff --git a/cpp/src/wrapper/knowhere/data_transfer.h b/cpp/src/wrapper/knowhere/data_transfer.h
new file mode 100644
index 0000000000..46de4ff21f
--- /dev/null
+++ b/cpp/src/wrapper/knowhere/data_transfer.h
@@ -0,0 +1,24 @@
+////////////////////////////////////////////////////////////////////////////////
+// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+// Unauthorized copying of this file, via any medium is strictly prohibited.
+// Proprietary and confidential.
+////////////////////////////////////////////////////////////////////////////////
+
+#pragma once
+
+#include "knowhere/adapter/structure.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+// Builds a knowhere dataset from `nb` vectors of width `dim` and their ids.
+extern zilliz::knowhere::DatasetPtr
+GenDatasetWithIds(const int64_t &nb, const int64_t &dim, const float *xb, const long *ids);
+
+// Builds a knowhere dataset from `nb` vectors of width `dim`, without ids.
+extern zilliz::knowhere::DatasetPtr
+GenDataset(const int64_t &nb, const int64_t &dim, const float *xb);
+
+}
+}
+}
diff --git a/cpp/src/wrapper/knowhere/vec_impl.cpp b/cpp/src/wrapper/knowhere/vec_impl.cpp
new file mode 100644
index 0000000000..7efbd54f0f
--- /dev/null
+++ b/cpp/src/wrapper/knowhere/vec_impl.cpp
@@ -0,0 +1,234 @@
+////////////////////////////////////////////////////////////////////////////////
+// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+// Unauthorized copying of this file, via any medium is strictly prohibited.
+// Proprietary and confidential.
+////////////////////////////////////////////////////////////////////////////////
+
+// NOTE(review): in this copy of the patch the header name of the first
+// #include and every template argument list (after `as`, `GetValues`,
+// `make_shared`, `dynamic_pointer_cast`, `static_pointer_cast`) are missing;
+// restore them from upstream before building.
+#include
+#include "knowhere/index/vector_index/idmap.h"
+#include "knowhere/index/vector_index/gpu_ivf.h"
+#include "knowhere/common/exception.h"
+
+#include "vec_impl.h"
+#include "data_transfer.h"
+#include "wrapper_log.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+using namespace zilliz::knowhere;
+
+// Builds the wrapped index from scratch.
+// Reads the vector width from cfg["dim"], wraps `xb`/`ids` in a dataset and
+// then runs preprocessor -> train -> add on the wrapped index.
+// `nt` and `xt` are not used by this implementation.
+// Returns KNOWHERE_SUCCESS, or an error code chosen by the exception type:
+// KnowhereException -> KNOWHERE_UNEXPECTED_ERROR, jsoncons::json_exception ->
+// KNOWHERE_INVALID_ARGUMENT, any other std::exception -> KNOWHERE_ERROR.
+server::KnowhereError VecIndexImpl::BuildAll(const long &nb,
+                                             const float *xb,
+                                             const long *ids,
+                                             const Config &cfg,
+                                             const long &nt,
+                                             const float *xt) {
+    try {
+        dim = cfg["dim"].as();
+        auto dataset = GenDatasetWithIds(nb, dim, xb, ids);
+
+        auto preprocessor = index_->BuildPreprocessor(dataset, cfg);
+        index_->set_preprocessor(preprocessor);
+        auto model = index_->Train(dataset, cfg);
+        index_->set_index_model(model);
+        index_->Add(dataset, cfg);
+    } catch (KnowhereException &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_UNEXPECTED_ERROR;
+    } catch (jsoncons::json_exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_INVALID_ARGUMENT;
+    } catch (std::exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_ERROR;
+    }
+    return server::KNOWHERE_SUCCESS;
+}
+
+// Appends `nb` vectors with their ids to the wrapped index.
+// Uses the `dim` stored by an earlier BuildAll/Build/Load call, not a value
+// from `cfg`. Error mapping is the same as in BuildAll.
+server::KnowhereError VecIndexImpl::Add(const long &nb, const float *xb, const long *ids, const Config &cfg) {
+    try {
+        auto dataset = GenDatasetWithIds(nb, dim, xb, ids);
+
+        index_->Add(dataset, cfg);
+    } catch (KnowhereException &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_UNEXPECTED_ERROR;
+    } catch (jsoncons::json_exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_INVALID_ARGUMENT;
+    } catch (std::exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_ERROR;
+    }
+    return server::KNOWHERE_SUCCESS;
+}
+
+// Runs a search for `nq` query vectors and copies nq * k ids and distances
+// into the caller's `ids` and `dist` buffers, where k is read from cfg["k"].
+// The result's array 0 is treated as ids and array 1 as distances.
+// Error mapping is the same as in BuildAll.
+server::KnowhereError VecIndexImpl::Search(const long &nq, const float *xq, float *dist, long *ids, const Config &cfg) {
+    try {
+        auto k = cfg["k"].as();
+        auto dataset = GenDataset(nq, dim, xq);
+
+        // NOTE(review): search_cfg is never used; `cfg` is what gets passed on.
+        Config search_cfg;
+        auto res = index_->Search(dataset, cfg);
+        auto ids_array = res->array()[0];
+        auto dis_array = res->array()[1];
+
+        //{
+        //    auto& ids = ids_array;
+        //    auto& dists = dis_array;
+        //    std::stringstream ss_id;
+        //    std::stringstream ss_dist;
+        //    for (auto i = 0; i < 10; i++) {
+        //        for (auto j = 0; j < k; ++j) {
+        //            ss_id << *(ids->data()->GetValues(1, i * k + j)) << " ";
+        //            ss_dist << *(dists->data()->GetValues(1, i * k + j)) << " ";
+        //        }
+        //        ss_id << std::endl;
+        //        ss_dist << std::endl;
+        //    }
+        //    std::cout << "id\n" << ss_id.str() << std::endl;
+        //    std::cout << "dist\n" << ss_dist.str() << std::endl;
+        //}
+
+        auto p_ids = ids_array->data()->GetValues(1, 0);
+        auto p_dist = dis_array->data()->GetValues(1, 0);
+
+        // TODO(linxj): avoid copy here.
+        // The id copy is sized with sizeof(int64_t) although `ids` is `long *`;
+        // this assumes a 64-bit `long`.
+        memcpy(ids, p_ids, sizeof(int64_t) * nq * k);
+        memcpy(dist, p_dist, sizeof(float) * nq * k);
+    } catch (KnowhereException &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_UNEXPECTED_ERROR;
+    } catch (jsoncons::json_exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_INVALID_ARGUMENT;
+    } catch (std::exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_ERROR;
+    }
+    return server::KNOWHERE_SUCCESS;
+}
+
+// Serializes the wrapped index; exceptions from the wrapped call propagate.
+zilliz::knowhere::BinarySet VecIndexImpl::Serialize() {
+    return index_->Serialize();
+}
+
+// Restores the wrapped index from `index_binary` and refreshes `dim` from it.
+// Unlike BuildAll/Add/Search there is no try/catch here, so exceptions thrown
+// by the wrapped Load propagate to the caller; the only value returned is
+// KNOWHERE_SUCCESS.
+server::KnowhereError VecIndexImpl::Load(const zilliz::knowhere::BinarySet &index_binary) {
+    index_->Load(index_binary);
+    dim = Dimension();
+    return server::KNOWHERE_SUCCESS;
+}
+
+// Vector width as reported by the wrapped index.
+int64_t VecIndexImpl::Dimension() {
+    return index_->Dimension();
+}
+
+// Number of vectors as reported by the wrapped index.
+int64_t VecIndexImpl::Count() {
+    return index_->Count();
+}
+
+// Index type this wrapper was constructed with.
+IndexType VecIndexImpl::GetType() {
+    return type;
+}
+
+// Returns the raw vector storage of the wrapped index, or nullptr when the
+// dynamic cast of index_ fails.
+float *BFIndex::GetRawVectors() {
+    auto raw_index = std::dynamic_pointer_cast(index_);
+    if (raw_index) { return raw_index->GetRawVectors(); }
+    return nullptr;
+}
+
+// Returns the raw id storage of the wrapped index.
+// NOTE(review): this uses an unchecked static_pointer_cast and no null test,
+// unlike GetRawVectors above - consider making the two consistent.
+int64_t *BFIndex::GetRawIds() {
+    return std::static_pointer_cast(index_)->GetRawIds();
+}
+
+// Prepares an empty index: stores cfg["dim"] and calls Train(cfg) on the
+// wrapped index. Error mapping is the same as in VecIndexImpl::BuildAll.
+server::KnowhereError BFIndex::Build(const Config &cfg) {
+    try {
+        dim = cfg["dim"].as();
+        std::static_pointer_cast(index_)->Train(cfg);
+    } catch (KnowhereException &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_UNEXPECTED_ERROR;
+    } catch (jsoncons::json_exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_INVALID_ARGUMENT;
+    } catch (std::exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_ERROR;
+    }
+    return server::KNOWHERE_SUCCESS;
+}
+
+// Same as Build followed by adding `nb` vectors with their ids; there is no
+// preprocessor or index-model step here. `nt` and `xt` are not used.
+server::KnowhereError BFIndex::BuildAll(const long &nb,
+                                        const float *xb,
+                                        const long *ids,
+                                        const Config &cfg,
+                                        const long &nt,
+                                        const float *xt) {
+    try {
+        dim = cfg["dim"].as();
+        auto dataset = GenDatasetWithIds(nb, dim, xb, ids);
+
+        std::static_pointer_cast(index_)->Train(cfg);
+        index_->Add(dataset, cfg);
+    } catch (KnowhereException &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_UNEXPECTED_ERROR;
+    } catch (jsoncons::json_exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_INVALID_ARGUMENT;
+    } catch (std::exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_ERROR;
+    }
+    return server::KNOWHERE_SUCCESS;
+}
+
+// Runs the same preprocessor -> train -> add sequence as
+// VecIndexImpl::BuildAll, then, if index_ casts to the device index type,
+// replaces index_ with the result of Copy_index_gpu_to_cpu().
+// NOTE(review): when that cast fails the function logs
+// "Build IVFMIXIndex Failed" but still returns KNOWHERE_SUCCESS - confirm
+// whether an error code was intended.
+// TODO(linxj): add lock here.
+server::KnowhereError IVFMixIndex::BuildAll(const long &nb,
+                                            const float *xb,
+                                            const long *ids,
+                                            const Config &cfg,
+                                            const long &nt,
+                                            const float *xt) {
+    try {
+        dim = cfg["dim"].as();
+        auto dataset = GenDatasetWithIds(nb, dim, xb, ids);
+
+        auto preprocessor = index_->BuildPreprocessor(dataset, cfg);
+        index_->set_preprocessor(preprocessor);
+        auto model = index_->Train(dataset, cfg);
+        index_->set_index_model(model);
+        index_->Add(dataset, cfg);
+
+        if (auto device_index = std::dynamic_pointer_cast(index_)) {
+            auto host_index = device_index->Copy_index_gpu_to_cpu();
+            index_ = host_index;
+        } else {
+            WRAPPER_LOG_ERROR << "Build IVFMIXIndex Failed";
+        }
+    } catch (KnowhereException &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_UNEXPECTED_ERROR;
+    } catch (jsoncons::json_exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_INVALID_ARGUMENT;
+    } catch (std::exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_ERROR;
+    }
+    return server::KNOWHERE_SUCCESS;
+}
+
+// Replaces index_ with a newly constructed index object (its concrete type is
+// the missing make_shared argument) and loads `index_binary` into it, then
+// refreshes `dim`. Exceptions from the wrapped Load propagate.
+server::KnowhereError IVFMixIndex::Load(const zilliz::knowhere::BinarySet &index_binary) {
+    index_ = std::make_shared();
+    index_->Load(index_binary);
+    dim = Dimension();
+    return server::KNOWHERE_SUCCESS;
+}
+
+}
+}
+}
diff --git a/cpp/src/wrapper/knowhere/vec_impl.h b/cpp/src/wrapper/knowhere/vec_impl.h
new file mode 100644
index 0000000000..c4a0e2ac61
--- /dev/null
+++ b/cpp/src/wrapper/knowhere/vec_impl.h
@@ -0,0 +1,73 @@
+////////////////////////////////////////////////////////////////////////////////
+// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+// Unauthorized copying of this file, via any medium is strictly prohibited.
+// Proprietary and confidential.
+////////////////////////////////////////////////////////////////////////////////
+
+#pragma once
+
+#include "knowhere/index/vector_index/vector_index.h"
+
+#include "vec_index.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+// VecIndex implementation that forwards to a wrapped knowhere index held in
+// `index_`, remembering the vector width (`dim`) and the index type.
+// NOTE(review): the template argument of std::shared_ptr is missing from this
+// copy of the patch throughout this header; restore it from upstream.
+class VecIndexImpl : public VecIndex {
+ public:
+    explicit VecIndexImpl(std::shared_ptr index, const IndexType &type)
+        : index_(std::move(index)), type(type) {};
+    server::KnowhereError BuildAll(const long &nb,
+                                   const float *xb,
+                                   const long *ids,
+                                   const Config &cfg,
+                                   const long &nt,
+                                   const float *xt) override;
+    IndexType GetType() override;
+    int64_t Dimension() override;
+    int64_t Count() override;
+    server::KnowhereError Add(const long &nb, const float *xb, const long *ids, const Config &cfg) override;
+    zilliz::knowhere::BinarySet Serialize() override;
+    server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) override;
+    server::KnowhereError Search(const long &nq, const float *xq, float *dist, long *ids, const Config &cfg) override;
+
+ protected:
+    int64_t dim = 0;
+    IndexType type = IndexType::INVALID;
+    std::shared_ptr index_ = nullptr;
+};
+
+// VecIndexImpl variant with its own BuildAll and Load.
+class IVFMixIndex : public VecIndexImpl {
+ public:
+    explicit IVFMixIndex(std::shared_ptr index, const IndexType &type)
+        : VecIndexImpl(std::move(index), type) {};
+
+    server::KnowhereError BuildAll(const long &nb,
+                                   const float *xb,
+                                   const long *ids,
+                                   const Config &cfg,
+                                   const long &nt,
+                                   const float *xt) override;
+    server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) override;
+};
+
+// VecIndexImpl fixed to IndexType::FAISS_IDMAP that additionally exposes the
+// raw vectors and ids of the wrapped index.
+class BFIndex : public VecIndexImpl {
+ public:
+    explicit BFIndex(std::shared_ptr index) : VecIndexImpl(std::move(index),
+                                                            IndexType::FAISS_IDMAP) {};
+    server::KnowhereError Build(const Config& cfg);
+    float *GetRawVectors();
+    server::KnowhereError BuildAll(const long &nb,
+                                   const float *xb,
+                                   const long *ids,
+                                   const Config &cfg,
+                                   const long &nt,
+                                   const float *xt) override;
+    int64_t *GetRawIds();
+};
+
+}
+}
+}
diff --git a/cpp/src/wrapper/knowhere/vec_index.cpp b/cpp/src/wrapper/knowhere/vec_index.cpp
new file mode 100644
index 0000000000..65364eb01f
--- /dev/null
+++ b/cpp/src/wrapper/knowhere/vec_index.cpp
@@ -0,0 +1,207 @@
+////////////////////////////////////////////////////////////////////////////////
+// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+// Unauthorized copying of this file, via any medium is strictly prohibited.
+// Proprietary and confidential.
+////////////////////////////////////////////////////////////////////////////////
+#include <fstream>
+#include <memory>
+#include <stdexcept>
+#include <string>
+
+#include "knowhere/index/vector_index/ivf.h"
+#include "knowhere/index/vector_index/idmap.h"
+#include "knowhere/index/vector_index/gpu_ivf.h"
+#include "knowhere/index/vector_index/cpu_kdt_rng.h"
+#include "knowhere/common/exception.h"
+
+#include "vec_index.h"
+#include "vec_impl.h"
+#include "wrapper_log.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+// Binary file sink: opens `fname` for writing and appends raw bytes.
+struct FileIOWriter {
+    std::fstream fs;
+    std::string name;
+
+    FileIOWriter(const std::string &fname);
+    ~FileIOWriter();
+    // Writes `size` bytes from `ptr`; returns `size`, or 0 if the stream failed.
+    size_t operator()(void *ptr, size_t size);
+};
+
+// Binary file source: opens `fname` for reading and delivers raw bytes.
+struct FileIOReader {
+    std::fstream fs;
+    std::string name;
+
+    FileIOReader(const std::string &fname);
+    ~FileIOReader();
+    // Reads up to `size` bytes into `ptr`; returns the number actually read.
+    size_t operator()(void *ptr, size_t size);
+    // Positional read; not implemented, always returns 0.
+    size_t operator()(void *ptr, size_t size, size_t pos);
+};
+
+FileIOReader::FileIOReader(const std::string &fname) {
+    name = fname;
+    fs = std::fstream(name, std::ios::in | std::ios::binary);
+}
+
+FileIOReader::~FileIOReader() {
+    fs.close();
+}
+
+size_t FileIOReader::operator()(void *ptr, size_t size) {
+    fs.read(reinterpret_cast<char *>(ptr), size);
+    // The function is declared to return size_t, so it must return a value;
+    // report what was really read so callers can detect short reads.
+    return static_cast<size_t>(fs.gcount());
+}
+
+size_t FileIOReader::operator()(void *ptr, size_t size, size_t pos) {
+    return 0;
+}
+
+FileIOWriter::FileIOWriter(const std::string &fname) {
+    name = fname;
+    fs = std::fstream(name, std::ios::out | std::ios::binary);
+}
+
+FileIOWriter::~FileIOWriter() {
+    fs.close();
+}
+
+size_t FileIOWriter::operator()(void *ptr, size_t size) {
+    fs.write(reinterpret_cast<char *>(ptr), size);
+    // The function is declared to return size_t, so it must return a value.
+    return fs.good() ? size : 0;
+}
+
+
+// Creates an empty wrapper for the requested index type.
+// FAISS_IDMAP and the two *_MIX types return from inside their case because
+// they are constructed with a different argument list than the generic
+// wrapper built after the switch. Returns nullptr for any type not listed.
+// NOTE(review): the template arguments of std::shared_ptr / std::make_shared
+// in this function are missing from this copy of the patch; restore them from
+// upstream.
+VecIndexPtr GetVecIndexFactory(const IndexType &type) {
+    std::shared_ptr index;
+    switch (type) {
+        case IndexType::FAISS_IDMAP: {
+            index = std::make_shared();
+            return std::make_shared(index);
+        }
+        case IndexType::FAISS_IVFFLAT_CPU: {
+            index = std::make_shared();
+            break;
+        }
+        case IndexType::FAISS_IVFFLAT_GPU: {
+            index = std::make_shared(0);
+            break;
+        }
+        case IndexType::FAISS_IVFFLAT_MIX: {
+            index = std::make_shared(0);
+            return std::make_shared(index, IndexType::FAISS_IVFFLAT_MIX);
+        }
+        case IndexType::FAISS_IVFPQ_CPU: {
+            index = std::make_shared();
+            break;
+        }
+        case IndexType::FAISS_IVFPQ_GPU: {
+            index = std::make_shared(0);
+            break;
+        }
+        case IndexType::SPTAG_KDT_RNT_CPU: {
+            index = std::make_shared();
+            break;
+        }
+        case IndexType::FAISS_IVFSQ8_MIX: {
+            index = std::make_shared(0);
+            return std::make_shared(index, IndexType::FAISS_IVFSQ8_MIX);
+        }
+        //case IndexType::NSG: { // TODO(linxj): bug.
+        //    index = std::make_shared();
+        //    break;
+        //}
+        default: {
+            return nullptr;
+        }
+    }
+    return std::make_shared(index, type);
+}
+
+// Creates a wrapper for `index_type` and loads `index_binary` into it.
+// Returns nullptr when the factory does not know `index_type`; exceptions
+// thrown by the wrapper's Load propagate.
+VecIndexPtr LoadVecIndex(const IndexType &index_type, const zilliz::knowhere::BinarySet &index_binary) {
+    auto index = GetVecIndexFactory(index_type);
+    if (index == nullptr) {
+        // The factory returns nullptr for unknown types; do not dereference it.
+        WRAPPER_LOG_ERROR << "LoadVecIndex: unsupported index type";
+        return nullptr;
+    }
+    index->Load(index_binary);
+    return index;
+}
+
+// Reads an index file produced by write_index.
+// Layout: IndexType, then repeated records of
+//   [size_t meta_length][meta bytes][8-byte bin_length][bin bytes].
+// Throws std::runtime_error when the file cannot be opened, is truncated,
+// holds a length that exceeds the remaining bytes, or names an index type the
+// factory does not support.
+VecIndexPtr read_index(const std::string &location) {
+    knowhere::BinarySet load_data_list;
+    FileIOReader reader(location);
+    if (!reader.fs.is_open()) {
+        throw std::runtime_error("read_index: cannot open index file: " + location);
+    }
+    reader.fs.seekg(0, reader.fs.end);
+    size_t length = reader.fs.tellg();
+    reader.fs.seekg(0);
+
+    // rp is the read position; it never exceeds length because every read is
+    // bounds-checked first.
+    size_t rp = 0;
+    const auto ensure_available = [&](size_t bytes) {
+        if (bytes > length - rp) {
+            throw std::runtime_error("read_index: truncated or corrupt index file: " + location);
+        }
+    };
+    const auto read_exact = [&](void *dst, size_t bytes) {
+        ensure_available(bytes);
+        if (reader(dst, bytes) != bytes) {
+            throw std::runtime_error("read_index: short read from index file: " + location);
+        }
+        rp += bytes;
+    };
+
+    auto current_type = IndexType::INVALID;
+    read_exact(&current_type, sizeof(current_type));
+    while (rp < length) {
+        size_t meta_length = 0;
+        read_exact(&meta_length, sizeof(meta_length));
+        // Validate before allocating so a corrupt length cannot trigger a
+        // huge allocation.
+        ensure_available(meta_length);
+        std::string meta(meta_length, '\0');
+        read_exact(&meta[0], meta_length);
+
+        size_t bin_length = 0;
+        read_exact(&bin_length, sizeof(bin_length));
+        ensure_available(bin_length);
+        // The buffer comes from new[], so it needs an array deleter.
+        std::shared_ptr<uint8_t> binptr(new uint8_t[bin_length], std::default_delete<uint8_t[]>());
+        read_exact(binptr.get(), bin_length);
+
+        load_data_list.Append(meta, binptr, bin_length);
+    }
+
+    auto index = LoadVecIndex(current_type, load_data_list);
+    if (index == nullptr) {
+        throw std::runtime_error("read_index: unsupported index type in file: " + location);
+    }
+    return index;
+}
+
+// Serializes `index` to `location` in the layout described at read_index.
+// Returns KNOWHERE_SUCCESS, KNOWHERE_UNEXPECTED_ERROR for a
+// KnowhereException, and KNOWHERE_ERROR for any other std::exception or when
+// the file cannot be opened or written.
+server::KnowhereError write_index(VecIndexPtr index, const std::string &location) {
+    try {
+        auto binaryset = index->Serialize();
+        auto index_type = index->GetType();
+
+        FileIOWriter writer(location);
+        if (!writer.fs.is_open()) {
+            WRAPPER_LOG_ERROR << "write_index: cannot open index file: " << location;
+            return server::KNOWHERE_ERROR;
+        }
+        writer(&index_type, sizeof(IndexType));
+        for (auto &iter: binaryset.binary_map_) {
+            auto meta = iter.first.c_str();
+            size_t meta_length = iter.first.length();
+            writer(&meta_length, sizeof(meta_length));
+            writer((void *) meta, meta_length);
+
+            auto binary = iter.second;
+            // Written as int64_t and read back as size_t in read_index; this
+            // assumes both are 8 bytes wide.
+            int64_t binary_length = binary->size;
+            writer(&binary_length, sizeof(binary_length));
+            writer((void *) binary->data.get(), binary_length);
+        }
+        if (!writer.fs.good()) {
+            WRAPPER_LOG_ERROR << "write_index: failed to write index file: " << location;
+            return server::KNOWHERE_ERROR;
+        }
+    } catch (knowhere::KnowhereException &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_UNEXPECTED_ERROR;
+    } catch (std::exception &e) {
+        WRAPPER_LOG_ERROR << e.what();
+        return server::KNOWHERE_ERROR;
+    }
+    return server::KNOWHERE_SUCCESS;
+}
+
+
+// Fills in build parameters the caller left out of `cfg`:
+// "nlist" (scaled from `size`: 16384 per million rows), "gpu_id" (0),
+// "metric_type" ("L2") and, for FAISS_IVFSQ8_MIX only, "nbits" (8).
+// Keys already present in `cfg` are left untouched.
+// TODO(linxj): redo here.
+void AutoGenParams(const IndexType &type, const long &size, zilliz::knowhere::Config &cfg) {
+    if (!cfg.contains("nlist")) { cfg["nlist"] = int(size / 1000000.0 * 16384); }
+    if (!cfg.contains("gpu_id")) { cfg["gpu_id"] = int(0); }
+    if (!cfg.contains("metric_type")) { cfg["metric_type"] = "L2"; }
+
+    switch (type) {
+        case IndexType::FAISS_IVFSQ8_MIX: {
+            if (!cfg.contains("nbits")) { cfg["nbits"] = int(8); }
+            break;
+        }
+    }
+}
+
+}
+}
+}
diff --git a/cpp/src/wrapper/knowhere/vec_index.h b/cpp/src/wrapper/knowhere/vec_index.h
new file mode 100644
index 0000000000..ed1451bb04
--- /dev/null
+++ b/cpp/src/wrapper/knowhere/vec_index.h
@@ -0,0 +1,83 @@
+////////////////////////////////////////////////////////////////////////////////
+// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+// Unauthorized copying of this file, via any medium is strictly prohibited.
+// Proprietary and confidential.
+////////////////////////////////////////////////////////////////////////////////
+
+#pragma once
+
+#include
+#include
+
+#include "utils/Error.h"
+
+#include "knowhere/common/config.h"
+#include "knowhere/common/binary_set.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+// TODO(linxj): jsoncons => rapidjson or other.
+using Config = zilliz::knowhere::Config; + +enum class IndexType { + INVALID = 0, + FAISS_IDMAP = 1, + FAISS_IVFFLAT_CPU, + FAISS_IVFFLAT_GPU, + FAISS_IVFFLAT_MIX, // build on gpu and search on cpu + FAISS_IVFPQ_CPU, + FAISS_IVFPQ_GPU, + SPTAG_KDT_RNT_CPU, + FAISS_IVFSQ8_MIX, + //NSG, +}; + +class VecIndex { + public: + virtual server::KnowhereError BuildAll(const long &nb, + const float *xb, + const long *ids, + const Config &cfg, + const long &nt = 0, + const float *xt = nullptr) = 0; + + virtual server::KnowhereError Add(const long &nb, + const float *xb, + const long *ids, + const Config &cfg = Config()) = 0; + + virtual server::KnowhereError Search(const long &nq, + const float *xq, + float *dist, + long *ids, + const Config &cfg = Config()) = 0; + + virtual IndexType GetType() = 0; + + virtual int64_t Dimension() = 0; + + virtual int64_t Count() = 0; + + virtual zilliz::knowhere::BinarySet Serialize() = 0; + + virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) = 0; +}; + +using VecIndexPtr = std::shared_ptr; + +extern server::KnowhereError write_index(VecIndexPtr index, const std::string &location); + +extern VecIndexPtr read_index(const std::string &location); + +extern VecIndexPtr GetVecIndexFactory(const IndexType &type); + +extern VecIndexPtr LoadVecIndex(const IndexType &index_type, const zilliz::knowhere::BinarySet &index_binary); + +extern void AutoGenParams(const IndexType& type, const long& size, Config& cfg); + +} +} +} diff --git a/cpp/src/wrapper/knowhere/wrapper_log.h b/cpp/src/wrapper/knowhere/wrapper_log.h new file mode 100644 index 0000000000..39ca78092b --- /dev/null +++ b/cpp/src/wrapper/knowhere/wrapper_log.h @@ -0,0 +1,28 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. 
+//////////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +#define WRAPPER_DOMAIN_NAME "[WRAPPER] " +#define WRAPPER_ERROR_TEXT "WRAPPER Error:" + +#define WRAPPER_LOG_TRACE LOG(TRACE) << WRAPPER_DOMAIN_NAME +#define WRAPPER_LOG_DEBUG LOG(DEBUG) << WRAPPER_DOMAIN_NAME +#define WRAPPER_LOG_INFO LOG(INFO) << WRAPPER_DOMAIN_NAME +#define WRAPPER_LOG_WARNING LOG(WARNING) << WRAPPER_DOMAIN_NAME +#define WRAPPER_LOG_ERROR LOG(ERROR) << WRAPPER_DOMAIN_NAME +#define WRAPPER_LOG_FATAL LOG(FATAL) << WRAPPER_DOMAIN_NAME + +} +} +} + diff --git a/cpp/thirdparty/knowhere b/cpp/thirdparty/knowhere new file mode 160000 index 0000000000..02550a43b5 --- /dev/null +++ b/cpp/thirdparty/knowhere @@ -0,0 +1 @@ +Subproject commit 02550a43b5146bd7976b8b2b3fc37ca885d1e880 diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index f4a670acf3..2b100448f9 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -40,7 +40,8 @@ set(unittest_libs add_subdirectory(server) add_subdirectory(db) -add_subdirectory(faiss_wrapper) +add_subdirectory(index_wrapper) +#add_subdirectory(faiss_wrapper) #add_subdirectory(license) add_subdirectory(metrics) add_subdirectory(storage) diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index 75d7bf037e..50847b9f29 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -6,7 +6,8 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files) aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs) -aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src) +#aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src) +aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src) aux_source_directory(./ test_srcs) set(util_files @@ -26,22 +27,47 @@ 
link_directories("/usr/local/cuda/lib64") include_directories(/usr/include/mysql) +#add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY) + set(db_test_src ${config_files} ${cache_srcs} ${db_srcs} ${db_scheduler_srcs} ${wrapper_src} + ${knowhere_src} ${util_files} ${require_files} - ${test_srcs}) + ${test_srcs} + ) cuda_add_executable(db_test ${db_test_src}) set(db_libs + sqlite + boost_system_static + boost_filesystem_static + lz4 + mysqlpp + ) + +set(knowhere_libs + knowhere + SPTAGLibStatic + arrow + jemalloc_pic faiss + openblas + lapack + tbb cudart cublas + ) + +target_link_libraries(db_test + ${knowhere_libs} + ${db_libs} + ${unittest_libs} sqlite boost_system_static boost_filesystem_static diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp index bf6319ab90..588e38680c 100644 --- a/cpp/unittest/db/db_tests.cpp +++ b/cpp/unittest/db/db_tests.cpp @@ -217,7 +217,7 @@ TEST_F(DBTest, SEARCH_TEST) { {//search by specify index file engine::meta::DatesT dates; - std::vector file_ids = {"1", "2", "3", "4"}; + std::vector file_ids = {"4", "5", "6"}; engine::QueryResults results; stat = db_->Query(TABLE_NAME, file_ids, k, nq, xq.data(), dates, results); ASSERT_STATS(stat); diff --git a/cpp/unittest/db/meta_tests.cpp b/cpp/unittest/db/meta_tests.cpp index 5bce4058b1..36643ead94 100644 --- a/cpp/unittest/db/meta_tests.cpp +++ b/cpp/unittest/db/meta_tests.cpp @@ -113,7 +113,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) { ss << "days:" << days_num; options.archive_conf = ArchiveConf("delete", ss.str()); - auto impl = meta::DBMetaImpl(options); + meta::DBMetaImpl impl(options); auto table_id = "meta_test_table"; meta::TableSchema table; @@ -163,7 +163,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) { options.path = "/tmp/milvus_test"; options.archive_conf = ArchiveConf("delete", "disk:11"); - auto impl = meta::DBMetaImpl(options); + meta::DBMetaImpl impl(options); auto table_id = "meta_test_group"; meta::TableSchema table; @@ -269,4 +269,15 @@ TEST_F(MetaTest, 
TABLE_FILES_TEST) { ASSERT_TRUE(status.ok()); ASSERT_EQ(dated_files[table_file.date_].size(), to_index_files_cnt+raw_files_cnt+index_files_cnt); + + std::vector ids; + status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(dated_files[table_file.date_].size(), + to_index_files_cnt+raw_files_cnt+index_files_cnt); + + ids.push_back(size_t(9999999999)); + status = impl_->FilesToSearch(table_id, ids, dates, dated_files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(dated_files[table_file.date_].size(),0); } diff --git a/cpp/unittest/db/misc_test.cpp b/cpp/unittest/db/misc_test.cpp index 9dd07fac8d..3ec54751ab 100644 --- a/cpp/unittest/db/misc_test.cpp +++ b/cpp/unittest/db/misc_test.cpp @@ -27,32 +27,32 @@ namespace { } -TEST(DBMiscTest, ENGINE_API_TEST) { - //engine api AddWithIdArray - const uint16_t dim = 512; - const long n = 10; - engine::FaissExecutionEngine engine(512, "/tmp/1", "IDMap", "IDMap,Flat"); - std::vector vectors; - std::vector ids; - for (long i = 0; i < n; i++) { - for (uint16_t k = 0; k < dim; k++) { - vectors.push_back((float) k); - } - ids.push_back(i); - } - - auto status = engine.AddWithIdArray(vectors, ids); - ASSERT_TRUE(status.ok()); - - auto engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::INVALID); - ASSERT_EQ(engine_ptr, nullptr); - - engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::FAISS_IVFFLAT); - ASSERT_NE(engine_ptr, nullptr); - - engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::FAISS_IDMAP); - ASSERT_NE(engine_ptr, nullptr); -} +//TEST(DBMiscTest, ENGINE_API_TEST) { +// //engine api AddWithIdArray +// const uint16_t dim = 512; +// const long n = 10; +// engine::FaissExecutionEngine engine(512, "/tmp/1", "IDMap", "IDMap,Flat"); +// std::vector vectors; +// std::vector ids; +// for (long i = 0; i < n; i++) { +// for (uint16_t k = 0; k < dim; k++) { +// vectors.push_back((float) k); +// } +// 
ids.push_back(i); +// } +// +// auto status = engine.AddWithIdArray(vectors, ids); +// ASSERT_TRUE(status.ok()); +// +// auto engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::INVALID); +// ASSERT_EQ(engine_ptr, nullptr); +// +// engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::FAISS_IVFFLAT_GPU); +// ASSERT_NE(engine_ptr, nullptr); +// +// engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::FAISS_IDMAP); +// ASSERT_NE(engine_ptr, nullptr); +//} TEST(DBMiscTest, EXCEPTION_TEST) { engine::Exception ex1(""); diff --git a/cpp/unittest/db/mysql_meta_test.cpp b/cpp/unittest/db/mysql_meta_test.cpp index 76d7846362..aead509a2c 100644 --- a/cpp/unittest/db/mysql_meta_test.cpp +++ b/cpp/unittest/db/mysql_meta_test.cpp @@ -328,6 +328,17 @@ TEST_F(MySQLTest, TABLE_FILES_TEST) { ASSERT_EQ(dated_files[table_file.date_].size(), to_index_files_cnt+raw_files_cnt+index_files_cnt); + std::vector ids; + status = impl.FilesToSearch(table_id, ids, meta::DatesT(), dated_files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(dated_files[table_file.date_].size(), + to_index_files_cnt+raw_files_cnt+index_files_cnt); + + ids.push_back(size_t(9999999999)); + status = impl.FilesToSearch(table_id, ids, dates, dated_files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(dated_files[table_file.date_].size(),0); + status = impl.DropAll(); ASSERT_TRUE(status.ok()); } diff --git a/cpp/unittest/db/search_test.cpp b/cpp/unittest/db/search_test.cpp index 98f2c88ea0..340fa82f20 100644 --- a/cpp/unittest/db/search_test.cpp +++ b/cpp/unittest/db/search_test.cpp @@ -3,11 +3,10 @@ // Unauthorized copying of this file, via any medium is strictly prohibited. // Proprietary and confidential. 
//////////////////////////////////////////////////////////////////////////////// -#include - #include "db/scheduler/task/SearchTask.h" #include "utils/TimeRecorder.h" +#include #include #include diff --git a/cpp/unittest/index_wrapper/CMakeLists.txt b/cpp/unittest/index_wrapper/CMakeLists.txt new file mode 100644 index 0000000000..1f02464ad9 --- /dev/null +++ b/cpp/unittest/index_wrapper/CMakeLists.txt @@ -0,0 +1,25 @@ +include_directories("${CUDA_TOOLKIT_ROOT_DIR}/include") +link_directories("${CUDA_TOOLKIT_ROOT_DIR}/lib64") + +aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src) + +set(helper + utils.cpp) + +set(knowhere_libs + knowhere + SPTAGLibStatic + arrow + jemalloc_pic + faiss + openblas + lapack + tbb + cudart + cublas + ) + +add_executable(knowhere_test knowhere_test.cpp ${knowhere_src} ${helper}) +target_link_libraries(knowhere_test ${knowhere_libs} ${unittest_libs}) + +install(TARGETS knowhere_test DESTINATION bin) \ No newline at end of file diff --git a/cpp/unittest/index_wrapper/knowhere_test.cpp b/cpp/unittest/index_wrapper/knowhere_test.cpp new file mode 100644 index 0000000000..064d6dc911 --- /dev/null +++ b/cpp/unittest/index_wrapper/knowhere_test.cpp @@ -0,0 +1,174 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. 
+//////////////////////////////////////////////////////////////////////////////// + +#include +#include + +#include + +#include "utils.h" + +INITIALIZE_EASYLOGGINGPP + +using namespace zilliz::milvus::engine; +using namespace zilliz::knowhere; + +using ::testing::TestWithParam; +using ::testing::Values; +using ::testing::Combine; + + +class KnowhereWrapperTest + : public TestWithParam<::std::tuple> { + protected: + void SetUp() override { + std::string generator_type; + std::tie(index_type, generator_type, dim, nb, nq, k, train_cfg, search_cfg) = GetParam(); + + //auto generator = GetGenerateFactory(generator_type); + auto generator = std::make_shared(); + generator->GenData(dim, nb, nq, xb, xq, ids, k, gt_ids, gt_dis); + + index_ = GetVecIndexFactory(index_type); + } + + void AssertResult(const std::vector &ids, const std::vector &dis) { + EXPECT_EQ(ids.size(), nq * k); + EXPECT_EQ(dis.size(), nq * k); + + for (auto i = 0; i < nq; i++) { + EXPECT_EQ(ids[i * k], gt_ids[i * k]); + //EXPECT_EQ(dis[i * k], gt_dis[i * k]); + } + + int match = 0; + for (int i = 0; i < nq; ++i) { + for (int j = 0; j < k; ++j) { + for (int l = 0; l < k; ++l) { + if (ids[i * nq + j] == gt_ids[i * nq + l]) match++; + } + } + } + + auto precision = float(match) / (nq * k); + EXPECT_GT(precision, 0.5); + std::cout << std::endl << "Precision: " << precision + << ", match: " << match + << ", total: " << nq * k + << std::endl; + } + + protected: + IndexType index_type; + Config train_cfg; + Config search_cfg; + + int dim = 64; + int nb = 10000; + int nq = 10; + int k = 10; + std::vector xb; + std::vector xq; + std::vector ids; + + VecIndexPtr index_ = nullptr; + + // Ground Truth + std::vector gt_ids; + std::vector gt_dis; +}; + +INSTANTIATE_TEST_CASE_P(WrapperParam, KnowhereWrapperTest, + Values( + //["Index type", "Generator type", "dim", "nb", "nq", "k", "build config", "search config"] + //std::make_tuple(IndexType::FAISS_IVFFLAT_CPU, "Default", + // 64, 100000, 10, 10, + // 
Config::object{{"nlist", 100}, {"dim", 64}}, + // Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 10}} + //), + //std::make_tuple(IndexType::FAISS_IVFFLAT_GPU, "Default", + // 64, 10000, 10, 10, + // Config::object{{"nlist", 100}, {"dim", 64}}, + // Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 40}} + //), + std::make_tuple(IndexType::FAISS_IVFFLAT_MIX, "Default", + 64, 100000, 10, 10, + Config::object{{"nlist", 1000}, {"dim", 64}, {"metric_type", "L2"}}, + Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 5}} + ), + std::make_tuple(IndexType::FAISS_IDMAP, "Default", + 64, 100000, 10, 10, + Config::object{{"dim", 64}, {"metric_type", "L2"}}, + Config::object{{"dim", 64}, {"k", 10}} + ), + std::make_tuple(IndexType::FAISS_IVFSQ8_MIX, "Default", + 64, 100000, 10, 10, + Config::object{{"dim", 64}, {"nlist", 1000}, {"nbits", 8}, {"metric_type", "L2"}}, + Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 5}} + ) + //std::make_tuple(IndexType::SPTAG_KDT_RNT_CPU, "Default", + // 64, 10000, 10, 10, + // Config::object{{"TPTNumber", 1}, {"dim", 64}}, + // Config::object{{"dim", 64}, {"k", 10}} + //) + ) +); + +TEST_P(KnowhereWrapperTest, base_test) { + EXPECT_EQ(index_->GetType(), index_type); + + auto elems = nq * k; + std::vector res_ids(elems); + std::vector res_dis(elems); + + index_->BuildAll(nb, xb.data(), ids.data(), train_cfg); + index_->Search(nq, xq.data(), res_dis.data(), res_ids.data(), search_cfg); + AssertResult(res_ids, res_dis); +} + +TEST_P(KnowhereWrapperTest, serialize) { + EXPECT_EQ(index_->GetType(), index_type); + + auto elems = nq * k; + std::vector res_ids(elems); + std::vector res_dis(elems); + index_->BuildAll(nb, xb.data(), ids.data(), train_cfg); + index_->Search(nq, xq.data(), res_dis.data(), res_ids.data(), search_cfg); + AssertResult(res_ids, res_dis); + + { + auto binary = index_->Serialize(); + auto type = index_->GetType(); + auto new_index = GetVecIndexFactory(type); + new_index->Load(binary); + EXPECT_EQ(new_index->Dimension(), 
index_->Dimension()); + EXPECT_EQ(new_index->Count(), index_->Count()); + + std::vector res_ids(elems); + std::vector res_dis(elems); + new_index->Search(nq, xq.data(), res_dis.data(), res_ids.data(), search_cfg); + AssertResult(res_ids, res_dis); + } + + { + std::string file_location = "/tmp/whatever"; + write_index(index_, file_location); + auto new_index = read_index(file_location); + EXPECT_EQ(new_index->GetType(), index_type); + EXPECT_EQ(new_index->Dimension(), index_->Dimension()); + EXPECT_EQ(new_index->Count(), index_->Count()); + + std::vector res_ids(elems); + std::vector res_dis(elems); + new_index->Search(nq, xq.data(), res_dis.data(), res_ids.data(), search_cfg); + AssertResult(res_ids, res_dis); + } +} + +// TODO(linxj): add exception test +//TEST_P(KnowhereWrapperTest, exception_test) { +//} + diff --git a/cpp/unittest/index_wrapper/utils.cpp b/cpp/unittest/index_wrapper/utils.cpp new file mode 100644 index 0000000000..ede5dd0485 --- /dev/null +++ b/cpp/unittest/index_wrapper/utils.cpp @@ -0,0 +1,56 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. 
+//////////////////////////////////////////////////////////////////////////////// + +#include + +#include "utils.h" + + +DataGenPtr GetGenerateFactory(const std::string &gen_type) { + std::shared_ptr generator; + if (gen_type == "default") { + generator = std::make_shared(); + } + return generator; +} + +void DataGenBase::GenData(const int &dim, const int &nb, const int &nq, + float *xb, float *xq, long *ids, + const int &k, long *gt_ids, float *gt_dis) { + for (auto i = 0; i < nb; ++i) { + for (auto j = 0; j < dim; ++j) { + //p_data[i * d + j] = float(base + i); + xb[i * dim + j] = drand48(); + } + xb[dim * i] += i / 1000.; + ids[i] = i; + } + for (size_t i = 0; i < nq * dim; ++i) { + xq[i] = xb[i]; + } + + faiss::IndexFlatL2 index(dim); + //index.add_with_ids(nb, xb, ids); + index.add(nb, xb); + index.search(nq, xq, k, gt_dis, gt_ids); +} + +void DataGenBase::GenData(const int &dim, + const int &nb, + const int &nq, + std::vector &xb, + std::vector &xq, + std::vector &ids, + const int &k, + std::vector >_ids, + std::vector >_dis) { + xb.resize(nb * dim); + xq.resize(nq * dim); + ids.resize(nb); + gt_ids.resize(nq * k); + gt_dis.resize(nq * k); + GenData(dim, nb, nq, xb.data(), xq.data(), ids.data(), k, gt_ids.data(), gt_dis.data()); +} diff --git a/cpp/unittest/index_wrapper/utils.h b/cpp/unittest/index_wrapper/utils.h new file mode 100644 index 0000000000..ce3c428d68 --- /dev/null +++ b/cpp/unittest/index_wrapper/utils.h @@ -0,0 +1,45 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. 
+//////////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include +#include +#include +#include +#include + + +class DataGenBase; + +using DataGenPtr = std::shared_ptr; + +extern DataGenPtr GetGenerateFactory(const std::string &gen_type); + + +class DataGenBase { + public: + virtual void GenData(const int &dim, const int &nb, const int &nq, float *xb, float *xq, long *ids, + const int &k, long *gt_ids, float *gt_dis); + + virtual void GenData(const int &dim, + const int &nb, + const int &nq, + std::vector &xb, + std::vector &xq, + std::vector &ids, + const int &k, + std::vector >_ids, + std::vector >_dis); +}; + + +//class SanityCheck : public DataGenBase { +// public: +// void GenData(const int &dim, const int &nb, const int &nq, float *xb, float *xq, long *ids, +// const int &k, long *gt_ids, float *gt_dis) override; +//}; + diff --git a/cpp/unittest/metrics/CMakeLists.txt b/cpp/unittest/metrics/CMakeLists.txt index 0666228382..02fe3009b9 100644 --- a/cpp/unittest/metrics/CMakeLists.txt +++ b/cpp/unittest/metrics/CMakeLists.txt @@ -10,11 +10,14 @@ include_directories(../../src) -aux_source_directory(../../src/db db_srcs) -aux_source_directory(../../src/config config_files) -aux_source_directory(../../src/cache cache_srcs) -aux_source_directory(../../src/wrapper wrapper_src) -aux_source_directory(../../src/metrics metrics_src) + + +aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src) +aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src) +aux_source_directory(${MILVUS_ENGINE_SRC}/src/metrics metrics_src) aux_source_directory(./ test_srcs) set(util_files @@ -44,16 +47,27 @@ set(count_test_src ${db_srcs} ${db_scheduler_srcs} ${wrapper_src} + ${knowhere_src} ${metrics_src} ${test_srcs} ${util_files} ) - 
add_executable(metrics_test ${count_test_src} ${require_files} ) -target_link_libraries(metrics_test +set(knowhere_libs + knowhere + SPTAGLibStatic + arrow + jemalloc_pic faiss + openblas + lapack + tbb + ) + +target_link_libraries(metrics_test + ${knowhere_libs} cudart cublas sqlite diff --git a/cpp/unittest/server/CMakeLists.txt b/cpp/unittest/server/CMakeLists.txt index 7c46da68f7..c60e2dbe2c 100644 --- a/cpp/unittest/server/CMakeLists.txt +++ b/cpp/unittest/server/CMakeLists.txt @@ -33,8 +33,11 @@ cuda_add_executable(server_test ) set(require_libs - stdc++ + knowhere faiss + openblas + lapack + stdc++ cudart cublas sqlite diff --git a/cpp/unittest/server/cache_test.cpp b/cpp/unittest/server/cache_test.cpp index 8c323d81d3..d45deb7475 100644 --- a/cpp/unittest/server/cache_test.cpp +++ b/cpp/unittest/server/cache_test.cpp @@ -7,7 +7,9 @@ #include "cache/CpuCacheMgr.h" #include "cache/GpuCacheMgr.h" +#include "utils/Error.h" #include "wrapper/Index.h" +#include "wrapper/knowhere/vec_index.h" using namespace zilliz::milvus; @@ -26,6 +28,58 @@ public: } }; +class MockVecIndex : public engine::VecIndex { +public: + virtual server::KnowhereError BuildAll(const long &nb, + const float *xb, + const long *ids, + const engine::Config &cfg, + const long &nt = 0, + const float *xt = nullptr) { + + } + + engine::IndexType GetType() override { + return engine::IndexType::INVALID; + } + + virtual server::KnowhereError Add(const long &nb, + const float *xb, + const long *ids, + const engine::Config &cfg = engine::Config()) { + + } + + virtual server::KnowhereError Search(const long &nq, + const float *xq, + float *dist, + long *ids, + const engine::Config &cfg = engine::Config()) { + + } + + virtual int64_t Dimension() { + return dimension_; + } + + virtual int64_t Count() { + return ntotal_; + } + + virtual zilliz::knowhere::BinarySet Serialize() { + zilliz::knowhere::BinarySet binset; + return binset; + } + + virtual server::KnowhereError Load(const 
zilliz::knowhere::BinarySet &index_binary) { + + } + +public: + int64_t dimension_ = 512; + int64_t ntotal_ = 0; +}; + } TEST(CacheTest, CPU_CACHE_TEST) { @@ -40,9 +94,9 @@ TEST(CacheTest, CPU_CACHE_TEST) { const int dim = 256; for (int i = 0; i < 20; i++) { - std::shared_ptr raw_index(faiss::index_factory(dim, "IDMap,Flat")); - engine::Index_ptr index = std::make_shared(raw_index); - index->ntotal = 1000000;//less 1G per index + MockVecIndex* mock_index = new MockVecIndex(); + mock_index->ntotal_ = 1000000;//less 1G per index + engine::Index_ptr index(mock_index); cpu_mgr->InsertItem("index_" + std::to_string(i), index); } @@ -65,9 +119,9 @@ TEST(CacheTest, CPU_CACHE_TEST) { g_num = 5; cpu_mgr->SetCapacity(g_num * gbyte); - std::shared_ptr raw_index(faiss::index_factory(dim, "IDMap,Flat")); - engine::Index_ptr index = std::make_shared(raw_index); - index->ntotal = 6000000;//6G less + MockVecIndex* mock_index = new MockVecIndex(); + mock_index->ntotal_ = 6000000;//6G less + engine::Index_ptr index(mock_index); cpu_mgr->InsertItem("index_6g", index); ASSERT_EQ(cpu_mgr->ItemCount(), 0);//data greater than capacity can not be inserted sucessfully @@ -82,9 +136,9 @@ TEST(CacheTest, GPU_CACHE_TEST) { const int dim = 256; for(int i = 0; i < 20; i++) { - std::shared_ptr raw_index(faiss::index_factory(dim, "IDMap,Flat")); - engine::Index_ptr index = std::make_shared(raw_index); - index->ntotal = 1000; + MockVecIndex* mock_index = new MockVecIndex(); + mock_index->ntotal_ = 1000; + engine::Index_ptr index(mock_index); cache::DataObjPtr obj = std::make_shared(index); @@ -117,9 +171,9 @@ TEST(CacheTest, INVALID_TEST) { { LessItemCacheMgr mgr; for(int i = 0; i < 20; i++) { - std::shared_ptr raw_index(faiss::index_factory(2, "IDMap,Flat")); - engine::Index_ptr index = std::make_shared(raw_index); - index->ntotal = 2; + MockVecIndex* mock_index = new MockVecIndex(); + mock_index->ntotal_ = 2; + engine::Index_ptr index(mock_index); cache::DataObjPtr obj = std::make_shared(index); 
mgr.InsertItem("index_" + std::to_string(i), obj);