update java case

Signed-off-by: zw <zw@milvus.io>
pull/2284/head
zw 2020-05-11 09:48:09 +08:00
commit 1beaef5346
119 changed files with 2811 additions and 1531 deletions

View File

@ -2,3 +2,5 @@ ignored:
- DL3003
- DL3007
- DL3008
# disable following sourced files
- SC1091

View File

@ -7,17 +7,26 @@ Please mark all change in change log and use the issue from GitHub
## Bug
- \#1705 Limit the insert data batch size
- \#1776 Error out when index SQ8H run in CPU mode
- \#1925 Fix flush not working when flushing all collections
- \#1929 Skip MySQL meta schema field width check
- \#1946 Fix load index file CPU2GPU failure during searching
- \#1955 Switch create_index operation to background once client breaks connection
- \#1997 Index file missing after compact
- \#2002 Remove log error msg `Attributes is null`
- \#2073 Fix CheckDBConfigBackendUrl error message
- \#2076 Fix CheckMetricConfigAddress error message
- \#2120 Fix search failure when search params are invalid
- \#2121 Allow regex matching of partition tags when searching
- \#2128 Check has_partition params
- \#2131 Fix incorrect distance/ID returned when searching with duplicate ids
- \#2141 Fix server start failure when the wal directory already exists
- \#2169 Fix SingleIndexTest.IVFSQHybrid unittest
- \#2194 Fix failure to get collection info
- \#2196 Fix server start failure when wal is disabled
- \#2203 Fix id=-1 returned in 0.8.0 when total count < topk
- \#2228 Fix show partitions failed in http module
- \#2231 Use server_config to define hard-delete delay time for segment files
- \#2261 Re-define result returned by has_collection if collection is in delete state
## Feature
- \#1751 Add api SearchByID
@ -30,6 +39,8 @@ Please mark all change in change log and use the issue from GitHub
- \#2064 Warn when using SQLite for metadata management
- \#2111 Check GPU environment before start server
- \#2206 Log file rotating
- \#2240 Obtain information about running rpc requests
- \#2268 Intelligently detect the OpenBLAS library on the system to avoid installing it from source every time
## Improvement
- \#221 Refactor LOG macro
@ -44,6 +55,9 @@ Please mark all change in change log and use the issue from GitHub
- \#2185 Change id to string format in http module
- \#2186 Update endpoints in http module
- \#2190 Fix memory usage is twice of index size when using GPU searching
- \#2248 Use hostname and port as instance label of metrics
- \#2252 Upgrade mishards APIs and requirements
- \#2256 k-means clustering algorithm uses only the Euclidean distance metric
## Task

View File

@ -7,17 +7,25 @@ SHELL ["/bin/bash", "-o", "pipefail", "-c"]
RUN yum install -y epel-release centos-release-scl-rh && yum install -y wget curl which && \
wget -qO- "https://cmake.org/files/v3.14/cmake-3.14.3-Linux-x86_64.tar.gz" | tar --strip-components=1 -xz -C /usr/local && \
yum install -y ccache make automake git python3-pip libcurl-devel python3-devel boost-static mysql-devel \
yum install -y make automake git python3-pip libcurl-devel python3-devel boost-static mysql-devel \
devtoolset-7-gcc devtoolset-7-gcc-c++ devtoolset-7-gcc-gfortran llvm-toolset-7.0-clang llvm-toolset-7.0-clang-tools-extra \
mysql lcov openblas-devel lapack-devel \
&& \
rm -rf /var/cache/yum/*
RUN echo "source scl_source enable devtoolset-7" >> /etc/profile.d/devtoolset-7.sh
RUN echo "source scl_source enable llvm-toolset-7.0" >> /etc/profile.d/llvm-toolset-7.sh
mysql lcov && \
rm -rf /var/cache/yum/* && \
echo "source scl_source enable devtoolset-7" >> /etc/profile.d/devtoolset-7.sh && \
echo "source scl_source enable llvm-toolset-7.0" >> /etc/profile.d/llvm-toolset-7.sh
ENV CLANG_TOOLS_PATH="/opt/rh/llvm-toolset-7.0/root/usr/bin"
RUN source /etc/profile.d/devtoolset-7.sh && \
wget https://github.com/xianyi/OpenBLAS/archive/v0.3.9.tar.gz && \
tar zxvf v0.3.9.tar.gz && cd OpenBLAS-0.3.9 && \
make TARGET=CORE2 DYNAMIC_ARCH=1 DYNAMIC_OLDER=1 USE_THREAD=0 USE_OPENMP=0 FC=gfortran CC=gcc COMMON_OPT="-O3 -g -fPIC" FCOMMON_OPT="-O3 -g -fPIC -frecursive" NMAX="NUM_THREADS=128" LIBPREFIX="libopenblas" LAPACKE="NO_LAPACKE=1" INTERFACE64=0 NO_STATIC=1 && \
make PREFIX=/usr install && \
cd .. && rm -rf OpenBLAS-0.3.9 && rm v0.3.9.tar.gz
RUN yum install -y ccache && \
rm -rf /var/cache/yum/*
# use login shell to activate the environment in the RUN commands
SHELL [ "/bin/bash", "-c", "-l" ]

View File

@ -13,11 +13,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget ca-certifi
sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list' && \
wget -qO- "https://cmake.org/files/v3.14/cmake-3.14.3-Linux-x86_64.tar.gz" | tar --strip-components=1 -xz -C /usr/local && \
apt-get update && apt-get install -y --no-install-recommends \
g++ git gfortran lsb-core ccache \
g++ git gfortran lsb-core \
libboost-serialization-dev libboost-filesystem-dev libboost-system-dev libboost-regex-dev \
curl libtool automake libssl-dev pkg-config libcurl4-openssl-dev python3-pip \
clang-format-6.0 clang-tidy-6.0 \
lcov mysql-client libmysqlclient-dev intel-mkl-gnu-2019.5-281 intel-mkl-core-2019.5-281 libopenblas-dev liblapack3 && \
lcov mysql-client libmysqlclient-dev intel-mkl-gnu-2019.5-281 intel-mkl-core-2019.5-281 && \
apt-get remove --purge -y && \
rm -rf /var/lib/apt/lists/*
@ -26,6 +26,16 @@ RUN ln -s /usr/lib/x86_64-linux-gnu/libmysqlclient.so \
RUN sh -c 'echo export LD_LIBRARY_PATH=/opt/intel/compilers_and_libraries_2019.5.281/linux/mkl/lib/intel64:\$LD_LIBRARY_PATH > /etc/profile.d/mkl.sh'
RUN wget https://github.com/xianyi/OpenBLAS/archive/v0.3.9.tar.gz && \
tar zxvf v0.3.9.tar.gz && cd OpenBLAS-0.3.9 && \
make TARGET=CORE2 DYNAMIC_ARCH=1 DYNAMIC_OLDER=1 USE_THREAD=0 USE_OPENMP=0 FC=gfortran CC=gcc COMMON_OPT="-O3 -g -fPIC" FCOMMON_OPT="-O3 -g -fPIC -frecursive" NMAX="NUM_THREADS=128" LIBPREFIX="libopenblas" LAPACKE="NO_LAPACKE=1" INTERFACE64=0 NO_STATIC=1 && \
make PREFIX=/usr install && \
cd .. && rm -rf OpenBLAS-0.3.9 && rm v0.3.9.tar.gz
RUN apt-get update && apt-get install -y --no-install-recommends ccache && \
apt-get remove --purge -y && \
rm -rf /var/lib/apt/lists/*
# use login shell to activate the environment in the RUN commands
SHELL [ "/bin/bash", "-c", "-l" ]

View File

@ -8,7 +8,7 @@ metadata:
spec:
containers:
- name: milvus-cpu-build-env
image: registry.zilliz.com/milvus/milvus-cpu-build-env:v0.7.0-centos7
image: registry.zilliz.com/milvus/milvus-cpu-build-env:v0.9.0-centos7
env:
- name: POD_IP
valueFrom:
@ -17,7 +17,7 @@ spec:
- name: OS_NAME
value: "centos7"
- name: BUILD_ENV_IMAGE_ID
value: "225b4d9c26d67b70b476964b4dd6e216de4b464d7a973a8c0c7ed1313c4d81ad"
value: "f2386d84d312e42891c8c70219b12fde014c21fbdbc0e59bede7e7609b1ba58b"
command:
- cat
tty: true

View File

@ -8,7 +8,7 @@ metadata:
spec:
containers:
- name: milvus-cpu-build-env
image: registry.zilliz.com/milvus/milvus-cpu-build-env:v0.7.0-ubuntu18.04
image: registry.zilliz.com/milvus/milvus-cpu-build-env:v0.9.0-ubuntu18.04
env:
- name: POD_IP
valueFrom:
@ -17,7 +17,7 @@ spec:
- name: OS_NAME
value: "ubuntu18.04"
- name: BUILD_ENV_IMAGE_ID
value: "23476391bec80c64f10d44a6370c73c71f011a6b95114b10ff82a60e771e11c7"
value: "4719a06f1b77393fed7a4336058baab74745715a431193d3876e9b51262505bd"
command:
- cat
tty: true

View File

@ -8,7 +8,7 @@ metadata:
spec:
containers:
- name: milvus-gpu-build-env
image: registry.zilliz.com/milvus/milvus-gpu-build-env:v0.7.0-centos7
image: registry.zilliz.com/milvus/milvus-gpu-build-env:v0.9.0-centos7
env:
- name: POD_IP
valueFrom:
@ -17,7 +17,7 @@ spec:
- name: OS_NAME
value: "centos7"
- name: BUILD_ENV_IMAGE_ID
value: "a5ec9914737ea4727d88ae36b4a73ca5d817f19438ba913cc1de6a1ee2ed2336"
value: "7087442c4c5a7a7adbd7324c58b7b1ac19a25acfd86d6017b5752c4c6521f90e"
command:
- cat
tty: true

View File

@ -8,7 +8,7 @@ metadata:
spec:
containers:
- name: milvus-gpu-build-env
image: registry.zilliz.com/milvus/milvus-gpu-build-env:v0.7.0-ubuntu18.04
image: registry.zilliz.com/milvus/milvus-gpu-build-env:v0.9.0-ubuntu18.04
env:
- name: POD_IP
valueFrom:
@ -17,7 +17,7 @@ spec:
- name: OS_NAME
value: "ubuntu18.04"
- name: BUILD_ENV_IMAGE_ID
value: "da9023b0f858f072672f86483a869aa87e90a5140864f89e5a012ec766d96dea"
value: "0aa65ebac377834ceb9644c320f114b97b488d11762948770b994f73e5ae518f"
command:
- cat
tty: true

View File

@ -13,7 +13,7 @@ timeout(time: 180, unit: 'MINUTES') {
helm status -n milvus ${env.SHARDS_HELM_RELEASE_NAME}"
def helmResult = sh script: helmStatusCMD, returnStatus: true
if (!helmResult) {
sh "helm uninstall -n milvus ${env.SHARDS_HELM_RELEASE_NAME} || sleep 1m"
sh "helm uninstall -n milvus ${env.SHARDS_HELM_RELEASE_NAME} && sleep 1m"
}
throw exc
}

View File

@ -13,7 +13,7 @@ timeout(time: 180, unit: 'MINUTES') {
helm status -n milvus ${env.HELM_RELEASE_NAME}"
def helmResult = sh script: helmStatusCMD, returnStatus: true
if (!helmResult) {
sh "helm uninstall -n milvus ${env.HELM_RELEASE_NAME} || sleep 1m"
sh "helm uninstall -n milvus ${env.HELM_RELEASE_NAME} && sleep 1m"
}
throw exc
}
@ -43,7 +43,7 @@ timeout(time: 180, unit: 'MINUTES') {
helm status -n milvus ${env.HELM_RELEASE_NAME}"
def helmResult = sh script: helmStatusCMD, returnStatus: true
if (!helmResult) {
sh "helm uninstall -n milvus ${env.HELM_RELEASE_NAME} || sleep 1m"
sh "helm uninstall -n milvus ${env.HELM_RELEASE_NAME} && sleep 1m"
}
throw exc
}

View File

@ -13,7 +13,7 @@ timeout(time: 120, unit: 'MINUTES') {
helm status -n milvus ${env.HELM_RELEASE_NAME}"
def helmResult = sh script: helmStatusCMD, returnStatus: true
if (!helmResult) {
sh "helm uninstall -n milvus ${env.HELM_RELEASE_NAME} || sleep 1m"
sh "helm uninstall -n milvus ${env.HELM_RELEASE_NAME} && sleep 1m"
}
throw exc
}

View File

@ -123,6 +123,7 @@ CMAKE_CMD="cmake \
-DFAISS_WITH_MKL=${WITH_MKL} \
-DArrow_SOURCE=AUTO \
-DFAISS_SOURCE=AUTO \
-DOpenBLAS_SOURCE=AUTO \
-DMILVUS_WITH_FIU=${FIU_ENABLE} \
${MILVUS_CORE_DIR}"
echo ${CMAKE_CMD}

View File

@ -90,7 +90,7 @@ if (MILVUS_VERSION_MAJOR STREQUAL ""
OR MILVUS_VERSION_MINOR STREQUAL ""
OR MILVUS_VERSION_PATCH STREQUAL "")
message(WARNING "Failed to determine Milvus version from git branch name")
set(MILVUS_VERSION "0.8.0")
set(MILVUS_VERSION "0.9.0")
endif ()
message(STATUS "Build version = ${MILVUS_VERSION}")
@ -217,8 +217,6 @@ else ()
@ONLY)
endif ()
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/conf/log_config.template ${CMAKE_CURRENT_SOURCE_DIR}/conf/log_config.conf)
install(DIRECTORY scripts/
DESTINATION scripts
FILE_PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ
@ -232,7 +230,6 @@ install(DIRECTORY scripts/migration
WORLD_EXECUTE WORLD_READ)
install(FILES
conf/server_config.yaml
conf/log_config.conf
DESTINATION
conf)

View File

@ -16,6 +16,7 @@ FAISS_ROOT="" #FAISS root path
FAISS_SOURCE="BUNDLED"
WITH_PROMETHEUS="ON"
FIU_ENABLE="OFF"
BUILD_OPENBLAS="ON"
while getopts "p:d:t:f:ulrcghzmei" arg; do
case $arg in
@ -112,6 +113,7 @@ CMAKE_CMD="cmake \
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} \
-DFAISS_ROOT=${FAISS_ROOT} \
-DFAISS_SOURCE=${FAISS_SOURCE} \
-DOpenBLAS_SOURCE=AUTO \
-DCMAKE_CUDA_COMPILER=${CUDA_COMPILER} \
-DBUILD_COVERAGE=${BUILD_COVERAGE} \
-DMILVUS_DB_PATH=${DB_PATH} \
@ -119,7 +121,7 @@ CMAKE_CMD="cmake \
-DMILVUS_GPU_VERSION=${GPU_VERSION} \
-DFAISS_WITH_MKL=${WITH_MKL} \
-DMILVUS_WITH_PROMETHEUS=${WITH_PROMETHEUS} \
-DMILVUS_WITH_FIU=${FIU_ENABLE}
-DMILVUS_WITH_FIU=${FIU_ENABLE} \
../"
echo ${CMAKE_CMD}
${CMAKE_CMD}

View File

@ -1,27 +0,0 @@
* GLOBAL:
FORMAT = "%datetime | %level | %logger | %msg"
FILENAME = "/var/lib/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-global.log"
ENABLED = true
TO_FILE = true
TO_STANDARD_OUTPUT = false
SUBSECOND_PRECISION = 3
PERFORMANCE_TRACKING = false
MAX_LOG_FILE_SIZE = 209715200 ## Throw log files away after 200MB
* DEBUG:
FILENAME = "/var/lib/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-debug.log"
ENABLED = true
* WARNING:
FILENAME = "/var/lib/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-warning.log"
* TRACE:
FILENAME = "/var/lib/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-trace.log"
* VERBOSE:
FORMAT = "%datetime{%d/%M/%y} | %level-%vlevel | %msg"
TO_FILE = false
TO_STANDARD_OUTPUT = false
## Error logs
* ERROR:
ENABLED = true
FILENAME = "/var/lib/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-error.log"
* FATAL:
ENABLED = true
FILENAME = "/var/lib/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-fatal.log"

View File

@ -9,7 +9,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
version: 0.3
version: 0.4
#----------------------+------------------------------------------------------------+------------+-----------------+
# Server Config | Description | Type | Default |
@ -68,9 +68,13 @@ db_config:
# secondary_path | A semicolon-separated list of secondary directories used | Path | |
# | to save vector data and index data. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# file_cleanup_timeout | Time gap between soft-delete and hard-delete. | Integer | 10 (s) |
# | range [0, 3600] | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
storage_config:
primary_path: /var/lib/milvus
secondary_path:
file_cleanup_timeout: 10
#----------------------+------------------------------------------------------------+------------+-----------------+
# Metric Config | Description | Type | Default |
@ -186,3 +190,39 @@ wal_config:
recovery_error_ignore: true
buffer_size: 256
wal_path: /var/lib/milvus/wal
#----------------------+------------------------------------------------------------+------------+-----------------+
# Logs | Description | Type | Default |
#----------------------+------------------------------------------------------------+------------+-----------------+
# trace.enable | Whether to enable trace level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# debug.enable | Whether to enable debug level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# info.enable | Whether to enable info level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# warning.enable | Whether to enable warning level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# error.enable | Whether to enable error level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# fatal.enable | Whether to enable fatal level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# path | Location of log files. | String | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# max_log_file_size | Max size of a single log file. After exceeding this value, | Integer | 256 (MB) |
# | the file is renamed to xxx.log.n, meaning the nth file. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# delete_exceeds | Milvus will keep up to ${delete_exceeds} log files per | Integer | 10 |
# | level. For example, after the xxx.log.11 file is generated, | | |
# | the xxx.log.1 file will be deleted. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
logs:
trace.enable: true
debug.enable: true
info.enable: true
warning.enable: true
error.enable: true
fatal.enable: true
path: /var/lib/milvus/logs
max_log_file_size: 256
delete_exceeds: 10
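
As a rough sketch of the rotation scheme the table above describes (the helper below is hypothetical and not part of Milvus; it only restates the documented numbering, where rotated files count upward and the oldest is dropped once more than delete_exceeds survive):

// Hypothetical sketch of the documented rotation, assuming rotated files are
// numbered upward over time: e.g. xxx.log.1 is deleted after xxx.log.11 is
// generated with delete_exceeds = 10.
#include <cstdio>
#include <string>

void RotateOnce(const std::string& base, int& next_n, int delete_exceeds) {
    // rename the active file to <base>.log.<next_n>
    std::string rotated = base + ".log." + std::to_string(next_n);
    std::rename((base + ".log").c_str(), rotated.c_str());
    // keep at most delete_exceeds rotated files: drop the oldest survivor
    if (next_n > delete_exceeds) {
        std::string oldest = base + ".log." + std::to_string(next_n - delete_exceeds);
        std::remove(oldest.c_str());
    }
    ++next_n;
}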

View File

@ -1,30 +0,0 @@
* GLOBAL:
FORMAT = "[%datetime][%level]%msg"
FILENAME = "@MILVUS_DB_PATH@/logs/milvus-%datetime{%y-%M-%d-%H:%m}-global.log"
ENABLED = true
TO_FILE = true
TO_STANDARD_OUTPUT = false
SUBSECOND_PRECISION = 3
PERFORMANCE_TRACKING = false
MAX_LOG_FILE_SIZE = 209715200 ## Throw log files away after 200MB
* INFO:
FILENAME = "@MILVUS_DB_PATH@/logs/milvus-%datetime{%y-%M-%d-%H:%m}-info.log"
ENABLED = true
* DEBUG:
FILENAME = "@MILVUS_DB_PATH@/logs/milvus-%datetime{%y-%M-%d-%H:%m}-debug.log"
ENABLED = true
* WARNING:
FILENAME = "@MILVUS_DB_PATH@/logs/milvus-%datetime{%y-%M-%d-%H:%m}-warning.log"
* TRACE:
FILENAME = "@MILVUS_DB_PATH@/logs/milvus-%datetime{%y-%M-%d-%H:%m}-trace.log"
* VERBOSE:
FORMAT = "%datetime{%d/%M/%y} | %level-%vlevel | %msg"
TO_FILE = false
TO_STANDARD_OUTPUT = false
## Error logs
* ERROR:
ENABLED = true
FILENAME = "@MILVUS_DB_PATH@/logs/milvus-%datetime{%y-%M-%d-%H:%m}-error.log"
* FATAL:
ENABLED = true
FILENAME = "@MILVUS_DB_PATH@/logs/milvus-%datetime{%y-%M-%d-%H:%m}-fatal.log"

View File

@ -9,7 +9,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
version: 0.3
version: 0.4
#----------------------+------------------------------------------------------------+------------+-----------------+
# Server Config | Description | Type | Default |
@ -68,9 +68,13 @@ db_config:
# secondary_path | A semicolon-separated list of secondary directories used | Path | |
# | to save vector data and index data. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# file_cleanup_timeout | Time gap between soft-delete and hard-delete. | Integer | 10 (s) |
# | range [0, 3600] | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
storage_config:
primary_path: @MILVUS_DB_PATH@
secondary_path:
file_cleanup_timeout: 10
#----------------------+------------------------------------------------------------+------------+-----------------+
# Metric Config | Description | Type | Default |
@ -187,6 +191,30 @@ wal_config:
buffer_size: 256
wal_path: @MILVUS_DB_PATH@/wal
#----------------------+------------------------------------------------------------+------------+-----------------+
# Logs | Description | Type | Default |
#----------------------+------------------------------------------------------------+------------+-----------------+
# trace.enable | Whether to enable trace level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# debug.enable | Whether to enable debug level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# info.enable | Whether to enable info level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# warning.enable | Whether to enable warning level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# error.enable | Whether to enable error level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# fatal.enable | Whether to enable fatal level logging in Milvus. | Boolean | true |
#----------------------+------------------------------------------------------------+------------+-----------------+
# path | Location of log files. | String | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# max_log_file_size | Max size of a single log file. After exceeding this value, | Integer | 256 (MB) |
# | the file is renamed to xxx.log.n, meaning the nth file. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# delete_exceeds | Milvus will keep up to ${delete_exceeds} log files per | Integer | 10 |
# | level. For example, after the xxx.log.11 file is generated, | | |
# | the xxx.log.1 file will be deleted. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
logs:
trace.enable: true
debug.enable: true

View File

@ -37,6 +37,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_main_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/engine db_engine_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/insert db_insert_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/merge db_merge_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/wal db_wal_files)
set(grpc_service_files
@ -143,6 +144,7 @@ set(engine_files
${db_engine_files}
${db_insert_files}
${db_meta_files}
${db_merge_files}
${db_wal_files}
${metrics_files}
${storage_files}
@ -331,18 +333,10 @@ install(FILES
${CMAKE_BINARY_DIR}/fiu_ep-prefix/src/fiu_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}fiu${CMAKE_SHARED_LIBRARY_SUFFIX}.1.00
DESTINATION lib)
if (CMAKE_BUILD_TYPE STREQUAL "Release")
if(EXISTS ${CMAKE_BINARY_DIR}/src/index/openblas_ep-prefix/src/openblas_ep/lib/)
install(FILES
${CMAKE_BINARY_DIR}/src/index/openblas_ep-prefix/src/openblas_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}openblas${CMAKE_SHARED_LIBRARY_SUFFIX}
${CMAKE_BINARY_DIR}/src/index/openblas_ep-prefix/src/openblas_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}openblas${CMAKE_SHARED_LIBRARY_SUFFIX}.0
${CMAKE_BINARY_DIR}/src/index/openblas_ep-prefix/src/openblas_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}openblas${CMAKE_SHARED_LIBRARY_SUFFIX}.0.3
DESTINATION lib)
elseif(CMAKE_BUILD_TYPE STREQUAL "Debug")
install(FILES
${CMAKE_BINARY_DIR}/src/index/openblas_ep-prefix/src/openblas_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}openblas_d${CMAKE_SHARED_LIBRARY_SUFFIX}
${CMAKE_BINARY_DIR}/src/index/openblas_ep-prefix/src/openblas_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}openblas_d${CMAKE_SHARED_LIBRARY_SUFFIX}.0
${CMAKE_BINARY_DIR}/src/index/openblas_ep-prefix/src/openblas_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}openblas_d${CMAKE_SHARED_LIBRARY_SUFFIX}.0.3
DESTINATION lib)
else()
message("unknown CMAKE_BUILD_TYPE")
endif()

View File

@ -121,8 +121,8 @@ DefaultAttrsFormat::write(const milvus::storage::FSHandlerPtr& fs_ptr, const mil
auto it = attrs_ptr->attrs.begin();
if (it == attrs_ptr->attrs.end()) {
std::string err_msg = "Attributes is null";
LOG_ENGINE_ERROR_ << err_msg;
// std::string err_msg = "Attributes is null";
// LOG_ENGINE_ERROR_ << err_msg;
return;
}

File diff suppressed because it is too large

View File

@ -25,14 +25,6 @@ namespace server {
using ConfigCallBackF = std::function<Status(const std::string&)>;
#define CONFIG_CHECK(func) \
do { \
Status s = func; \
if (!s.ok()) { \
return s; \
} \
} while (false)
extern const char* CONFIG_NODE_DELIMITER;
extern const char* CONFIG_VERSION;
@ -70,18 +62,19 @@ extern const char* CONFIG_STORAGE_PRIMARY_PATH;
extern const char* CONFIG_STORAGE_PRIMARY_PATH_DEFAULT;
extern const char* CONFIG_STORAGE_SECONDARY_PATH;
extern const char* CONFIG_STORAGE_SECONDARY_PATH_DEFAULT;
extern const char* CONFIG_STORAGE_S3_ENABLE;
extern const char* CONFIG_STORAGE_S3_ENABLE_DEFAULT;
extern const char* CONFIG_STORAGE_S3_ADDRESS;
extern const char* CONFIG_STORAGE_S3_ADDRESS_DEFAULT;
extern const char* CONFIG_STORAGE_S3_PORT;
extern const char* CONFIG_STORAGE_S3_PORT_DEFAULT;
extern const char* CONFIG_STORAGE_S3_ACCESS_KEY;
extern const char* CONFIG_STORAGE_S3_ACCESS_KEY_DEFAULT;
extern const char* CONFIG_STORAGE_S3_SECRET_KEY;
extern const char* CONFIG_STORAGE_S3_SECRET_KEY_DEFAULT;
extern const char* CONFIG_STORAGE_S3_BUCKET;
extern const char* CONFIG_STORAGE_S3_BUCKET_DEFAULT;
extern const char* CONFIG_STORAGE_FILE_CLEANUP_TIMEOUT;
// extern const char* CONFIG_STORAGE_S3_ENABLE;
// extern const char* CONFIG_STORAGE_S3_ENABLE_DEFAULT;
// extern const char* CONFIG_STORAGE_S3_ADDRESS;
// extern const char* CONFIG_STORAGE_S3_ADDRESS_DEFAULT;
// extern const char* CONFIG_STORAGE_S3_PORT;
// extern const char* CONFIG_STORAGE_S3_PORT_DEFAULT;
// extern const char* CONFIG_STORAGE_S3_ACCESS_KEY;
// extern const char* CONFIG_STORAGE_S3_ACCESS_KEY_DEFAULT;
// extern const char* CONFIG_STORAGE_S3_SECRET_KEY;
// extern const char* CONFIG_STORAGE_S3_SECRET_KEY_DEFAULT;
// extern const char* CONFIG_STORAGE_S3_BUCKET;
// extern const char* CONFIG_STORAGE_S3_BUCKET_DEFAULT;
/* cache config */
extern const char* CONFIG_CACHE;
@ -117,11 +110,7 @@ extern const char* CONFIG_ENGINE_GPU_SEARCH_THRESHOLD_DEFAULT;
/* gpu resource config */
extern const char* CONFIG_GPU_RESOURCE;
extern const char* CONFIG_GPU_RESOURCE_ENABLE;
#ifdef MILVUS_GPU_VERSION
extern const char* CONFIG_GPU_RESOURCE_ENABLE_DEFAULT;
#else
extern const char* CONFIG_GPU_RESOURCE_ENABLE_DEFAULT;
#endif
extern const char* CONFIG_GPU_RESOURCE_CACHE_CAPACITY;
extern const char* CONFIG_GPU_RESOURCE_CACHE_CAPACITY_DEFAULT;
extern const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD;
@ -256,17 +245,19 @@ class Config {
Status
CheckStorageConfigSecondaryPath(const std::string& value);
Status
CheckStorageConfigS3Enable(const std::string& value);
Status
CheckStorageConfigS3Address(const std::string& value);
Status
CheckStorageConfigS3Port(const std::string& value);
Status
CheckStorageConfigS3AccessKey(const std::string& value);
Status
CheckStorageConfigS3SecretKey(const std::string& value);
Status
CheckStorageConfigS3Bucket(const std::string& value);
CheckStorageConfigFileCleanupTimeout(const std::string& value);
// Status
// CheckStorageConfigS3Enable(const std::string& value);
// Status
// CheckStorageConfigS3Address(const std::string& value);
// Status
// CheckStorageConfigS3Port(const std::string& value);
// Status
// CheckStorageConfigS3AccessKey(const std::string& value);
// Status
// CheckStorageConfigS3SecretKey(const std::string& value);
// Status
// CheckStorageConfigS3Bucket(const std::string& value);
/* metric config */
Status
@ -389,17 +380,19 @@ class Config {
Status
GetStorageConfigSecondaryPath(std::string& value);
Status
GetStorageConfigS3Enable(bool& value);
Status
GetStorageConfigS3Address(std::string& value);
Status
GetStorageConfigS3Port(std::string& value);
Status
GetStorageConfigS3AccessKey(std::string& value);
Status
GetStorageConfigS3SecretKey(std::string& value);
Status
GetStorageConfigS3Bucket(std::string& value);
GetStorageConfigFileCleanupTimeup(int64_t& value);
// Status
// GetStorageConfigS3Enable(bool& value);
// Status
// GetStorageConfigS3Address(std::string& value);
// Status
// GetStorageConfigS3Port(std::string& value);
// Status
// GetStorageConfigS3AccessKey(std::string& value);
// Status
// GetStorageConfigS3SecretKey(std::string& value);
// Status
// GetStorageConfigS3Bucket(std::string& value);
/* metric config */
Status
@ -514,17 +507,19 @@ class Config {
Status
SetStorageConfigSecondaryPath(const std::string& value);
Status
SetStorageConfigS3Enable(const std::string& value);
Status
SetStorageConfigS3Address(const std::string& value);
Status
SetStorageConfigS3Port(const std::string& value);
Status
SetStorageConfigS3AccessKey(const std::string& value);
Status
SetStorageConfigS3SecretKey(const std::string& value);
Status
SetStorageConfigS3Bucket(const std::string& value);
SetStorageConfigFileCleanupTimeout(const std::string& value);
// Status
// SetStorageConfigS3Enable(const std::string& value);
// Status
// SetStorageConfigS3Address(const std::string& value);
// Status
// SetStorageConfigS3Port(const std::string& value);
// Status
// SetStorageConfigS3AccessKey(const std::string& value);
// Status
// SetStorageConfigS3SecretKey(const std::string& value);
// Status
// SetStorageConfigS3Bucket(const std::string& value);
/* metric config */
Status
@ -551,6 +546,22 @@ class Config {
SetEngineConfigOmpThreadNum(const std::string& value);
Status
SetEngineConfigSimdType(const std::string& value);
#ifdef MILVUS_GPU_VERSION
Status
SetEngineConfigGpuSearchThreshold(const std::string& value);
/* gpu resource config */
Status
SetGpuResourceConfigEnable(const std::string& value);
Status
SetGpuResourceConfigCacheCapacity(const std::string& value);
Status
SetGpuResourceConfigCacheThreshold(const std::string& value);
Status
SetGpuResourceConfigSearchResources(const std::string& value);
Status
SetGpuResourceConfigBuildIndexResources(const std::string& value);
#endif
/* tracing config */
Status
@ -586,23 +597,6 @@ class Config {
Status
SetLogsDeleteExceeds(const std::string& value);
#ifdef MILVUS_GPU_VERSION
Status
SetEngineConfigGpuSearchThreshold(const std::string& value);
/* gpu resource config */
Status
SetGpuResourceConfigEnable(const std::string& value);
Status
SetGpuResourceConfigCacheCapacity(const std::string& value);
Status
SetGpuResourceConfigCacheThreshold(const std::string& value);
Status
SetGpuResourceConfigSearchResources(const std::string& value);
Status
SetGpuResourceConfigBuildIndexResources(const std::string& value);
#endif
private:
bool restart_required_ = false;
std::string config_file_;

View File

@ -56,10 +56,10 @@ class DB {
DescribeCollection(meta::CollectionSchema& table_schema_) = 0;
virtual Status
HasCollection(const std::string& collection_id, bool& has_or_not_) = 0;
HasCollection(const std::string& collection_id, bool& has_or_not) = 0;
virtual Status
HasNativeCollection(const std::string& collection_id, bool& has_or_not_) = 0;
HasNativeCollection(const std::string& collection_id, bool& has_or_not) = 0;
virtual Status
AllCollections(std::vector<meta::CollectionSchema>& table_schema_array) = 0;

View File

@ -31,6 +31,7 @@
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "db/IDGenerator.h"
#include "db/merge/MergeManagerFactory.h"
#include "engine/EngineFactory.h"
#include "index/thirdparty/faiss/utils/distances.h"
#include "insert/MemManagerFactory.h"
@ -78,6 +79,7 @@ DBImpl::DBImpl(const DBOptions& options)
: options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
merge_mgr_ptr_ = MergeManagerFactory::Build(meta_ptr_, options_);
if (options_.wal_enable_) {
wal::MXLogConfiguration mxlog_config;
@ -275,30 +277,16 @@ DBImpl::HasCollection(const std::string& collection_id, bool& has_or_not) {
return SHUTDOWN_ERROR;
}
return meta_ptr_->HasCollection(collection_id, has_or_not);
return meta_ptr_->HasCollection(collection_id, has_or_not, false);
}
Status
DBImpl::HasNativeCollection(const std::string& collection_id, bool& has_or_not_) {
DBImpl::HasNativeCollection(const std::string& collection_id, bool& has_or_not) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
has_or_not_ = false;
return status;
} else {
if (!collection_schema.owner_collection_.empty()) {
has_or_not_ = false;
return Status(DB_NOT_FOUND, "");
}
has_or_not_ = true;
return Status::OK();
}
return meta_ptr_->HasCollection(collection_id, has_or_not, true);
}
Status
@ -1920,101 +1908,6 @@ DBImpl::StartMergeTask() {
// LOG_ENGINE_DEBUG_ << "End StartMergeTask";
}
Status
DBImpl::MergeFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
// const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
// step 1: create collection file
meta::SegmentSchema collection_file;
collection_file.collection_id_ = collection_id;
collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
Status status = meta_ptr_->CreateCollectionFile(collection_file);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
return status;
}
// step 2: merge files
/*
ExecutionEnginePtr index =
EngineFactory::Build(collection_file.dimension_, collection_file.location_,
(EngineType)collection_file.engine_type_, (MetricType)collection_file.metric_type_, collection_file.nlist_);
*/
meta::SegmentsSchema updated;
std::string new_segment_dir;
utils::GetParentPath(collection_file.location_, new_segment_dir);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
// attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
for (auto& file : files) {
server::CollectMergeFilesMetrics metrics;
std::string segment_dir_to_merge;
utils::GetParentPath(file.location_, segment_dir_to_merge);
segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_);
files_holder.UnmarkFile(file);
auto file_schema = file;
file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
updated.push_back(file_schema);
auto size = segment_writer_ptr->Size();
if (size >= file_schema.index_file_size_) {
break;
}
}
// step 3: serialize to disk
try {
status = segment_writer_ptr->Serialize();
fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
} catch (std::exception& ex) {
std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << msg;
status = Status(DB_ERROR, msg);
}
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();
// if failed to serialize merge file to disk
// typical error: out of disk space, out of memory or permission denied
collection_file.file_type_ = meta::SegmentSchema::TO_DELETE;
status = meta_ptr_->UpdateCollectionFile(collection_file);
LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << collection_file.file_id_
<< " to to_delete";
return status;
}
// step 4: update collection files state
// if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
// else set file type to RAW, no need to build index
if (!utils::IsRawIndexType(collection_file.engine_type_)) {
collection_file.file_type_ = (segment_writer_ptr->Size() >= collection_file.index_file_size_)
? meta::SegmentSchema::TO_INDEX
: meta::SegmentSchema::RAW;
} else {
collection_file.file_type_ = meta::SegmentSchema::RAW;
}
collection_file.file_size_ = segment_writer_ptr->Size();
collection_file.row_count_ = segment_writer_ptr->VectorCount();
updated.push_back(collection_file);
status = meta_ptr_->UpdateCollectionFiles(updated);
LOG_ENGINE_DEBUG_ << "New merged segment " << collection_file.segment_id_ << " of size "
<< segment_writer_ptr->Size() << " bytes";
if (options_.insert_cache_immediately_) {
segment_writer_ptr->Cache();
}
return status;
}
Status
DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
// const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
@ -2110,44 +2003,22 @@ DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& fi
return status;
}
Status
DBImpl::BackgroundMergeFiles(const std::string& collection_id) {
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
meta::FilesHolder files_holder;
auto status = meta_ptr_->FilesToMerge(collection_id, files_holder);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id;
return status;
}
if (files_holder.HoldFiles().size() < options_.merge_trigger_number_) {
LOG_ENGINE_TRACE_ << "Files number not greater equal than merge trigger number, skip merge action";
return Status::OK();
}
MergeFiles(collection_id, files_holder);
if (!initialized_.load(std::memory_order_acquire)) {
LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id;
}
return Status::OK();
}
void
DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
// LOG_ENGINE_TRACE_ << " Background merge thread start";
Status status;
for (auto& collection_id : collection_ids) {
status = BackgroundMergeFiles(collection_id);
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
auto status = merge_mgr_ptr_->MergeFiles(collection_id);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Merge files for collection " << collection_id << " failed: " << status.ToString();
LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id
<< " reason:" << status.message();
}
if (!initialized_.load(std::memory_order_acquire)) {
LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action";
LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id;
break;
}
}
@ -2155,11 +2026,8 @@ DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
meta_ptr_->Archive();
{
uint64_t ttl = 10 * meta::SECOND; // default: file will be hard-deleted few seconds after soft-deleted
if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
ttl = meta::HOUR;
}
uint64_t timeout = (options_.file_cleanup_timeout_ > 0) ? options_.file_cleanup_timeout_ : 10;
uint64_t ttl = timeout * meta::SECOND; // default: file will be hard-deleted few seconds after soft-deleted
meta_ptr_->CleanUpFilesWithTTL(ttl);
}

View File

@ -29,6 +29,7 @@
#include "db/IndexFailedChecker.h"
#include "db/Types.h"
#include "db/insert/MemManager.h"
#include "db/merge/MergeManager.h"
#include "db/meta/FilesHolder.h"
#include "utils/ThreadPool.h"
#include "wal/WalManager.h"
@ -226,12 +227,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
void
StartMergeTask();
Status
MergeFiles(const std::string& collection_id, meta::FilesHolder& files_holder);
Status
BackgroundMergeFiles(const std::string& collection_id);
void
BackgroundMerge(std::set<std::string> collection_ids);
@ -290,6 +285,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
meta::MetaPtr meta_ptr_;
MemManagerPtr mem_mgr_;
MergeManagerPtr merge_mgr_ptr_;
std::shared_ptr<wal::WalManager> wal_mgr_;
std::thread bg_wal_thread_;

View File

@ -72,6 +72,7 @@ struct DBOptions {
bool insert_cache_immediately_ = false;
int64_t auto_flush_interval_ = 1;
int64_t file_cleanup_timeout_ = 10;
// wal relative configurations
bool wal_enable_ = true;

View File

@ -158,15 +158,15 @@ GetCollectionFilePath(const DBMetaOptions& options, meta::SegmentSchema& table_f
std::string parent_path = ConstructParentFolder(options.path_, table_file);
std::string file_path = parent_path + "/" + table_file.file_id_;
bool s3_enable = false;
server::Config& config = server::Config::GetInstance();
config.GetStorageConfigS3Enable(s3_enable);
fiu_do_on("GetCollectionFilePath.enable_s3", s3_enable = true);
if (s3_enable) {
/* need not check file existence */
table_file.location_ = file_path;
return Status::OK();
}
// bool s3_enable = false;
// server::Config& config = server::Config::GetInstance();
// config.GetStorageConfigS3Enable(s3_enable);
// fiu_do_on("GetCollectionFilePath.enable_s3", s3_enable = true);
// if (s3_enable) {
// /* need not check file existence */
// table_file.location_ = file_path;
// return Status::OK();
// }
if (boost::filesystem::exists(parent_path)) {
table_file.location_ = file_path;

View File

@ -465,7 +465,7 @@ ExecutionEngineImpl::Load(bool to_cache) {
bool gpu_enable = false;
#ifdef MILVUS_GPU_VERSION
server::Config& config = server::Config::GetInstance();
CONFIG_CHECK(config.GetGpuResourceConfigEnable(gpu_enable));
STATUS_CHECK(config.GetGpuResourceConfigEnable(gpu_enable));
#endif
if (!gpu_enable && index_->index_mode() == knowhere::IndexMode::MODE_GPU) {
std::string err_msg = "Index with type " + index_->index_type() + " must be used in GPU mode";

View File

@ -0,0 +1,138 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeLayeredStrategy.h"
#include "db/Utils.h"
#include "db/meta/MetaConsts.h"
#include "utils/Log.h"
#include <map>
#include <vector>
namespace milvus {
namespace engine {
Status
MergeLayeredStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) {
using LayerGroups = std::map<uint64_t, meta::SegmentsSchema>;
// distribute files to groups according to file size(in byte)
LayerGroups layers = {
{1UL << 22, meta::SegmentsSchema()}, // 4MB
{1UL << 24, meta::SegmentsSchema()}, // 16MB
{1UL << 26, meta::SegmentsSchema()}, // 64MB
{1UL << 28, meta::SegmentsSchema()}, // 256MB
{1UL << 30, meta::SegmentsSchema()}, // 1GB
};
meta::SegmentsSchema& files = files_holder.HoldFiles();
meta::SegmentsSchema huge_files;
// iterate from the end, because typically the files_holder gets files in order from largest to smallest
for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) {
meta::SegmentSchema& file = *iter;
if (file.index_file_size_ > 0 && file.file_size_ > file.index_file_size_) {
// release file that no need to merge
files_holder.UnmarkFile(file);
continue;
}
bool match = false;
for (auto& pair : layers) {
if ((*iter).file_size_ < pair.first) {
pair.second.push_back(file);
match = true;
break;
}
}
if (!match) {
huge_files.push_back(file);
}
}
const int64_t force_merge_threshold = 60; // force merge files older than 1 minute
auto now = utils::GetMicroSecTimeStamp();
meta::SegmentsSchema force_merge_file;
for (auto& pair : layers) {
// skip empty layer
if (pair.second.empty()) {
continue;
}
// layer has multiple files, merge along with the force_merge_file
if (!force_merge_file.empty()) {
for (auto& file : force_merge_file) {
pair.second.push_back(file);
}
force_merge_file.clear();
}
// layer only has one file, if the file is too old, force merge it, else no need to merge it
if (pair.second.size() == 1) {
if (now - pair.second[0].created_on_ > force_merge_threshold * meta::US_PS) {
force_merge_file.push_back(pair.second[0]);
pair.second.clear();
}
}
}
// if force_merge_file is not allocated to any layer, combine it into huge_files
if (!force_merge_file.empty() && !huge_files.empty()) {
for (auto& file : force_merge_file) {
huge_files.push_back(file);
}
force_merge_file.clear();
}
// return result
for (auto& pair : layers) {
if (pair.second.size() == 1) {
// release file that no need to merge
files_holder.UnmarkFile(pair.second[0]);
} else if (pair.second.size() > 1) {
// create group
meta::SegmentsSchema temp_files;
temp_files.swap(pair.second);
files_groups.emplace_back(temp_files);
}
}
if (huge_files.size() >= 1) {
meta::SegmentsSchema temp_files;
temp_files.swap(huge_files);
for (auto& file : force_merge_file) {
temp_files.push_back(file);
}
if (temp_files.size() >= 2) {
// create group
files_groups.emplace_back(temp_files);
} else {
for (auto& file : huge_files) {
// release file that no need to merge
files_holder.UnmarkFile(file);
}
for (auto& file : force_merge_file) {
// release file that no need to merge
files_holder.UnmarkFile(file);
}
}
} else {
for (auto& file : force_merge_file) {
// release file that no need to merge
files_holder.UnmarkFile(file);
}
}
return Status::OK();
}
} // namespace engine
} // namespace milvus
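
A small self-contained illustration of the layer boundaries used above (the helper and sample sizes are for illustration only; they mirror the map in MergeLayeredStrategy::RegroupFiles):

// Illustration only: which layer a segment of a given size falls into.
#include <cstdint>
#include <iostream>

const char* LayerOf(uint64_t file_size) {
    if (file_size < (1UL << 22)) return "4MB layer";
    if (file_size < (1UL << 24)) return "16MB layer";
    if (file_size < (1UL << 26)) return "64MB layer";
    if (file_size < (1UL << 28)) return "256MB layer";
    if (file_size < (1UL << 30)) return "1GB layer";
    return "huge_files";
}

int main() {
    std::cout << LayerOf(10UL << 20) << std::endl;  // 10MB -> 16MB layer
    std::cout << LayerOf(2UL << 30) << std::endl;   // 2GB  -> huge_files
    return 0;
}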

View File

@ -0,0 +1,29 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <vector>
#include "db/merge/MergeStrategy.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class MergeLayeredStrategy : public MergeStrategy {
public:
Status
RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) override;
}; // MergeLayeredStrategy
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,43 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
enum class MergeStrategyType {
SIMPLE = 1,
LAYERED = 2,
};
class MergeManager {
public:
virtual Status
UseStrategy(MergeStrategyType type) = 0;
virtual Status
MergeFiles(const std::string& collection_id) = 0;
}; // MergeManager
using MergeManagerPtr = std::shared_ptr<MergeManager>;
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,26 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeManagerFactory.h"
#include "db/merge/MergeManagerImpl.h"
#include "utils/Exception.h"
#include "utils/Log.h"
namespace milvus {
namespace engine {
MergeManagerPtr
MergeManagerFactory::Build(const meta::MetaPtr& meta_ptr, const DBOptions& options) {
return std::make_shared<MergeManagerImpl>(meta_ptr, options, MergeStrategyType::SIMPLE);
}
} // namespace engine
} // namespace milvus
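
Putting the pieces together, the wiring shown in the DBImpl changes above looks roughly like this (a sketch only; the surrounding locking on flush_merge_compact_mutex_ and the shutdown checks are trimmed):

// Sketch of how DBImpl consumes the factory per this diff: build the manager
// once at construction, then drive one merge per collection from the
// background merge pass.
#include <set>
#include <string>
#include "db/merge/MergeManagerFactory.h"
#include "utils/Log.h"

void BackgroundMergeSketch(const milvus::engine::meta::MetaPtr& meta_ptr,
                           const milvus::engine::DBOptions& options,
                           const std::set<std::string>& collection_ids) {
    auto merge_mgr = milvus::engine::MergeManagerFactory::Build(meta_ptr, options);
    for (auto& collection_id : collection_ids) {
        auto status = merge_mgr->MergeFiles(collection_id);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Merge failed for collection: " << collection_id
                              << " reason: " << status.message();
        }
    }
}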

View File

@ -0,0 +1,29 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "MergeManager.h"
#include "db/Options.h"
#include <memory>
namespace milvus {
namespace engine {
class MergeManagerFactory {
public:
static MergeManagerPtr
Build(const meta::MetaPtr& meta_ptr, const DBOptions& options);
};
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,89 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeManagerImpl.h"
#include "db/merge/MergeLayeredStrategy.h"
#include "db/merge/MergeSimpleStrategy.h"
#include "db/merge/MergeStrategy.h"
#include "db/merge/MergeTask.h"
#include "utils/Exception.h"
#include "utils/Log.h"
namespace milvus {
namespace engine {
MergeManagerImpl::MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type)
: meta_ptr_(meta_ptr), options_(options) {
UseStrategy(type);
}
Status
MergeManagerImpl::UseStrategy(MergeStrategyType type) {
switch (type) {
case MergeStrategyType::SIMPLE: {
strategy_ = std::make_shared<MergeSimpleStrategy>();
break;
}
case MergeStrategyType::LAYERED: {
strategy_ = std::make_shared<MergeLayeredStrategy>();
break;
}
default: {
std::string msg = "Unsupported merge strategy type: " + std::to_string((int32_t)type);
LOG_ENGINE_ERROR_ << msg;
throw Exception(DB_ERROR, msg);
}
}
return Status::OK();
}
Status
MergeManagerImpl::MergeFiles(const std::string& collection_id) {
if (strategy_ == nullptr) {
std::string msg = "No merge strategy specified";
LOG_ENGINE_ERROR_ << msg;
return Status(DB_ERROR, msg);
}
meta::FilesHolder files_holder;
auto status = meta_ptr_->FilesToMerge(collection_id, files_holder);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id;
return status;
}
if (files_holder.HoldFiles().size() < 2) {
return Status::OK();
}
MergeFilesGroups files_groups;
status = strategy_->RegroupFiles(files_holder, files_groups);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to regroup files for: " << collection_id
<< ", continue to merge all files into one";
MergeTask task(meta_ptr_, options_, files_holder.HoldFiles());
return task.Execute();
}
for (auto& group : files_groups) {
MergeTask task(meta_ptr_, options_, group);
status = task.Execute();
files_holder.UnmarkFiles(group);
}
return status;
}
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,48 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <ctime>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/merge/MergeManager.h"
#include "db/merge/MergeStrategy.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class MergeManagerImpl : public MergeManager {
public:
MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type);
Status
UseStrategy(MergeStrategyType type) override;
Status
MergeFiles(const std::string& collection_id) override;
private:
meta::MetaPtr meta_ptr_;
DBOptions options_;
MergeStrategyPtr strategy_;
}; // MergeManagerImpl
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,25 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeSimpleStrategy.h"
#include "utils/Log.h"
namespace milvus {
namespace engine {
Status
MergeSimpleStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) {
files_groups.push_back(files_holder.HoldFiles());
return Status::OK();
}
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,29 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <vector>
#include "db/merge/MergeStrategy.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class MergeSimpleStrategy : public MergeStrategy {
public:
Status
RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) override;
}; // MergeSimpleStrategy
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,38 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
using MergeFilesGroups = std::vector<meta::SegmentsSchema>;
class MergeStrategy {
public:
virtual Status
RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) = 0;
}; // MergeStrategy
using MergeStrategyPtr = std::shared_ptr<MergeStrategy>;
} // namespace engine
} // namespace milvus
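
Since MergeManagerImpl accepts any MergeStrategy, a custom policy only needs to implement RegroupFiles. A hypothetical pairwise grouping, for illustration only:

// Hypothetical strategy for illustration: group files two at a time.
// A real policy would also unmark leftover files, as MergeLayeredStrategy does.
#include "db/merge/MergeStrategy.h"

namespace milvus {
namespace engine {

class MergePairwiseStrategy : public MergeStrategy {
 public:
    Status
    RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) override {
        meta::SegmentsSchema& files = files_holder.HoldFiles();
        for (size_t i = 0; i + 1 < files.size(); i += 2) {
            files_groups.emplace_back(meta::SegmentsSchema{files[i], files[i + 1]});
        }
        return Status::OK();
    }
};  // MergePairwiseStrategy

}  // namespace engine
}  // namespace milvus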

View File

@ -0,0 +1,128 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeTask.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Log.h"
#include <memory>
#include <string>
namespace milvus {
namespace engine {
MergeTask::MergeTask(const meta::MetaPtr& meta_ptr, const DBOptions& options, meta::SegmentsSchema& files)
: meta_ptr_(meta_ptr), options_(options), files_(files) {
}
Status
MergeTask::Execute() {
if (files_.empty()) {
return Status::OK();
}
// check input
std::string collection_id = files_.front().collection_id_;
for (auto& file : files_) {
if (file.collection_id_ != collection_id) {
return Status(DB_ERROR, "Cannot merge files across collections");
}
}
// step 1: create collection file
meta::SegmentSchema collection_file;
collection_file.collection_id_ = collection_id;
collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
Status status = meta_ptr_->CreateCollectionFile(collection_file);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
return status;
}
// step 2: merge files
meta::SegmentsSchema updated;
std::string new_segment_dir;
utils::GetParentPath(collection_file.location_, new_segment_dir);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
// attention: this is a copy, not a reference, since files_holder.UnmarkFile changes the array internally
std::string info = "Merge task files size info:";
for (auto& file : files_) {
info += std::to_string(file.file_size_);
info += ", ";
server::CollectMergeFilesMetrics metrics;
std::string segment_dir_to_merge;
utils::GetParentPath(file.location_, segment_dir_to_merge);
segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_);
auto file_schema = file;
file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
updated.push_back(file_schema);
auto size = segment_writer_ptr->Size();
if (size >= file_schema.index_file_size_) {
break;
}
}
LOG_ENGINE_DEBUG_ << info;
// step 3: serialize to disk
try {
status = segment_writer_ptr->Serialize();
} catch (std::exception& ex) {
std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << msg;
status = Status(DB_ERROR, msg);
}
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();
// if failed to serialize merge file to disk
// typical error: out of disk space, out of memory or permission denied
collection_file.file_type_ = meta::SegmentSchema::TO_DELETE;
status = meta_ptr_->UpdateCollectionFile(collection_file);
LOG_ENGINE_DEBUG_ << "Failed to update file to index, marking file: " << collection_file.file_id_
                  << " as to_delete";
return status;
}
// step 4: update collection files state
// if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
// else set file type to RAW, no need to build index
if (!utils::IsRawIndexType(collection_file.engine_type_)) {
collection_file.file_type_ = (segment_writer_ptr->Size() >= collection_file.index_file_size_)
? meta::SegmentSchema::TO_INDEX
: meta::SegmentSchema::RAW;
} else {
collection_file.file_type_ = meta::SegmentSchema::RAW;
}
collection_file.file_size_ = segment_writer_ptr->Size();
collection_file.row_count_ = segment_writer_ptr->VectorCount();
updated.push_back(collection_file);
status = meta_ptr_->UpdateCollectionFiles(updated);
LOG_ENGINE_DEBUG_ << "New merged segment " << collection_file.segment_id_ << " of size "
<< segment_writer_ptr->Size() << " bytes";
if (options_.insert_cache_immediately_) {
segment_writer_ptr->Cache();
}
return status;
}
} // namespace engine
} // namespace milvus
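A merge pass, then, is expected to regroup candidate files with a strategy and run one MergeTask per group. A minimal driver sketch (assumed wiring, using the STATUS_CHECK macro introduced later in this change):

MergeStrategyPtr strategy = std::make_shared<MergeSimpleStrategy>();
MergeFilesGroups groups;
STATUS_CHECK(strategy->RegroupFiles(files_holder, groups));
for (auto& group : groups) {
    MergeTask task(meta_ptr, options, group);  // each group becomes one merged segment
    STATUS_CHECK(task.Execute());
}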


@ -0,0 +1,36 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/merge/MergeManager.h"
#include "db/meta/MetaTypes.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class MergeTask {
public:
MergeTask(const meta::MetaPtr& meta, const DBOptions& options, meta::SegmentsSchema& files);
Status
Execute();
private:
meta::MetaPtr meta_ptr_;
DBOptions options_;
meta::SegmentsSchema files_;
}; // MergeTask
} // namespace engine
} // namespace milvus


@ -55,7 +55,7 @@ class Meta {
DescribeCollection(CollectionSchema& table_schema) = 0;
virtual Status
HasCollection(const std::string& collection_id, bool& has_or_not) = 0;
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) = 0;
virtual Status
AllCollections(std::vector<CollectionSchema>& table_schema_array) = 0;


@ -541,7 +541,7 @@ MySQLMetaImpl::DescribeCollection(CollectionSchema& collection_schema) {
}
Status
MySQLMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not) {
MySQLMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root) {
try {
server::MetricCollector metric;
mysqlpp::StoreQueryResult res;
@ -557,20 +557,23 @@ MySQLMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not)
mysqlpp::Query HasCollectionQuery = connectionPtr->query();
// since collection_id is a unique column we just need to check whether it exists or not
HasCollectionQuery << "SELECT EXISTS"
<< " (SELECT 1 FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
<< collection_id << " AND state <> " << std::to_string(CollectionSchema::TO_DELETE)
<< ")"
<< " AS " << mysqlpp::quote << "check"
<< ";";
if (is_root) {
HasCollectionQuery << "SELECT id FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
<< collection_id << " AND state <> " << std::to_string(CollectionSchema::TO_DELETE)
<< " AND owner_table = " << mysqlpp::quote << ""
<< ";";
} else {
HasCollectionQuery << "SELECT id FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
<< collection_id << " AND state <> " << std::to_string(CollectionSchema::TO_DELETE)
<< ";";
}
LOG_ENGINE_DEBUG_ << "HasCollection: " << HasCollectionQuery.str();
res = HasCollectionQuery.store();
} // Scoped Connection
int check = res[0]["check"];
has_or_not = (check == 1);
has_or_not = (res.num_rows() > 0);
} catch (std::exception& e) {
return HandleException("Failed to check collection existence", e.what());
}
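The new is_root flag restricts the check to root collections, i.e. rows whose owner_table is empty; partition collections carry their parent's id in that column and are skipped. A caller that should ignore partitions (illustrative; meta_ptr is any meta::MetaPtr, the collection name is made up) would write:

bool exists = false;
auto status = meta_ptr->HasCollection("my_collection", exists, /*is_root=*/true);
// exists stays false when the id only names a partition of another collection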
@ -2505,7 +2508,8 @@ MySQLMetaImpl::DropAll() {
}
mysqlpp::Query statement = connectionPtr->query();
statement << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ";";
statement << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ", "
<< ENVIRONMENT_SCHEMA.name() << ", " << FIELDS_SCHEMA.name() << ";";
LOG_ENGINE_DEBUG_ << "DropAll: " << statement.str();


@ -38,7 +38,7 @@ class MySQLMetaImpl : public Meta {
DescribeCollection(CollectionSchema& collection_schema) override;
Status
HasCollection(const std::string& collection_id, bool& has_or_not) override;
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) override;
Status
AllCollections(std::vector<CollectionSchema>& collection_schema_array) override;


@ -272,7 +272,7 @@ SqliteMetaImpl::DescribeCollection(CollectionSchema& collection_schema) {
}
Status
SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not) {
SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root) {
has_or_not = false;
try {
@ -281,11 +281,21 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
// multiple threads calling sqlite update may hit exceptions ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto collections = ConnectorPtr->select(
columns(&CollectionSchema::id_),
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
if (collections.size() == 1) {
auto select_columns = columns(&CollectionSchema::id_, &CollectionSchema::owner_collection_);
decltype(ConnectorPtr->select(select_columns)) selected;
if (is_root) {
selected = ConnectorPtr->select(select_columns,
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
and c(&CollectionSchema::owner_collection_) == ""));
} else {
selected = ConnectorPtr->select(select_columns,
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
}
if (selected.size() == 1) {
has_or_not = true;
} else {
has_or_not = false;
@ -1742,6 +1752,8 @@ SqliteMetaImpl::DropAll() {
try {
ConnectorPtr->drop_table(META_TABLES);
ConnectorPtr->drop_table(META_TABLEFILES);
ConnectorPtr->drop_table(META_ENVIRONMENT);
ConnectorPtr->drop_table(META_FIELDS);
} catch (std::exception& e) {
return HandleException("Encounter exception when drop all meta", e.what());
}


@ -40,7 +40,7 @@ class SqliteMetaImpl : public Meta {
DescribeCollection(CollectionSchema& collection_schema) override;
Status
HasCollection(const std::string& collection_id, bool& has_or_not) override;
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) override;
Status
AllCollections(std::vector<CollectionSchema>& collection_schema_array) override;


@ -36,7 +36,7 @@ Status
KnowhereResource::Initialize() {
server::Config& config = server::Config::GetInstance();
std::string simd_type;
CONFIG_CHECK(config.GetEngineConfigSimdType(simd_type));
STATUS_CHECK(config.GetEngineConfigSimdType(simd_type));
if (simd_type == "avx512") {
faiss::faiss_use_avx512 = true;
faiss::faiss_use_avx2 = false;
@ -64,7 +64,7 @@ KnowhereResource::Initialize() {
#ifdef MILVUS_GPU_VERSION
bool enable_gpu = false;
CONFIG_CHECK(config.GetGpuResourceConfigEnable(enable_gpu));
STATUS_CHECK(config.GetGpuResourceConfigEnable(enable_gpu));
fiu_do_on("KnowhereResource.Initialize.disable_gpu", enable_gpu = false);
if (not enable_gpu)
return Status::OK();
@ -79,7 +79,7 @@ KnowhereResource::Initialize() {
// get build index gpu resource
std::vector<int64_t> build_index_gpus;
CONFIG_CHECK(config.GetGpuResourceConfigBuildIndexResources(build_index_gpus));
STATUS_CHECK(config.GetGpuResourceConfigBuildIndexResources(build_index_gpus));
for (auto gpu_id : build_index_gpus) {
gpu_resources.insert(std::make_pair(gpu_id, GpuResourceSetting()));
@ -87,7 +87,7 @@ KnowhereResource::Initialize() {
// get search gpu resource
std::vector<int64_t> search_gpus;
CONFIG_CHECK(config.GetGpuResourceConfigSearchResources(search_gpus));
STATUS_CHECK(config.GetGpuResourceConfigSearchResources(search_gpus));
for (auto& gpu_id : search_gpus) {
gpu_resources.insert(std::make_pair(gpu_id, GpuResourceSetting()));


@ -38,7 +38,7 @@ if (FAISS_FOUND)
set_target_properties(
faiss
PROPERTIES
INTERFACE_LINK_LIBRARIES ${BLAS_LIBRARIES} ${LAPACK_LIBRARIES})
INTERFACE_LINK_LIBRARIES ${OpenBLAS_LIBRARIES})
endif ()
endif ()
endif ()


@ -0,0 +1,91 @@
if (OpenBLAS_FOUND) # the git version propose a OpenBLASConfig.cmake
message(STATUS "OpenBLASConfig found")
set(OpenBLAS_INCLUDE_DIR ${OpenBLAS_INCLUDE_DIRS})
else()
message("OpenBLASConfig not found")
unset(OpenBLAS_DIR CACHE)
set(OpenBLAS_INCLUDE_SEARCH_PATHS
/usr/local/openblas/include
/usr/include
/usr/include/openblas
/usr/include/openblas-base
/usr/local/include
/usr/local/include/openblas
/usr/local/include/openblas-base
/opt/OpenBLAS/include
/usr/local/opt/openblas/include
$ENV{OpenBLAS_HOME}
$ENV{OpenBLAS_HOME}/include
)
set(OpenBLAS_LIB_SEARCH_PATHS
/usr/local/openblas/lib
/lib/
/lib/openblas-base
/lib64/
/usr/lib
/usr/lib/openblas-base
/usr/lib64
/usr/local/lib
/usr/local/lib64
/usr/local/opt/openblas/lib
/opt/OpenBLAS/lib
$ENV{OpenBLAS}
$ENV{OpenBLAS}/lib
$ENV{OpenBLAS_HOME}
$ENV{OpenBLAS_HOME}/lib
)
set(DEFAULT_OpenBLAS_LIB_PATH
/usr/local/openblas/lib
${OPENBLAS_PREFIX}/lib)
message("DEFAULT_OpenBLAS_LIB_PATH: ${DEFAULT_OpenBLAS_LIB_PATH}")
find_path(OpenBLAS_INCLUDE_DIR NAMES openblas_config.h lapacke.h PATHS ${OpenBLAS_INCLUDE_SEARCH_PATHS})
find_library(OpenBLAS_LIB NAMES openblas PATHS ${DEFAULT_OpenBLAS_LIB_PATH} NO_DEFAULT_PATH)
find_library(OpenBLAS_LIB NAMES openblas PATHS ${OpenBLAS_LIB_SEARCH_PATHS})
# mostly for debian
find_library(Lapacke_LIB NAMES lapacke PATHS ${DEFAULT_OpenBLAS_LIB_PATH} NO_DEFAULT_PATH)
find_library(Lapacke_LIB NAMES lapacke PATHS ${OpenBLAS_LIB_SEARCH_PATHS})
set(OpenBLAS_FOUND ON)
# Check include files
if(NOT OpenBLAS_INCLUDE_DIR)
set(OpenBLAS_FOUND OFF)
message(STATUS "Could not find OpenBLAS include. Turning OpenBLAS_FOUND off")
else()
message(STATUS "Found OpenBLAS include: ${OpenBLAS_INCLUDE_DIR}")
endif()
# Check libraries
if(NOT OpenBLAS_LIB)
set(OpenBLAS_FOUND OFF)
message(STATUS "Could not find OpenBLAS lib. Turning OpenBLAS_FOUND off")
else()
message(STATUS "Found OpenBLAS lib: ${OpenBLAS_LIB}")
endif()
if (OpenBLAS_FOUND)
set(OpenBLAS_LIBRARIES ${OpenBLAS_LIB})
STRING(REGEX REPLACE "/libopenblas.so" "" OpenBLAS_LIB_DIR ${OpenBLAS_LIBRARIES})
message(STATUS "Found OpenBLAS libraries: ${OpenBLAS_LIBRARIES}")
if (Lapacke_LIB)
set(OpenBLAS_LIBRARIES ${OpenBLAS_LIBRARIES} ${Lapacke_LIB})
endif()
if (NOT OpenBLAS_FIND_QUIETLY)
message(STATUS "Found OpenBLAS libraries: ${OpenBLAS_LIBRARIES}")
message(STATUS "Found OpenBLAS include: ${OpenBLAS_INCLUDE_DIR}")
endif()
else()
if (OpenBLAS_FIND_REQUIRED)
message(FATAL_ERROR "Could not find OpenBLAS")
endif()
endif()
endif()
mark_as_advanced(
OpenBLAS_INCLUDE_DIR
OpenBLAS_LIBRARIES
OpenBLAS_LIB_DIR
)
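Note the paired find_library calls above: the first is restricted to DEFAULT_OpenBLAS_LIB_PATH with NO_DEFAULT_PATH, so a copy under /usr/local/openblas (or the in-tree ${OPENBLAS_PREFIX}) wins when present, and only if that fails does the second call fall back to the generic system search paths. Detecting a system OpenBLAS this way is what lets the build skip compiling OpenBLAS from source.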


@ -10,7 +10,6 @@
# or implied. See the License for the specific language governing permissions and limitations under the License.
set(KNOWHERE_THIRDPARTY_DEPENDENCIES
Arrow
FAISS
GTest
@ -318,18 +317,15 @@ endif ()
set(OPENBLAS_PREFIX "${INDEX_BINARY_DIR}/openblas_ep-prefix/src/openblas_ep")
macro(build_openblas)
message(STATUS "Building OpenBLAS-${OPENBLAS_VERSION} from source")
set(OPENBLAS_INCLUDE_DIR "${OPENBLAS_PREFIX}/include")
if (CMAKE_BUILD_TYPE STREQUAL "Release")
set(OPENBLAS_SHARED_LIB
"${OPENBLAS_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}openblas${CMAKE_SHARED_LIBRARY_SUFFIX}")
elseif(CMAKE_BUILD_TYPE STREQUAL "Debug")
set(OPENBLAS_SHARED_LIB
"${OPENBLAS_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}openblas_d${CMAKE_SHARED_LIBRARY_SUFFIX}")
endif()
set(OpenBLAS_INCLUDE_DIR "${OPENBLAS_PREFIX}/include")
set(OpenBLAS_LIB_DIR "${OPENBLAS_PREFIX}/lib")
set(OPENBLAS_SHARED_LIB
"${OPENBLAS_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}openblas${CMAKE_SHARED_LIBRARY_SUFFIX}")
set(OPENBLAS_STATIC_LIB
"${OPENBLAS_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}openblas${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(OPENBLAS_CMAKE_ARGS
${EP_COMMON_CMAKE_ARGS}
-DCMAKE_BUILD_TYPE=Release
-DBUILD_SHARED_LIBS=ON
-DBUILD_STATIC_LIBS=ON
-DTARGET=CORE2
@ -342,7 +338,7 @@ macro(build_openblas)
-DINTERFACE64=0
-DNUM_THREADS=128
-DNO_LAPACKE=1
"-DVERSION=${VERSION}"
"-DVERSION=${OPENBLAS_VERSION}"
"-DCMAKE_INSTALL_PREFIX=${OPENBLAS_PREFIX}"
-DCMAKE_INSTALL_LIBDIR=lib)
@ -365,21 +361,23 @@ macro(build_openblas)
${OPENBLAS_SHARED_LIB}
${OPENBLAS_STATIC_LIB})
file(MAKE_DIRECTORY "${OPENBLAS_INCLUDE_DIR}")
file(MAKE_DIRECTORY "${OpenBLAS_INCLUDE_DIR}")
add_library(openblas SHARED IMPORTED)
set_target_properties(
openblas
PROPERTIES IMPORTED_LOCATION "${OPENBLAS_SHARED_LIB}"
INTERFACE_INCLUDE_DIRECTORIES "${OPENBLAS_INCLUDE_DIR}")
PROPERTIES
IMPORTED_LOCATION "${OPENBLAS_SHARED_LIB}"
LIBRARY_OUTPUT_NAME "openblas"
INTERFACE_INCLUDE_DIRECTORIES "${OpenBLAS_INCLUDE_DIR}")
add_dependencies(openblas openblas_ep)
get_target_property(OpenBLAS_INCLUDE_DIR openblas INTERFACE_INCLUDE_DIRECTORIES)
set(OpenBLAS_LIBRARIES "${OPENBLAS_SHARED_LIB}")
endmacro()
if (KNOWHERE_WITH_OPENBLAS)
resolve_dependency(OpenBLAS)
get_target_property(OPENBLAS_INCLUDE_DIR openblas INTERFACE_INCLUDE_DIRECTORIES)
include_directories(SYSTEM "${OPENBLAS_INCLUDE_DIR}")
link_directories(SYSTEM ${OPENBLAS_PREFIX}/lib)
include_directories(SYSTEM "${OpenBLAS_INCLUDE_DIR}")
link_directories(SYSTEM "${OpenBLAS_LIB_DIR}")
endif()
# ----------------------------------------------------------------------
@ -525,8 +523,13 @@ macro(build_faiss)
)
else ()
message(STATUS "Build Faiss with OpenBlas/LAPACK")
set(FAISS_CONFIGURE_ARGS ${FAISS_CONFIGURE_ARGS}
"LDFLAGS=-L${OPENBLAS_PREFIX}/lib -L${LAPACK_PREFIX}/lib")
if(OpenBLAS_FOUND)
set(FAISS_CONFIGURE_ARGS ${FAISS_CONFIGURE_ARGS}
"LDFLAGS=-L${OpenBLAS_LIB_DIR}")
else()
set(FAISS_CONFIGURE_ARGS ${FAISS_CONFIGURE_ARGS}
"LDFLAGS=-L${OPENBLAS_PREFIX}/lib")
endif()
endif ()
if (KNOWHERE_GPU_VERSION)
@ -577,6 +580,11 @@ macro(build_faiss)
${FAISS_STATIC_LIB})
endif ()
if(NOT OpenBLAS_FOUND)
message("add faiss dependencies: openblas_ep")
ExternalProject_Add_StepDependencies(faiss_ep configure openblas_ep)
endif()
file(MAKE_DIRECTORY "${FAISS_INCLUDE_DIR}")
add_library(faiss STATIC IMPORTED)
@ -595,11 +603,9 @@ macro(build_faiss)
set_target_properties(
faiss
PROPERTIES
# INTERFACE_LINK_LIBRARIES ${BLAS_LIBRARIES} ${LAPACK_LIBRARIES})
INTERFACE_LINK_LIBRARIES "openblas")
INTERFACE_LINK_LIBRARIES "${OpenBLAS_LIBRARIES}")
endif ()
add_dependencies(faiss faiss_ep)
endmacro()


@ -71,7 +71,7 @@ if (FAISS_WITH_MKL)
)
else ()
set(depend_libs ${depend_libs}
${BLAS_LIBRARIES}
${OpenBLAS_LIBRARIES}
${LAPACK_LIBRARIES}
)
endif ()
@ -118,7 +118,7 @@ set(INDEX_INCLUDE_DIRS
${INDEX_SOURCE_DIR}/thirdparty/SPTAG/AnnService
# ${ARROW_INCLUDE_DIR}
${FAISS_INCLUDE_DIR}
${OPENBLAS_INCLUDE_DIR}
${OpenBLAS_INCLUDE_DIR}
${LAPACK_INCLUDE_DIR}
)


@ -191,7 +191,7 @@ void Clustering::train (idx_t nx, const float *x_in, Index & index) {
float err = 0;
for (int i = 0; i < niter; i++) {
double t0s = getmillisecs();
index.search (nx, x, 1, dis, assign);
index.assign(nx, x, assign, dis);
InterruptCallback::check();
t_search_tot += getmillisecs() - t0s;


@ -36,11 +36,13 @@ void Index::range_search (idx_t , const float *, float,
FAISS_THROW_MSG ("range search not implemented");
}
void Index::assign (idx_t n, const float * x, idx_t * labels, idx_t k)
void Index::assign (idx_t n, const float *x, idx_t *labels, float *distance)
{
float * distances = new float[n * k];
ScopeDeleter<float> del(distances);
search (n, x, k, distances, labels);
float *dis_inner = (distance == nullptr) ? new float[n] : distance;
search (n, x, 1, dis_inner, labels);
if (distance == nullptr) {
delete[] dis_inner;
}
}
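With the new signature, assign always returns the single nearest neighbor per query and can optionally report its distance, which is exactly what the k-means training loop in Clustering::train now consumes. A hedged usage sketch, assuming an index and a query buffer x of n vectors (size n * index.d) already exist:

std::vector<faiss::Index::idx_t> labels(n);
std::vector<float> distances(n);
index.assign(n, x, labels.data(), distances.data());  // top-1 ids plus distances
index.assign(n, x, labels.data());                    // distance buffer handled internally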
void Index::add_with_ids(idx_t n, const float* x, const idx_t* xids) {


@ -183,9 +183,9 @@ struct Index {
*
* This function is identical to search but returns only the labels of the neighbors.
* @param x input vectors to search, size n * d
* @param labels output labels of the NNs, size n*k
* @param labels output labels of the NNs, size n
*/
void assign (idx_t n, const float * x, idx_t * labels, idx_t k = 1);
virtual void assign (idx_t n, const float *x, idx_t *labels, float *distance = nullptr);
/// removes all elements from the database.
virtual void reset() = 0;


@ -64,6 +64,30 @@ void IndexFlat::search(idx_t n, const float* x, idx_t k, float* distances, idx_t
}
}
void IndexFlat::assign(idx_t n, const float * x, idx_t * labels, float* distances)
{
// usually used in IVF k-means algorithm
float *dis_inner = (distances == nullptr) ? new float[n] : distances;
switch (metric_type) {
case METRIC_INNER_PRODUCT:
case METRIC_L2: {
// ignore the metric_type, both use L2
elkan_L2_sse(x, xb.data(), d, n, ntotal, labels, dis_inner);
break;
}
default: {
// binary metrics
// This may be incorrect, but we keep the original logic for now.
Index::assign(n, x, labels, dis_inner);
break;
}
}
if (distances == nullptr) {
delete[] dis_inner;
}
}
void IndexFlat::range_search (idx_t n, const float *x, float radius,
RangeSearchResult *result,
ConcurrentBitsetPtr bitset) const


@ -36,6 +36,12 @@ struct IndexFlat: Index {
idx_t* labels,
ConcurrentBitsetPtr bitset = nullptr) const override;
void assign (
idx_t n,
const float * x,
idx_t * labels,
float* distances = nullptr) override;
void range_search(
idx_t n,
const float* x,


@ -57,9 +57,9 @@ int faiss_Index_range_search(const FaissIndex* index, idx_t n, const float* x, f
} CATCH_AND_HANDLE
}
int faiss_Index_assign(FaissIndex* index, idx_t n, const float * x, idx_t * labels, idx_t k) {
int faiss_Index_assign(FaissIndex* index, idx_t n, const float * x, idx_t * labels) {
try {
reinterpret_cast<faiss::Index*>(index)->assign(n, x, labels, k);
reinterpret_cast<faiss::Index*>(index)->assign(n, x, labels);
} CATCH_AND_HANDLE
}


@ -106,9 +106,9 @@ int faiss_Index_range_search(const FaissIndex* index, idx_t n, const float* x,
* This function is identical to search but returns only the labels of the neighbors.
* @param index opaque pointer to index object
* @param x input vectors to search, size n * d
* @param labels output labels of the NNs, size n*k
* @param labels output labels of the NNs, size n
*/
int faiss_Index_assign(FaissIndex* index, idx_t n, const float * x, idx_t * labels, idx_t k);
int faiss_Index_assign(FaissIndex* index, idx_t n, const float * x, idx_t * labels);
/** removes all elements from the database.
* @param index opaque pointer to index object


@ -352,68 +352,6 @@ static void knn_L2sqr_sse (
*/
}
static void elkan_L2_sse (
const float * x,
const float * y,
size_t d, size_t nx, size_t ny,
float_maxheap_array_t * res) {
if (nx == 0 || ny == 0) {
return;
}
const size_t bs_y = 1024;
float *data = (float *) malloc((bs_y * (bs_y - 1) / 2) * sizeof (float));
for (size_t j0 = 0; j0 < ny; j0 += bs_y) {
size_t j1 = j0 + bs_y;
if (j1 > ny) j1 = ny;
auto Y = [&](size_t i, size_t j) -> float& {
assert(i != j);
i -= j0, j -= j0;
return (i > j) ? data[j + i * (i - 1) / 2] : data[i + j * (j - 1) / 2];
};
#pragma omp parallel for
for (size_t i = j0 + 1; i < j1; i++) {
const float *y_i = y + i * d;
for (size_t j = j0; j < i; j++) {
const float *y_j = y + j * d;
Y(i, j) = sqrt(fvec_L2sqr(y_i, y_j, d));
}
}
#pragma omp parallel for
for (size_t i = 0; i < nx; i++) {
const float *x_i = x + i * d;
int64_t ids_i = j0;
float val_i = sqrt(fvec_L2sqr(x_i, y + j0 * d, d));
float val_i_2 = val_i * 2;
for (size_t j = j0 + 1; j < j1; j++) {
if (val_i_2 <= Y(ids_i, j)) {
continue;
}
const float *y_j = y + j * d;
float disij = sqrt(fvec_L2sqr(x_i, y_j, d));
if (disij < val_i) {
ids_i = j;
val_i = disij;
val_i_2 = val_i * 2;
}
}
if (j0 == 0 || res->val[i] > val_i) {
res->val[i] = val_i;
res->ids[i] = ids_i;
}
}
}
free(data);
}
/** Find the nearest neighbors for nx queries in a set of ny vectors */
static void knn_inner_product_blas (
const float * x,
@ -668,11 +606,7 @@ void knn_L2sqr (const float * x,
float_maxheap_array_t * res,
ConcurrentBitsetPtr bitset)
{
if (bitset == nullptr && res->k == 1 && nx >= ny * 2) {
// Note: L2 but not L2sqr
// usually used in IVF::train
elkan_L2_sse(x, y, d, nx, ny, res);
} else if (d % 4 == 0 && nx < distance_compute_blas_threshold) {
if (d % 4 == 0 && nx < distance_compute_blas_threshold) {
knn_L2sqr_sse (x, y, d, nx, ny, res, bitset);
} else {
NopDistanceCorrection nop;
@ -1067,5 +1001,67 @@ void pairwise_L2sqr (int64_t d,
}
void elkan_L2_sse (
const float * x,
const float * y,
size_t d, size_t nx, size_t ny,
int64_t *ids, float *val) {
if (nx == 0 || ny == 0) {
return;
}
const size_t bs_y = 1024;
float *data = (float *) malloc((bs_y * (bs_y - 1) / 2) * sizeof (float));
for (size_t j0 = 0; j0 < ny; j0 += bs_y) {
size_t j1 = j0 + bs_y;
if (j1 > ny) j1 = ny;
auto Y = [&](size_t i, size_t j) -> float& {
assert(i != j);
i -= j0, j -= j0;
return (i > j) ? data[j + i * (i - 1) / 2] : data[i + j * (j - 1) / 2];
};
#pragma omp parallel for
for (size_t i = j0 + 1; i < j1; i++) {
const float *y_i = y + i * d;
for (size_t j = j0; j < i; j++) {
const float *y_j = y + j * d;
Y(i, j) = sqrt(fvec_L2sqr(y_i, y_j, d));
}
}
#pragma omp parallel for
for (size_t i = 0; i < nx; i++) {
const float *x_i = x + i * d;
int64_t ids_i = j0;
float val_i = sqrt(fvec_L2sqr(x_i, y + j0 * d, d));
float val_i_2 = val_i * 2;
for (size_t j = j0 + 1; j < j1; j++) {
if (val_i_2 <= Y(ids_i, j)) {
continue;
}
const float *y_j = y + j * d;
float disij = sqrt(fvec_L2sqr(x_i, y_j, d));
if (disij < val_i) {
ids_i = j;
val_i = disij;
val_i_2 = val_i * 2;
}
}
if (j0 == 0 || val[i] > val_i) {
val[i] = val_i;
ids[i] = ids_i;
}
}
}
free(data);
}
} // namespace faiss
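The skip test val_i_2 <= Y(ids_i, j) above is the classic Elkan bound. With c_b the current best centroid for query x and Y holding the precomputed pairwise centroid distances, the triangle inequality gives

    d(x, c_j) \ge d(c_b, c_j) - d(x, c_b) \ge 2\,d(x, c_b) - d(x, c_b) = d(x, c_b)
    \quad\text{whenever}\quad d(c_b, c_j) \ge 2\,d(x, c_b),

so c_j provably cannot beat c_b and the distance computation for it is skipped entirely.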


@ -247,6 +247,21 @@ void range_search_inner_product (
RangeSearchResult *result);
/***************************************************************************
* elkan
***************************************************************************/
/** Return the nearest neighbor of each of the nx query vectors x among the ny database vectors y
 *
 * @param x   query vectors, size nx * d
 * @param y   database vectors, size ny * d
 * @param ids result array of nearest-neighbor ids, size nx
 * @param val result array of nearest-neighbor distances, size nx
*/
void elkan_L2_sse (
const float * x,
const float * y,
size_t d, size_t nx, size_t ny,
int64_t *ids, float *val);
} // namespace faiss


@ -17,7 +17,7 @@ if (FAISS_WITH_MKL)
)
else ()
set(depend_libs ${depend_libs}
${BLAS_LIBRARIES}
${OpenBLAS_LIBRARIES}
${LAPACK_LIBRARIES}
)
endif ()


@ -24,7 +24,7 @@ if (KNOWHERE_GPU_VERSION)
)
else ()
set(depend_libs ${depend_libs}
${BLAS_LIBRARIES}
${OpenBLAS_LIBRARIES}
${LAPACK_LIBRARIES}
)
endif ()


@ -19,7 +19,7 @@ if (KNOWHERE_GPU_VERSION)
)
else ()
set(depend_libs ${depend_libs}
${BLAS_LIBRARIES}
${OpenBLAS_LIBRARIES}
${LAPACK_LIBRARIES}
)


@ -15,6 +15,7 @@
#include "metrics/SystemInfo.h"
#include "utils/Log.h"
#include <unistd.h>
#include <string>
#include <utility>
@ -25,20 +26,29 @@ Status
PrometheusMetrics::Init() {
try {
Config& config = Config::GetInstance();
CONFIG_CHECK(config.GetMetricConfigEnableMonitor(startup_));
STATUS_CHECK(config.GetMetricConfigEnableMonitor(startup_));
if (!startup_) {
return Status::OK();
}
// Following should be read from config file.
std::string push_port, push_address;
CONFIG_CHECK(config.GetMetricConfigPort(push_port));
CONFIG_CHECK(config.GetMetricConfigAddress(push_address));
std::string server_port, push_port, push_address;
STATUS_CHECK(config.GetServerConfigPort(server_port));
STATUS_CHECK(config.GetMetricConfigPort(push_port));
STATUS_CHECK(config.GetMetricConfigAddress(push_address));
const std::string uri = std::string("/metrics");
// const std::size_t num_threads = 2;
auto labels = prometheus::Gateway::GetInstanceLabel("pushgateway");
std::string hostportstr;
char hostname[1024];
if (gethostname(hostname, sizeof(hostname)) == 0) {
hostportstr = std::string(hostname) + ":" + server_port;
} else {
hostportstr = "pushgateway";
}
auto labels = prometheus::Gateway::GetInstanceLabel(hostportstr);
// Init pushgateway
gateway_ = std::make_shared<prometheus::Gateway>(push_address, push_port, "milvus_metrics", labels);
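The pushgateway instance label therefore becomes "<hostname>:<server_port>" (for example, a hypothetical host named milvus-node1 serving on 19530 reports as milvus-node1:19530); the old constant "pushgateway" remains only as the fallback when gethostname fails.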


@ -47,13 +47,9 @@ BuildIndexPass::Run(const TaskPtr& task) {
LOG_SERVER_WARNING_ << "BuildIndexPass cannot get build index gpu!";
return false;
}
if (specified_gpu_id_ >= build_gpus_.size()) {
specified_gpu_id_ = specified_gpu_id_ % build_gpus_.size();
}
LOG_SERVER_DEBUG_ << "Specify gpu" << specified_gpu_id_ << " to build index!";
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, build_gpus_[specified_gpu_id_]);
specified_gpu_id_ = (specified_gpu_id_ + 1) % build_gpus_.size();
LOG_SERVER_DEBUG_ << "Specify gpu " << build_gpus_[idx_] << " to build index!";
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, build_gpus_[idx_]);
idx_ = (idx_ + 1) % build_gpus_.size();
}
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
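This pass and the search passes that follow all replace a modulo over a running counter with an explicit cursor idx_, so successive tasks walk the configured GPU list in strict round-robin order. The shared pattern in isolation (an illustrative struct, not the pass classes themselves):

#include <cstdint>
#include <vector>

struct RoundRobinPicker {
    uint64_t idx_ = 0;
    int64_t Next(const std::vector<int64_t>& gpus) {
        int64_t gpu_id = gpus[idx_];
        idx_ = (idx_ + 1) % gpus.size();  // advance and wrap over the configured GPUs
        return gpu_id;
    }
};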


@ -40,7 +40,7 @@ class BuildIndexPass : public Pass, public server::GpuResourceConfigHandler {
Run(const TaskPtr& task) override;
private:
uint64_t specified_gpu_id_ = 0;
uint64_t idx_ = 0;
};
using BuildIndexPassPtr = std::shared_ptr<BuildIndexPass>;


@ -61,11 +61,10 @@ FaissFlatPass::Run(const TaskPtr& task) {
"search", 0);
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % search_gpus_.size();
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissFlatPass: nq > gpu_search_threshold, specify gpu %d to search!",
"search", 0, best_device_id);
++count_;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissFlatPass: nq >= gpu_search_threshold, specify gpu %d to search!",
"search", 0, search_gpus_[idx_]);
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[idx_]);
idx_ = (idx_ + 1) % search_gpus_.size();
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;


@ -41,7 +41,7 @@ class FaissFlatPass : public Pass, public server::GpuResourceConfigHandler {
Run(const TaskPtr& task) override;
private:
int64_t count_ = 0;
int64_t idx_ = 0;
};
using FaissFlatPassPtr = std::shared_ptr<FaissFlatPass>;


@ -62,11 +62,10 @@ FaissIVFFlatPass::Run(const TaskPtr& task) {
"search", 0);
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % search_gpus_.size();
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissIVFFlatPass: nq > gpu_search_threshold, specify gpu %d to search!",
"search", 0, best_device_id);
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissIVFFlatPass: nq >= gpu_search_threshold, specify gpu %d to search!",
"search", 0, search_gpus_[idx_]);
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[idx_]);
idx_ = (idx_ + 1) % search_gpus_.size();
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;


@ -41,7 +41,7 @@ class FaissIVFFlatPass : public Pass, public server::GpuResourceConfigHandler {
Run(const TaskPtr& task) override;
private:
int64_t count_ = 0;
int64_t idx_ = 0;
};
using FaissIVFFlatPassPtr = std::shared_ptr<FaissIVFFlatPass>;


@ -64,11 +64,10 @@ FaissIVFPQPass::Run(const TaskPtr& task) {
"search", 0);
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % search_gpus_.size();
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissIVFPQPass: nq > gpu_search_threshold, specify gpu %d to search!",
"search", 0, best_device_id);
++count_;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissIVFPQPass: nq >= gpu_search_threshold, specify gpu %d to search!",
"search", 0, search_gpus_[idx_]);
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[idx_]);
idx_ = (idx_ + 1) % search_gpus_.size();
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;


@ -41,7 +41,7 @@ class FaissIVFPQPass : public Pass, public server::GpuResourceConfigHandler {
Run(const TaskPtr& task) override;
private:
int64_t count_ = 0;
int64_t idx_ = 0;
};
using FaissIVFPQPassPtr = std::shared_ptr<FaissIVFPQPass>;


@ -62,11 +62,10 @@ FaissIVFSQ8HPass::Run(const TaskPtr& task) {
"search", 0);
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % search_gpus_.size();
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissIVFSQ8HPass: nq > gpu_search_threshold, specify gpu %d to search!",
"search", 0, best_device_id);
++count_;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissIVFSQ8HPass: nq >= gpu_search_threshold, specify gpu %d to search!",
"search", 0, search_gpus_[idx_]);
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[idx_]);
idx_ = (idx_ + 1) % search_gpus_.size();
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;


@ -41,7 +41,7 @@ class FaissIVFSQ8HPass : public Pass, public server::GpuResourceConfigHandler {
Run(const TaskPtr& task) override;
private:
int64_t count_ = 0;
int64_t idx_ = 0;
};
using FaissIVFSQ8HPassPtr = std::shared_ptr<FaissIVFSQ8HPass>;


@ -62,11 +62,10 @@ FaissIVFSQ8Pass::Run(const TaskPtr& task) {
"search", 0);
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % search_gpus_.size();
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissIVFSQ8Pass: nq > gpu_search_threshold, specify gpu %d to search!",
"search", 0, best_device_id);
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissIVFSQ8Pass: nq >= gpu_search_threshold, specify gpu %d to search!",
"search", 0, search_gpus_[idx_]);
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[idx_]);
idx_ = (idx_ + 1) % search_gpus_.size();
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;


@ -41,7 +41,7 @@ class FaissIVFSQ8Pass : public Pass, public server::GpuResourceConfigHandler {
Run(const TaskPtr& task) override;
private:
int64_t count_ = 0;
int64_t idx_ = 0;
};
using FaissIVFSQ8PassPtr = std::shared_ptr<FaissIVFSQ8Pass>;


@ -64,6 +64,12 @@ DBWrapper::StartService() {
StringHelpFunctions::SplitStringByDelimeter(db_slave_path, ";", opt.meta_.slave_paths_);
s = config.GetStorageConfigFileCleanupTimeup(opt.file_cleanup_timeout_);
if (!s.ok()) {
std::cerr << s.ToString() << std::endl;
return s;
}
// cache config
s = config.GetCacheConfigCacheInsertData(opt.insert_cache_immediately_);
if (!s.ok()) {
@ -71,6 +77,14 @@ DBWrapper::StartService() {
return s;
}
int64_t insert_buffer_size = 1 * engine::GB;
s = config.GetCacheConfigInsertBufferSize(insert_buffer_size);
if (!s.ok()) {
std::cerr << s.ToString() << std::endl;
return s;
}
opt.insert_buffer_size_ = insert_buffer_size * engine::GB;
std::string mode;
s = config.GetServerConfigDeployMode(mode);
if (!s.ok()) {


@ -159,6 +159,14 @@ Server::Start() {
Config& config = Config::GetInstance();
std::string meta_uri;
STATUS_CHECK(config.GetDBConfigBackendUrl(meta_uri));
if (meta_uri.length() > 6 && strcasecmp("sqlite", meta_uri.substr(0, 6).c_str()) == 0) {
std::cout << "WARNING: You are using SQLite for metadata management, "
             "which is not suitable for production. Please change it to MySQL!"
<< std::endl;
}
/* Init opentracing tracer from config */
std::string tracing_config_path;
s = config.GetTracingConfigJsonConfigPath(tracing_config_path);
@ -201,58 +209,25 @@ Server::Start() {
std::string logs_path;
int64_t max_log_file_size = 0;
int64_t delete_exceeds = 0;
s = config.GetLogsTraceEnable(trace_enable);
if (!s.ok()) {
return s;
}
s = config.GetLogsDebugEnable(debug_enable);
if (!s.ok()) {
return s;
}
s = config.GetLogsInfoEnable(info_enable);
if (!s.ok()) {
return s;
}
s = config.GetLogsWarningEnable(warning_enable);
if (!s.ok()) {
return s;
}
s = config.GetLogsErrorEnable(error_enable);
if (!s.ok()) {
return s;
}
s = config.GetLogsFatalEnable(fatal_enable);
if (!s.ok()) {
return s;
}
s = config.GetLogsPath(logs_path);
if (!s.ok()) {
return s;
}
s = config.GetLogsMaxLogFileSize(max_log_file_size);
if (!s.ok()) {
return s;
}
s = config.GetLogsDeleteExceeds(delete_exceeds);
if (!s.ok()) {
return s;
}
STATUS_CHECK(config.GetLogsTraceEnable(trace_enable));
STATUS_CHECK(config.GetLogsDebugEnable(debug_enable));
STATUS_CHECK(config.GetLogsInfoEnable(info_enable));
STATUS_CHECK(config.GetLogsWarningEnable(warning_enable));
STATUS_CHECK(config.GetLogsErrorEnable(error_enable));
STATUS_CHECK(config.GetLogsFatalEnable(fatal_enable));
STATUS_CHECK(config.GetLogsPath(logs_path));
STATUS_CHECK(config.GetLogsMaxLogFileSize(max_log_file_size));
STATUS_CHECK(config.GetLogsDeleteExceeds(delete_exceeds));
InitLog(trace_enable, debug_enable, info_enable, warning_enable, error_enable, fatal_enable, logs_path,
max_log_file_size, delete_exceeds);
}
std::string deploy_mode;
s = config.GetServerConfigDeployMode(deploy_mode);
if (!s.ok()) {
return s;
}
STATUS_CHECK(config.GetServerConfigDeployMode(deploy_mode));
if (deploy_mode == "single" || deploy_mode == "cluster_writable") {
std::string db_path;
s = config.GetStorageConfigPrimaryPath(db_path);
if (!s.ok()) {
return s;
}
STATUS_CHECK(config.GetStorageConfigPrimaryPath(db_path));
try {
// True if a new directory was created, otherwise false.
@ -268,17 +243,11 @@ Server::Start() {
}
bool wal_enable = false;
s = config.GetWalConfigEnable(wal_enable);
if (!s.ok()) {
return s;
}
STATUS_CHECK(config.GetWalConfigEnable(wal_enable));
if (wal_enable) {
std::string wal_path;
s = config.GetWalConfigWalPath(wal_path);
if (!s.ok()) {
return s;
}
STATUS_CHECK(config.GetWalConfigWalPath(wal_path));
try {
// True if a new directory was created, otherwise false.
@ -301,21 +270,10 @@ Server::Start() {
#else
LOG_SERVER_INFO_ << "CPU edition";
#endif
s = StorageChecker::CheckStoragePermission();
if (!s.ok()) {
return s;
}
s = CpuChecker::CheckCpuInstructionSet();
if (!s.ok()) {
return s;
}
STATUS_CHECK(StorageChecker::CheckStoragePermission());
STATUS_CHECK(CpuChecker::CheckCpuInstructionSet());
#ifdef MILVUS_GPU_VERSION
s = GpuChecker::CheckGpuEnvironment();
if (!s.ok()) {
return s;
}
STATUS_CHECK(GpuChecker::CheckGpuEnvironment());
#endif
/* record config and hardware information into log */
LogConfigInFile(config_filename_);


@ -54,6 +54,16 @@ Context::IsConnectionBroken() const {
return context_->IsConnectionBroken();
}
BaseRequest::RequestType
Context::GetRequestType() const {
return request_type_;
}
void
Context::SetRequestType(BaseRequest::RequestType type) {
request_type_ = type;
}
/////////////////////////////////////////////////////////////////////////////////////////////////
ContextChild::ContextChild(const ContextPtr& context, const std::string& operation_name) {
if (context) {


@ -18,6 +18,7 @@
#include <grpcpp/server_context.h>
#include "server/context/ConnectionContext.h"
#include "server/delivery/request/BaseRequest.h"
#include "tracing/TraceContext.h"
namespace milvus {
@ -50,8 +51,15 @@ class Context {
bool
IsConnectionBroken() const;
BaseRequest::RequestType
GetRequestType() const;
void
SetRequestType(BaseRequest::RequestType type);
private:
std::string request_id_;
BaseRequest::RequestType request_type_;
std::shared_ptr<tracing::TraceContext> trace_context_;
ConnectionContextPtr context_;
};


@ -10,12 +10,14 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "server/delivery/request/BaseRequest.h"
#include <map>
#include "server/context/Context.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include <map>
namespace milvus {
namespace server {
@ -81,6 +83,9 @@ BaseRequest::BaseRequest(const std::shared_ptr<milvus::server::Context>& context
bool async)
: context_(context), type_(type), async_(async), done_(false) {
request_group_ = milvus::server::RequestGroup(type);
if (nullptr != context_) {
context_->SetRequestType(type_);
}
}
BaseRequest::~BaseRequest() {


@ -17,7 +17,6 @@
#include "grpc/gen-status/status.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include "query/GeneralQuery.h"
#include "server/context/Context.h"
#include "utils/Json.h"
#include "utils/Status.h"
@ -103,6 +102,8 @@ struct PartitionParam {
}
};
class Context;
class BaseRequest {
public:
enum RequestType {


@ -49,6 +49,13 @@ FlushRequest::OnExecute() {
Status status = Status::OK();
LOG_SERVER_DEBUG_ << hdr;
// flush all collections
if (collection_names_.empty()) {
status = DBWrapper::DB()->Flush();
return status;
}
// flush specified collections
for (auto& name : collection_names_) {
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;


@ -50,20 +50,10 @@ HasCollectionRequest::OnExecute() {
status = DBWrapper::DB()->HasNativeCollection(collection_name_, has_collection_);
fiu_do_on("HasCollectionRequest.OnExecute.throw_std_exception", throw std::exception());
// only process root collection, ignore partition collection
if (has_collection_) {
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
if (!collection_schema.owner_collection_.empty()) {
has_collection_ = false;
}
}
return status;
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace server


@ -384,6 +384,22 @@ SearchCombineRequest::OnExecute() {
return status;
}
// avoid memcpy crash, check id count = target vector count * topk
if (result_ids.size() != total_count * search_topk_) {
status = Status(DB_ERROR, "Result count doesn't match target vector count");
// let all request return
FreeRequests(status);
return status;
}
// avoid memcpy crash, check distance count = id count
if (result_distances.size() != result_ids.size()) {
status = Status(DB_ERROR, "Result distance count doesn't match id count");
// let all request return
FreeRequests(status);
return status;
}
// step 5: construct result array
offset = 0;
for (auto& request : request_list_) {


@ -41,14 +41,11 @@ SearchReqStrategy::ReScheduleQueue(const BaseRequestPtr& request, std::queue<Bas
if (last_req->GetRequestType() == BaseRequest::kSearch) {
SearchRequestPtr last_search_req = std::static_pointer_cast<SearchRequest>(last_req);
if (SearchCombineRequest::CanCombine(last_search_req, new_search_req)) {
// pop last request
queue.pop();
// combine request
SearchCombineRequestPtr combine_request = std::make_shared<SearchCombineRequest>();
combine_request->Combine(last_search_req);
combine_request->Combine(new_search_req);
queue.push(combine_request);
queue.back() = combine_request;  // replace the last request with the combined request
LOG_SERVER_DEBUG_ << "Combine 2 search requests";
} else {
// directly put to queue


@ -73,6 +73,25 @@ ErrorMap(ErrorCode code) {
}
}
std::string
RequestMap(BaseRequest::RequestType request_type) {
static const std::unordered_map<BaseRequest::RequestType, std::string> request_map = {
{BaseRequest::kInsert, "Insert"},
{BaseRequest::kCreateIndex, "CreateIndex"},
{BaseRequest::kSearch, "Search"},
{BaseRequest::kSearchByID, "SearchByID"},
{BaseRequest::kHybridSearch, "HybridSearch"},
{BaseRequest::kFlush, "Flush"},
{BaseRequest::kCompact, "Compact"},
};
if (request_map.find(request_type) != request_map.end()) {
return request_map.at(request_type);
} else {
return "OtherRequest";
}
}
namespace {
void
CopyRowRecords(const google::protobuf::RepeatedPtrField<::milvus::grpc::RowRecord>& grpc_records,
@ -670,8 +689,30 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext* context, const ::milvus::grpc::Co
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->RequestID().c_str(), __func__);
std::string reply;
Status status = request_handler_.Cmd(GetContext(context), request->cmd(), reply);
response->set_string_reply(reply);
Status status;
std::string cmd = request->cmd();
std::vector<std::string> requests;
if (cmd == "requests") {
std::lock_guard<std::mutex> lock(context_map_mutex_);
for (auto& iter : context_map_) {
if (nullptr == iter.second) {
continue;
}
if (iter.second->RequestID() == get_request_id(context)) {
continue;
}
auto request_str = RequestMap(iter.second->GetRequestType()) + "-" + iter.second->RequestID();
requests.emplace_back(request_str);
}
nlohmann::json reply_json;
reply_json["requests"] = requests;
reply = reply_json.dump();
response->set_string_reply(reply);
} else {
status = request_handler_.Cmd(GetContext(context), cmd, reply);
response->set_string_reply(reply);
}
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__);
SET_RESPONSE(response->mutable_status(), status, context);
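The reply for the new "requests" command is a JSON object listing every in-flight request except the caller's own, each entry formatted as "<RequestType>-<request id>". A client-side sketch with a hypothetical reply body (the ids below are made up; real ones are generated per connection):

#include <iostream>
#include <nlohmann/json.hpp>

int main() {
    auto reply = nlohmann::json::parse(R"({"requests":["Search-1","Insert-2"]})");
    for (auto& r : reply["requests"]) {
        std::cout << r.get<std::string>() << "\n";  // e.g. Search-1
    }
    return 0;
}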


@ -77,8 +77,8 @@ GrpcServer::StartService() {
Config& config = Config::GetInstance();
std::string address, port;
CONFIG_CHECK(config.GetServerConfigAddress(address));
CONFIG_CHECK(config.GetServerConfigPort(port));
STATUS_CHECK(config.GetServerConfigAddress(address));
STATUS_CHECK(config.GetServerConfigPort(port));
std::string server_address(address + ":" + port);


@ -47,7 +47,7 @@ WebServer::StartService() {
Config& config = Config::GetInstance();
std::string port;
CONFIG_CHECK(config.GetServerConfigWebPort(port));
STATUS_CHECK(config.GetServerConfigWebPort(port));
{
AppComponent components = AppComponent(std::stoi(port));


@ -17,8 +17,8 @@
#include <oatpp/network/client/SimpleTCPConnectionProvider.hpp>
#include <oatpp/network/server/SimpleTCPConnectionProvider.hpp>
#include <oatpp/parser/json/mapping/Deserializer.hpp>
#include <oatpp/parser/json/mapping/Serializer.hpp>
#include <oatpp/parser/json/mapping/ObjectMapper.hpp>
#include <oatpp/parser/json/mapping/Serializer.hpp>
#include <oatpp/web/server/HttpConnectionHandler.hpp>
#include <oatpp/web/server/HttpRouter.hpp>


@ -1457,7 +1457,35 @@ WebRequestHandler::ShowSegments(const OString& collection_name, const OQueryPara
ASSIGN_RETURN_STATUS_DTO(status)
}
nlohmann::json result_json = nlohmann::json::parse(info);
nlohmann::json info_json = nlohmann::json::parse(info);
nlohmann::json segments_json = nlohmann::json::array();
for (auto& par : info_json["partitions"]) {
if (!(all_required || tag.empty() || tag == par["tag"])) {
continue;
}
auto segments = par["segments"];
if (!segments.is_null()) {
for (auto& seg : segments) {
seg["partition_tag"] = par["tag"];
segments_json.push_back(seg);
}
}
}
nlohmann::json result_json;
if (!all_required) {
int64_t size = segments_json.size();
int iter_begin = std::min(size, offset);
int iter_end = std::min(size, offset + page_size);
nlohmann::json segments_slice_json = nlohmann::json::array();
segments_slice_json.insert(segments_slice_json.begin(), segments_json.begin() + iter_begin,
segments_json.begin() + iter_end);
result_json["segments"] = segments_slice_json; // segments_json;
} else {
result_json["segments"] = segments_json;
}
result_json["count"] = segments_json.size();
AddStatusToJson(result_json, status.code(), status.message());
response = result_json.dump().c_str();
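Both slice bounds are clamped to the number of collected segments, so an out-of-range page degrades to a short or empty result instead of an error. Worked through:

// needs <algorithm> for std::min; 25 matching segments, offset 20, page size 10:
int64_t size = 25, offset = 20, page_size = 10;
int iter_begin = std::min(size, offset);              // 20
int iter_end   = std::min(size, offset + page_size);  // 25, so the final page holds 5
// offset = 30 would give iter_begin == iter_end == 25: an empty page, count still 25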
@ -1535,9 +1563,14 @@ WebRequestHandler::Insert(const OString& collection_name, const OString& body, V
}
auto& id_array = vectors.id_array_;
id_array.clear();
for (auto& id_str : ids_json) {
int64_t id = std::stol(id_str.get<std::string>());
id_array.emplace_back(id);
try {
for (auto& id_str : ids_json) {
int64_t id = std::stol(id_str.get<std::string>());
id_array.emplace_back(id);
}
} catch (std::exception& e) {
std::string err_msg = std::string("Cannot convert vectors id. details: ") + e.what();
RETURN_STATUS_DTO(SERVER_UNEXPECTED_ERROR, err_msg.c_str());
}
}


@ -17,6 +17,15 @@
namespace milvus {
class Status;
#define STATUS_CHECK(func) \
do { \
Status s = func; \
if (!s.ok()) { \
return s; \
} \
} while (false)
using StatusCode = ErrorCode;
class Status {
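The do { ... } while (false) wrapper makes STATUS_CHECK expand to a single statement, so it composes safely with unbraced if/else and requires the usual trailing semicolon at the call site. For illustration (inside a function returning Status; use_gpu and enable_gpu are assumed locals):

if (use_gpu)  // fine even without braces
    STATUS_CHECK(config.GetGpuResourceConfigEnable(enable_gpu));
else
    enable_gpu = false;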


@ -11,7 +11,7 @@ GRPC_VERSION=master
ZLIB_VERSION=v1.2.11
OPENTRACING_VERSION=v1.5.1
FIU_VERSION=1.00
OATPP_VERSION=1.0.0
OATPP_VERSION=1.0.1
AWS_VERSION=1.7.250
# vim: set filetype=sh:


@ -20,6 +20,9 @@ include_directories(${MILVUS_ENGINE_SRC})
include_directories(${MILVUS_THIRDPARTY_SRC})
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-status)
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-milvus)
aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/config/handler config_handler_files)
@ -28,6 +31,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_main_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/engine db_engine_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/insert db_insert_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/merge db_merge_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/wal db_wal_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/search search_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/query query_files)
@ -140,6 +144,7 @@ set(common_files
${db_engine_files}
${db_insert_files}
${db_meta_files}
${db_merge_files}
${db_wal_files}
${metrics_files}
${thirdparty_files}
@ -149,6 +154,7 @@ set(common_files
${helper_files}
${server_init_files}
${server_context_files}
${grpc_service_files}
${tracing_files}
${codecs_files}
${codecs_default_files}
@ -157,6 +163,14 @@ set(common_files
${query_files}
)
set(grpc_lib
grpcpp_channelz
grpc++
grpc
grpc_protobuf
grpc_protoc
)
set(unittest_libs
sqlite
libboost_system.a
@ -172,6 +186,7 @@ set(unittest_libs
opentracing
opentracing_mocktracer
fiu
${grpc_lib}
)
if (MILVUS_WITH_AWS)


@ -23,27 +23,16 @@ set(test_files
include_directories("${CUDA_TOOLKIT_ROOT_DIR}/include")
link_directories("${CUDA_TOOLKIT_ROOT_DIR}/lib64")
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-status)
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-milvus)
set(util_files
${MILVUS_ENGINE_SRC}/utils/StringHelpFunctions.cpp
${MILVUS_ENGINE_SRC}/utils/LogUtil.cpp
${MILVUS_ENGINE_SRC}/utils/SignalUtil.cpp)
set(grpc_service_files
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-status/status.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-status/status.pb.cc
)
set(server_test_files
${common_files}
${server_files}
${server_init_files}
${grpc_server_files}
${grpc_service_files}
${server_delivery_files}
${web_server_files}
${util_files}
@ -53,19 +42,13 @@ set(server_test_files
add_executable(test_server ${server_test_files})
set(grpc_lib
grpcpp_channelz
grpc++
grpc
grpc_protobuf
grpc_protoc
)
target_link_libraries(test_server
knowhere
metrics
stdc++
${grpc_lib}
# ${grpc_lib}
${unittest_libs}
oatpp
)


@ -209,35 +209,35 @@ TEST_F(ConfigTest, SERVER_CONFIG_VALID_TEST) {
ASSERT_TRUE(config.GetStorageConfigSecondaryPath(str_val).ok());
ASSERT_TRUE(str_val == storage_secondary_path);
bool storage_s3_enable = true;
ASSERT_TRUE(config.SetStorageConfigS3Enable(std::to_string(storage_s3_enable)).ok());
ASSERT_TRUE(config.GetStorageConfigS3Enable(bool_val).ok());
ASSERT_TRUE(bool_val == storage_s3_enable);
std::string storage_s3_addr = "192.168.1.100";
ASSERT_TRUE(config.SetStorageConfigS3Address(storage_s3_addr).ok());
ASSERT_TRUE(config.GetStorageConfigS3Address(str_val).ok());
ASSERT_TRUE(str_val == storage_s3_addr);
std::string storage_s3_port = "12345";
ASSERT_TRUE(config.SetStorageConfigS3Port(storage_s3_port).ok());
ASSERT_TRUE(config.GetStorageConfigS3Port(str_val).ok());
ASSERT_TRUE(str_val == storage_s3_port);
std::string storage_s3_access_key = "minioadmin";
ASSERT_TRUE(config.SetStorageConfigS3AccessKey(storage_s3_access_key).ok());
ASSERT_TRUE(config.GetStorageConfigS3AccessKey(str_val).ok());
ASSERT_TRUE(str_val == storage_s3_access_key);
std::string storage_s3_secret_key = "minioadmin";
ASSERT_TRUE(config.SetStorageConfigS3SecretKey(storage_s3_secret_key).ok());
ASSERT_TRUE(config.GetStorageConfigS3SecretKey(str_val).ok());
ASSERT_TRUE(str_val == storage_s3_secret_key);
std::string storage_s3_bucket = "s3bucket";
ASSERT_TRUE(config.SetStorageConfigS3Bucket(storage_s3_bucket).ok());
ASSERT_TRUE(config.GetStorageConfigS3Bucket(str_val).ok());
ASSERT_TRUE(str_val == storage_s3_bucket);
// bool storage_s3_enable = true;
// ASSERT_TRUE(config.SetStorageConfigS3Enable(std::to_string(storage_s3_enable)).ok());
// ASSERT_TRUE(config.GetStorageConfigS3Enable(bool_val).ok());
// ASSERT_TRUE(bool_val == storage_s3_enable);
//
// std::string storage_s3_addr = "192.168.1.100";
// ASSERT_TRUE(config.SetStorageConfigS3Address(storage_s3_addr).ok());
// ASSERT_TRUE(config.GetStorageConfigS3Address(str_val).ok());
// ASSERT_TRUE(str_val == storage_s3_addr);
//
// std::string storage_s3_port = "12345";
// ASSERT_TRUE(config.SetStorageConfigS3Port(storage_s3_port).ok());
// ASSERT_TRUE(config.GetStorageConfigS3Port(str_val).ok());
// ASSERT_TRUE(str_val == storage_s3_port);
//
// std::string storage_s3_access_key = "minioadmin";
// ASSERT_TRUE(config.SetStorageConfigS3AccessKey(storage_s3_access_key).ok());
// ASSERT_TRUE(config.GetStorageConfigS3AccessKey(str_val).ok());
// ASSERT_TRUE(str_val == storage_s3_access_key);
//
// std::string storage_s3_secret_key = "minioadmin";
// ASSERT_TRUE(config.SetStorageConfigS3SecretKey(storage_s3_secret_key).ok());
// ASSERT_TRUE(config.GetStorageConfigS3SecretKey(str_val).ok());
// ASSERT_TRUE(str_val == storage_s3_secret_key);
//
// std::string storage_s3_bucket = "s3bucket";
// ASSERT_TRUE(config.SetStorageConfigS3Bucket(storage_s3_bucket).ok());
// ASSERT_TRUE(config.GetStorageConfigS3Bucket(str_val).ok());
// ASSERT_TRUE(str_val == storage_s3_bucket);
/* metric config */
bool metric_enable_monitor = false;
@ -417,9 +417,9 @@ TEST_F(ConfigTest, SERVER_CONFIG_CLI_TEST) {
ASSERT_TRUE(s.ok());
/* storage config */
std::string storage_s3_enable = "true";
get_cmd = gen_get_command(ms::CONFIG_STORAGE, ms::CONFIG_STORAGE_S3_ENABLE);
set_cmd = gen_set_command(ms::CONFIG_STORAGE, ms::CONFIG_STORAGE_S3_ENABLE, storage_s3_enable);
std::string storage_primary_path = "/tmp/milvus1";
get_cmd = gen_get_command(ms::CONFIG_STORAGE, ms::CONFIG_STORAGE_PRIMARY_PATH);
set_cmd = gen_set_command(ms::CONFIG_STORAGE, ms::CONFIG_STORAGE_PRIMARY_PATH, storage_primary_path);
s = config.ProcessConfigCli(dummy, set_cmd);
ASSERT_TRUE(s.ok());
s = config.ProcessConfigCli(result, get_cmd);
@ -599,18 +599,18 @@ TEST_F(ConfigTest, SERVER_CONFIG_INVALID_TEST) {
ASSERT_FALSE(config.SetStorageConfigSecondaryPath("../milvus,./zilliz").ok());
ASSERT_FALSE(config.SetStorageConfigSecondaryPath("/home/^^__^^,/zilliz").ok());
ASSERT_FALSE(config.SetStorageConfigS3Enable("10").ok());
ASSERT_FALSE(config.SetStorageConfigS3Address("127.0.0").ok());
ASSERT_FALSE(config.SetStorageConfigS3Port("100").ok());
ASSERT_FALSE(config.SetStorageConfigS3Port("100000").ok());
ASSERT_FALSE(config.SetStorageConfigS3AccessKey("").ok());
ASSERT_FALSE(config.SetStorageConfigS3SecretKey("").ok());
ASSERT_FALSE(config.SetStorageConfigS3Bucket("").ok());
// ASSERT_FALSE(config.SetStorageConfigS3Enable("10").ok());
//
// ASSERT_FALSE(config.SetStorageConfigS3Address("127.0.0").ok());
//
// ASSERT_FALSE(config.SetStorageConfigS3Port("100").ok());
// ASSERT_FALSE(config.SetStorageConfigS3Port("100000").ok());
//
// ASSERT_FALSE(config.SetStorageConfigS3AccessKey("").ok());
//
// ASSERT_FALSE(config.SetStorageConfigS3SecretKey("").ok());
//
// ASSERT_FALSE(config.SetStorageConfigS3Bucket("").ok());
/* metric config */
ASSERT_FALSE(config.SetMetricConfigEnableMonitor("Y").ok());


@ -934,6 +934,10 @@ TEST_F(RpcHandlerTest, CMD_TEST) {
handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.string_reply(), MILVUS_VERSION);
command.set_cmd("requests");
handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code());
command.set_cmd("tasktable");
handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code());


@ -1026,11 +1026,12 @@ TEST_F(WebControllerTest, SHOW_SEGMENTS) {
std::string json_str = response->readBodyToString()->c_str();
auto result_json = nlohmann::json::parse(json_str);
ASSERT_TRUE(result_json.contains("row_count"));
ASSERT_TRUE(result_json.contains("partitions"));
auto segments_json = result_json["partitions"];
ASSERT_TRUE(result_json.contains("count"));
ASSERT_TRUE(result_json.contains("segments"));
auto segments_json = result_json["segments"];
ASSERT_TRUE(segments_json.is_array());
auto seg0_json = segments_json[0];
ASSERT_TRUE(seg0_json.contains("partition_tag"));
// ASSERT_EQ(10, segments_json.size());
}
@ -1049,7 +1050,7 @@ TEST_F(WebControllerTest, GET_SEGMENT_INFO) {
std::string json_str = response->readBodyToString()->c_str();
auto result_json = nlohmann::json::parse(json_str);
auto segment0_json = result_json["partitions"][0]["segments"][0];
auto segment0_json = result_json["segments"][0];
std::string segment_name = segment0_json["name"];
// get segment ids
@ -1104,15 +1105,15 @@ TEST_F(WebControllerTest, SEGMENT_FILTER) {
std::string json_str = response->readBodyToString()->c_str();
auto result_json = nlohmann::json::parse(json_str);
ASSERT_TRUE(result_json.contains("row_count"));
ASSERT_TRUE(result_json.contains("count"));
ASSERT_TRUE(result_json.contains("partitions"));
auto partitions_json = result_json["partitions"];
ASSERT_TRUE(partitions_json.is_array());
for (auto& part : partitions_json) {
ASSERT_TRUE(part.contains("tag"));
ASSERT_TRUE(result_json.contains("segments"));
auto segments_json = result_json["segments"];
ASSERT_TRUE(segments_json.is_array());
for (auto& part : segments_json) {
ASSERT_TRUE(part.contains("partition_tag"));
}
ASSERT_EQ("_default", partitions_json[0]["tag"].get<std::string>());
ASSERT_EQ("_default", segments_json[0]["partition_tag"].get<std::string>());
}
TEST_F(WebControllerTest, SEARCH) {


@ -15,17 +15,25 @@ SHELL ["/bin/bash", "-o", "pipefail", "-c"]
RUN yum install -y epel-release centos-release-scl-rh && yum install -y wget curl which && \
wget -qO- "https://cmake.org/files/v3.14/cmake-3.14.3-Linux-x86_64.tar.gz" | tar --strip-components=1 -xz -C /usr/local && \
yum install -y ccache make automake git python3-pip libcurl-devel python3-devel boost-static mysql-devel \
yum install -y make automake git python3-pip libcurl-devel python3-devel boost-static mysql-devel \
devtoolset-7-gcc devtoolset-7-gcc-c++ devtoolset-7-gcc-gfortran llvm-toolset-7.0-clang llvm-toolset-7.0-clang-tools-extra \
mysql lcov openblas-devel lapack-devel \
&& \
rm -rf /var/cache/yum/*
RUN echo "source scl_source enable devtoolset-7" >> /etc/profile.d/devtoolset-7.sh
RUN echo "source scl_source enable llvm-toolset-7.0" >> /etc/profile.d/llvm-toolset-7.sh
mysql lcov && \
rm -rf /var/cache/yum/* && \
echo "source scl_source enable devtoolset-7" >> /etc/profile.d/devtoolset-7.sh && \
echo "source scl_source enable llvm-toolset-7.0" >> /etc/profile.d/llvm-toolset-7.sh
ENV CLANG_TOOLS_PATH="/opt/rh/llvm-toolset-7.0/root/usr/bin"
RUN source /etc/profile.d/devtoolset-7.sh && \
wget https://github.com/xianyi/OpenBLAS/archive/v0.3.9.tar.gz && \
tar zxvf v0.3.9.tar.gz && cd OpenBLAS-0.3.9 && \
make TARGET=CORE2 DYNAMIC_ARCH=1 DYNAMIC_OLDER=1 USE_THREAD=0 USE_OPENMP=0 FC=gfortran CC=gcc COMMON_OPT="-O3 -g -fPIC" FCOMMON_OPT="-O3 -g -fPIC -frecursive" NMAX="NUM_THREADS=128" LIBPREFIX="libopenblas" LAPACKE="NO_LAPACKE=1" INTERFACE64=0 NO_STATIC=1 && \
make PREFIX=/usr install && \
cd .. && rm -rf OpenBLAS-0.3.9 && rm v0.3.9.tar.gz
RUN yum install -y ccache && \
rm -rf /var/cache/yum/*
COPY docker-entrypoint.sh /app/docker-entrypoint.sh
WORKDIR /root
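
The stage above compiles OpenBLAS 0.3.9 from source with DYNAMIC_ARCH=1, so one shared library covers multiple CPU families, and USE_THREAD=0, presumably to keep the BLAS single-threaded and avoid oversubscribing the caller's own threads; the Ubuntu image below gets the same treatment. A tiny CBLAS smoke test can verify the resulting install, compiled as "g++ gemm_check.cpp -lopenblas" (the file name and printed values are illustrative):

#include <cblas.h>
#include <cstdio>

// Smoke test for the OpenBLAS installed under /usr: computes C = A * B for
// two 2x2 row-major matrices via cblas_sgemm; expected output is 19 22 / 43 50.
int
main() {
    float A[4] = {1.0f, 2.0f, 3.0f, 4.0f};
    float B[4] = {5.0f, 6.0f, 7.0f, 8.0f};
    float C[4] = {0.0f};
    cblas_sgemm(CblasRowMajor, CblasNoTrans, CblasNoTrans,
                2, 2, 2, 1.0f, A, 2, B, 2, 0.0f, C, 2);
    std::printf("%.0f %.0f\n%.0f %.0f\n", C[0], C[1], C[2], C[3]);
    return 0;
}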

@ -21,11 +21,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget ca-certifi
sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list' && \
wget -qO- "https://cmake.org/files/v3.14/cmake-3.14.3-Linux-x86_64.tar.gz" | tar --strip-components=1 -xz -C /usr/local && \
apt-get update && apt-get install -y --no-install-recommends \
g++ git gfortran lsb-core ccache \
g++ git gfortran lsb-core \
libboost-serialization-dev libboost-filesystem-dev libboost-system-dev libboost-regex-dev \
curl libtool automake libssl-dev pkg-config libcurl4-openssl-dev python3-pip \
clang-format-6.0 clang-tidy-6.0 \
lcov mysql-client libmysqlclient-dev intel-mkl-gnu-2019.5-281 intel-mkl-core-2019.5-281 libopenblas-dev liblapack3 && \
lcov mysql-client libmysqlclient-dev intel-mkl-gnu-2019.5-281 intel-mkl-core-2019.5-281 && \
apt-get remove --purge -y && \
rm -rf /var/lib/apt/lists/*
@ -34,6 +34,16 @@ RUN ln -s /usr/lib/x86_64-linux-gnu/libmysqlclient.so \
RUN sh -c 'echo export LD_LIBRARY_PATH=/opt/intel/compilers_and_libraries_2019.5.281/linux/mkl/lib/intel64:\$LD_LIBRARY_PATH > /etc/profile.d/mkl.sh'
RUN wget https://github.com/xianyi/OpenBLAS/archive/v0.3.9.tar.gz && \
tar zxvf v0.3.9.tar.gz && cd OpenBLAS-0.3.9 && \
make TARGET=CORE2 DYNAMIC_ARCH=1 DYNAMIC_OLDER=1 USE_THREAD=0 USE_OPENMP=0 FC=gfortran CC=gcc COMMON_OPT="-O3 -g -fPIC" FCOMMON_OPT="-O3 -g -fPIC -frecursive" NMAX="NUM_THREADS=128" LIBPREFIX="libopenblas" LAPACKE="NO_LAPACKE=1" INTERFACE64=0 NO_STATIC=1 && \
make PREFIX=/usr install && \
cd .. && rm -rf OpenBLAS-0.3.9 && rm v0.3.9.tar.gz
RUN apt-get update && apt-get install -y --no-install-recommends ccache && \
apt-get remove --purge -y && \
rm -rf /var/lib/apt/lists/*
COPY docker-entrypoint.sh /app/docker-entrypoint.sh
WORKDIR /root
