Merge remote-tracking branch 'upstream/0.6.0' into 0.6.0

pull/572/head
Yukikaze-CZR 2019-11-27 14:15:18 +08:00
commit 14acf1814f
56 changed files with 1046 additions and 225 deletions

View File

@ -1,7 +1,7 @@
---
name: "\U0001F41B Bug report"
about: Create a bug report to help us improve Milvus
title: "[BUG]"
title: ''
labels: ''
assignees: ''

View File

@ -1,7 +1,7 @@
---
name: "\U0001F4DD Documentation request"
about: Report incorrect or needed documentation
title: "[DOC]"
title: ''
labels: ''
assignees: ''

View File

@ -1,7 +1,7 @@
---
name: "\U0001F680 Feature request"
about: Suggest an idea for Milvus
title: "[FEATURE]"
title: ''
labels: ''
assignees: ''

View File

@ -1,7 +1,7 @@
---
name: "\U0001F914 General question"
about: Ask a general question about Milvus
title: "[QUESTION]"
title: ''
labels: ''
assignees: ''

3
.gitignore vendored
View File

@ -29,3 +29,6 @@ cmake_build
.coverage
*.pyc
cov_html/
# temp
shards/all_in_one_with_mysql/metadata/

View File

@ -24,7 +24,12 @@ Please mark all change in change log and use the ticket from JIRA.
- \#486 - GPU not used during index building
- \#509 - IVF_PQ index build trapped into dead loop caused by invalid params
- \#513 - Unittest DELETE_BY_RANGE sometimes failed
- \#523 - Erase file data from cache once the file is marked as deleted
- \#527 - faiss benchmark not compatible with faiss 1.6.0
- \#530 - BuildIndex stops when index building and search run simultaneously
- \#532 - assign value to `table_name` from conftest shell
- \#533 - NSG build failed with MetricType Inner Product
- \#543 - client raises exception in shards when search result is empty
## Feature
- \#12 - Pure CPU version for Milvus
@ -33,6 +38,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#226 - Experimental shards middleware for Milvus
- \#227 - Support new index types SPTAG-KDT and SPTAG-BKT
- \#346 - Support build index with multiple gpu
- \#420 - Update shards merge part to match v0.5.3
- \#488 - Add log in scheduler/optimizer
- \#502 - C++ SDK support IVFPQ and SPTAG

View File

@ -14,6 +14,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: BUILD_ENV_IMAGE_ID
value: "23476391bec80c64f10d44a6370c73c71f011a6b95114b10ff82a60e771e11c7"
command:
- cat
tty: true

View File

@ -14,6 +14,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: BUILD_ENV_IMAGE_ID
value: "da9023b0f858f072672f86483a869aa87e90a5140864f89e5a012ec766d96dea"
command:
- cat
tty: true

View File

@ -1,11 +1,13 @@
timeout(time: 60, unit: 'MINUTES') {
dir ("ci/scripts") {
withCredentials([usernamePassword(credentialsId: "${params.JFROG_CREDENTIALS_ID}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
def checkResult = sh(script: "./check_ccache.sh -l ${params.JFROG_ARTFACTORY_URL}/ccache", returnStatus: true)
if ("${env.BINRARY_VERSION}" == "gpu") {
sh "export JFROG_ARTFACTORY_URL='${params.JFROG_ARTFACTORY_URL}' && export JFROG_USER_NAME='${USERNAME}' && export JFROG_PASSWORD='${PASSWORD}' && ./build.sh -t ${params.BUILD_TYPE} -o /opt/milvus -l -g -j -u -c"
} else {
sh "export JFROG_ARTFACTORY_URL='${params.JFROG_ARTFACTORY_URL}' && export JFROG_USER_NAME='${USERNAME}' && export JFROG_PASSWORD='${PASSWORD}' && ./build.sh -t ${params.BUILD_TYPE} -o /opt/milvus -l -m -j -u -c"
}
sh ". ./before-install.sh && ./build.sh -t ${params.BUILD_TYPE} -o /opt/milvus -l -g -u -c"
} else {
sh ". ./before-install.sh && ./build.sh -t ${params.BUILD_TYPE} -o /opt/milvus -l -m -u -c"
}
sh "./update_ccache.sh -l ${params.JFROG_ARTFACTORY_URL}/ccache -u ${USERNAME} -p ${PASSWORD}"
}
}
}

11
ci/scripts/before-install.sh Executable file
View File

@ -0,0 +1,11 @@
#!/bin/bash
set -ex
export CCACHE_COMPRESS=1
export CCACHE_COMPRESSLEVEL=5
export CCACHE_COMPILERCHECK=content
export PATH=/usr/lib/ccache/:$PATH
ccache --show-stats
set +ex

73
ci/scripts/check_ccache.sh Executable file
View File

@ -0,0 +1,73 @@
#!/bin/bash
OS_NAME="linux"
CODE_NAME=$(lsb_release -sc)
BUILD_ENV_DOCKER_IMAGE_ID="${BUILD_ENV_IMAGE_ID}"
BRANCH_NAMES=$(git log --decorate | head -n 1 | sed 's/.*(\(.*\))/\1/' | sed 's=[a-zA-Z]*\/==g' | awk -F", " '{$1=""; print $0}')
ARTIFACTORY_URL=""
CCACHE_DIRECTORY="${HOME}/.ccache"
while getopts "l:d:h" arg
do
case $arg in
l)
ARTIFACTORY_URL=$OPTARG
;;
d)
CCACHE_DIRECTORY=$OPTARG
;;
h) # help
echo "
parameter:
-l: artifactory url
-d: ccache directory
-h: help
usage:
./check_ccache.sh -l \${ARTIFACTORY_URL} -d \${CCACHE_DIRECTORY} [-h]
"
exit 0
;;
?)
echo "ERROR! unknown argument"
exit 1
;;
esac
done
if [[ -z "${ARTIFACTORY_URL}" || "${ARTIFACTORY_URL}" == "" ]];then
echo "you have not input ARTIFACTORY_URL !"
exit 1
fi
check_ccache() {
BRANCH=$1
echo "fetching ${BRANCH}/ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz"
wget -q --method HEAD "${ARTIFACTORY_URL}/${BRANCH}/ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz"
if [[ $? == 0 ]];then
wget "${ARTIFACTORY_URL}/${BRANCH}/ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz" && \
mkdir -p ${CCACHE_DIRECTORY} && \
tar zxf ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz -C ${CCACHE_DIRECTORY} && \
rm ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz
if [[ $? == 0 ]];then
echo "found cache"
exit 0
fi
fi
}
for BRANCH_NAME in ${BRANCH_NAMES}
do
if [[ "${BRANCH_NAME}" != "HEAD" ]];then
check_ccache ${BRANCH_NAME}
fi
done
if [[ -n "${CHANGE_BRANCH}" && "${BRANCH_NAME}" =~ "PR-" ]];then
check_ccache ${CHANGE_BRANCH}
check_ccache ${BRANCH_NAME}
fi
echo "could not download cache" && exit 1

71
ci/scripts/update_ccache.sh Executable file
View File

@ -0,0 +1,71 @@
#!/bin/bash
OS_NAME="linux"
CODE_NAME=$(lsb_release -sc)
BUILD_ENV_DOCKER_IMAGE_ID="${BUILD_ENV_IMAGE_ID}"
BRANCH_NAME=$(git log --decorate | head -n 1 | sed 's/.*(\(.*\))/\1/' | sed 's/.*, //' | sed 's=[a-zA-Z]*\/==g')
ARTIFACTORY_URL=""
ARTIFACTORY_USER=""
ARTIFACTORY_PASSWORD=""
CCACHE_DIRECTORY="${HOME}/.ccache"
while getopts "l:u:p:d:h" arg
do
case $arg in
l)
ARTIFACTORY_URL=$OPTARG
;;
u)
ARTIFACTORY_USER=$OPTARG
;;
p)
ARTIFACTORY_PASSWORD=$OPTARG
;;
d)
CCACHE_DIRECTORY=$OPTARG
;;
h) # help
echo "
parameter:
-l: artifactory url
-u: artifactory user
-p: artifactory password
-d: ccache directory
-h: help
usage:
./update_ccache.sh -l \${ARTIFACTORY_URL} -u \${ARTIFACTORY_USER} -p \${ARTIFACTORY_PASSWORD} -d \${CCACHE_DIRECTORY} [-h]
"
exit 0
;;
?)
echo "ERROR! unknown argument"
exit 1
;;
esac
done
if [[ -z "${ARTIFACTORY_URL}" || "${ARTIFACTORY_URL}" == "" ]];then
echo "you have not input ARTIFACTORY_URL !"
exit 1
fi
PACKAGE_FILE="ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz"
REMOTE_PACKAGE_PATH="${ARTIFACTORY_URL}/${BRANCH_NAME}"
ccache --show-stats
if [[ "${BRANCH_NAME}" != "HEAD" ]];then
echo "Updating ccache package file: ${PACKAGE_FILE}"
tar zcf ./${PACKAGE_FILE} -C ${HOME}/.ccache .
echo "Uploading ccache package file ${PACKAGE_FILE} to ${REMOTE_PACKAGE_PATH}"
curl -u${ARTIFACTORY_USER}:${ARTIFACTORY_PASSWORD} -T ${PACKAGE_FILE} ${REMOTE_PACKAGE_PATH}/${PACKAGE_FILE}
if [[ $? == 0 ]];then
echo "Uploading ccache package file success !"
exit 0
else
echo "Uploading ccache package file fault !"
exit 1
fi
fi

View File

@ -38,7 +38,7 @@ endif ()
set (GIT_BRANCH_NAME_REGEX "[0-9]+\\.[0-9]+\\.[0-9]")
MACRO(GET_GIT_BRANCH_NAME GIT_BRANCH_NAME)
execute_process(COMMAND sh "-c" "git log --decorate | head -n 1 | sed 's/.*(\\(.*\\))/\\1/' | sed 's/.* \\(.*\\),.*/\\1/' | sed 's=[a-zA-Z]*\/==g'"
execute_process(COMMAND sh "-c" "git log --decorate | head -n 1 | sed 's/.*(\\(.*\\))/\\1/' | sed 's/.*, //' | sed 's=[a-zA-Z]*\/==g'"
OUTPUT_VARIABLE ${GIT_BRANCH_NAME})
if(NOT GIT_BRANCH_NAME MATCHES "${GIT_BRANCH_NAME_REGEX}")
execute_process(COMMAND "git" rev-parse --abbrev-ref HEAD OUTPUT_VARIABLE ${GIT_BRANCH_NAME})
@ -187,7 +187,7 @@ endif ()
add_custom_target(Clean-All COMMAND ${CMAKE_BUILD_TOOL} clean)
if ("${MILVUS_DB_PATH}" STREQUAL "")
set(MILVUS_DB_PATH "/tmp/milvus")
set(MILVUS_DB_PATH "${CMAKE_INSTALL_PREFIX}")
endif ()
if (MILVUS_GPU_VERSION)

View File

@ -99,8 +99,8 @@ Cache<ItemObj>::insert(const std::string& key, const ItemObj& item) {
std::lock_guard<std::mutex> lock(mutex_);
lru_.put(key, item);
SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->Size() << " bytes into cache, usage: " << usage_
<< " bytes";
SERVER_LOG_DEBUG << "Insert " << key << " size: " << item->Size() << " bytes into cache, usage: " << usage_
<< " bytes," << " capacity: " << capacity_ << " bytes";
}
}
@ -115,7 +115,8 @@ Cache<ItemObj>::erase(const std::string& key) {
const ItemObj& old_item = lru_.get(key);
usage_ -= old_item->Size();
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size();
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size() << " bytes from cache, usage: " << usage_
<< " bytes," << " capacity: " << capacity_ << " bytes";
lru_.erase(key);
}

View File

@ -112,7 +112,7 @@ DBImpl::Stop() {
bg_timer_thread_.join();
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
meta_ptr_->CleanUp();
meta_ptr_->CleanUpShadowFiles();
}
// ENGINE_LOG_TRACE << "DB service stop";
@ -777,11 +777,18 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
meta_ptr_->Archive();
int ttl = 5 * meta::M_SEC; // default: file will be deleted after 5 minutes
if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
ttl = meta::D_SEC;
{
uint64_t ttl = 10 * meta::SECOND; // default: file data will be erased from cache after a few seconds
meta_ptr_->CleanUpCacheWithTTL(ttl);
}
{
uint64_t ttl = 5 * meta::M_SEC; // default: file will be deleted after a few minutes
if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
ttl = meta::D_SEC;
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
// ENGINE_LOG_TRACE << " Background compaction thread exit";
}

View File

@ -257,6 +257,11 @@ ExecutionEngineImpl::PhysicalSize() const {
Status
ExecutionEngineImpl::Serialize() {
auto status = write_index(index_, location_);
// here we reset the index size to the file size,
// since for some index types (such as SQ8) the data size becomes smaller after serialization
index_->set_size(PhysicalSize());
return status;
}

View File

@ -118,9 +118,13 @@ class Meta {
Archive() = 0;
virtual Status
CleanUp() = 0;
CleanUpShadowFiles() = 0;
virtual Status CleanUpFilesWithTTL(uint16_t) = 0;
virtual Status
CleanUpCacheWithTTL(uint64_t seconds) = 0;
virtual Status
CleanUpFilesWithTTL(uint64_t seconds) = 0;
virtual Status
DropAll() = 0;

View File

@ -20,6 +20,7 @@
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
@ -292,7 +293,7 @@ MySQLMetaImpl::Initialize() {
// step 5: create meta tables
try {
if (mode_ != DBOptions::MODE::CLUSTER_READONLY) {
CleanUp();
CleanUpShadowFiles();
}
{
@ -1710,7 +1711,7 @@ MySQLMetaImpl::Size(uint64_t& result) {
}
Status
MySQLMetaImpl::CleanUp() {
MySQLMetaImpl::CleanUpShadowFiles() {
try {
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
@ -1752,7 +1753,49 @@ MySQLMetaImpl::CleanUp() {
}
Status
MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
auto now = utils::GetMicroSecTimeStamp();
// erase deleted/backup files from cache
try {
server::MetricCollector metric;
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date"
<< " FROM " << META_TABLEFILES << " WHERE file_type IN ("
<< std::to_string(TableFileSchema::TO_DELETE) << ","
<< std::to_string(TableFileSchema::BACKUP) << ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
TableFileSchema table_file;
std::vector<std::string> idsToDelete;
for (auto& resRow : res) {
table_file.id_ = resRow["id"]; // implicit conversion
resRow["table_id"].to_string(table_file.table_id_);
resRow["file_id"].to_string(table_file.file_id_);
table_file.date_ = resRow["date"];
utils::GetTableFilePath(options_, table_file);
server::CommonUtil::EraseFromCache(table_file.location_);
}
} catch (std::exception& e) {
return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
}
return Status::OK();
}
Status
MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
auto now = utils::GetMicroSecTimeStamp();
std::set<std::string> table_ids;

View File

@ -117,10 +117,13 @@ class MySQLMetaImpl : public Meta {
Size(uint64_t& result) override;
Status
CleanUp() override;
CleanUpShadowFiles() override;
Status
CleanUpFilesWithTTL(uint16_t seconds) override;
CleanUpCacheWithTTL(uint64_t seconds) override;
Status
CleanUpFilesWithTTL(uint64_t seconds) override;
Status
DropAll() override;

View File

@ -20,6 +20,7 @@
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
@ -154,7 +155,7 @@ SqliteMetaImpl::Initialize() {
ConnectorPtr->open_forever(); // thread safe option
ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log
CleanUp();
CleanUpShadowFiles();
return Status::OK();
}
@ -1231,7 +1232,7 @@ SqliteMetaImpl::Size(uint64_t& result) {
}
Status
SqliteMetaImpl::CleanUp() {
SqliteMetaImpl::CleanUpShadowFiles() {
try {
server::MetricCollector metric;
@ -1269,7 +1270,51 @@ SqliteMetaImpl::CleanUp() {
}
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
auto now = utils::GetMicroSecTimeStamp();
// erase deleted/backup files from cache
try {
server::MetricCollector metric;
// multiple threads calling sqlite update may raise an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
std::vector<int> file_types = {
(int)TableFileSchema::TO_DELETE,
(int)TableFileSchema::BACKUP,
};
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
&TableFileSchema::date_),
where(
in(&TableFileSchema::file_type_, file_types)
and
c(&TableFileSchema::updated_time_)
< now - seconds * US_PS));
for (auto& file : files) {
TableFileSchema table_file;
table_file.id_ = std::get<0>(file);
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.date_ = std::get<3>(file);
utils::GetTableFilePath(options_, table_file);
server::CommonUtil::EraseFromCache(table_file.location_);
}
} catch (std::exception& e) {
return HandleException("Encounter exception when clean cache", e.what());
}
return Status::OK();
}
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
auto now = utils::GetMicroSecTimeStamp();
std::set<std::string> table_ids;

View File

@ -117,10 +117,13 @@ class SqliteMetaImpl : public Meta {
Archive() override;
Status
CleanUp() override;
CleanUpShadowFiles() override;
Status
CleanUpFilesWithTTL(uint16_t seconds) override;
CleanUpCacheWithTTL(uint64_t seconds) override;
Status
CleanUpFilesWithTTL(uint64_t seconds) override;
Status
DropAll() override;

View File

@ -1,4 +1,4 @@
We manually change two APIs in "milvus.pd.h":
We manually change two APIs in "milvus.pb.h":
add_vector_data()
add_row_id_array()
add_ids()

View File

@ -38,6 +38,7 @@ set(index_srcs
knowhere/index/vector_index/nsg/NSG.cpp
knowhere/index/vector_index/nsg/NSGIO.cpp
knowhere/index/vector_index/nsg/NSGHelper.cpp
knowhere/index/vector_index/nsg/Distance.cpp
knowhere/index/vector_index/IndexIVFSQ.cpp
knowhere/index/vector_index/IndexIVFPQ.cpp
knowhere/index/vector_index/FaissBaseIndex.cpp

View File

@ -115,10 +115,6 @@ NSG::Train(const DatasetPtr& dataset, const Config& config) {
build_cfg->CheckValid(); // throw exception
}
if (build_cfg->metric_type != METRICTYPE::L2) {
KNOWHERE_THROW_MSG("NSG not support this kind of metric type");
}
// TODO(linxj): dev IndexFactory, support more IndexType
#ifdef MILVUS_GPU_VERSION
auto preprocess_index = std::make_shared<GPUIVF>(build_cfg->gpu_id);

View File

@ -0,0 +1,247 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <immintrin.h>
#include "knowhere/index/vector_index/nsg/Distance.h"
namespace knowhere {
namespace algo {
float
DistanceL2::Compare(const float* a, const float* b, unsigned size) const {
float result = 0;
#ifdef __GNUC__
#ifdef __AVX__
#define AVX_L2SQR(addr1, addr2, dest, tmp1, tmp2) \
tmp1 = _mm256_loadu_ps(addr1); \
tmp2 = _mm256_loadu_ps(addr2); \
tmp1 = _mm256_sub_ps(tmp1, tmp2); \
tmp1 = _mm256_mul_ps(tmp1, tmp1); \
dest = _mm256_add_ps(dest, tmp1);
__m256 sum;
__m256 l0, l1;
__m256 r0, r1;
unsigned D = (size + 7) & ~7U;
unsigned DR = D % 16;
unsigned DD = D - DR;
const float* l = a;
const float* r = b;
const float* e_l = l + DD;
const float* e_r = r + DD;
float unpack[8] __attribute__((aligned(32))) = {0, 0, 0, 0, 0, 0, 0, 0};
sum = _mm256_loadu_ps(unpack);
if (DR) {
AVX_L2SQR(e_l, e_r, sum, l0, r0);
}
for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) {
AVX_L2SQR(l, r, sum, l0, r0);
AVX_L2SQR(l + 8, r + 8, sum, l1, r1);
}
_mm256_storeu_ps(unpack, sum);
result = unpack[0] + unpack[1] + unpack[2] + unpack[3] + unpack[4] + unpack[5] + unpack[6] + unpack[7];
#else
#ifdef __SSE2__
#define SSE_L2SQR(addr1, addr2, dest, tmp1, tmp2) \
tmp1 = _mm_load_ps(addr1); \
tmp2 = _mm_load_ps(addr2); \
tmp1 = _mm_sub_ps(tmp1, tmp2); \
tmp1 = _mm_mul_ps(tmp1, tmp1); \
dest = _mm_add_ps(dest, tmp1);
__m128 sum;
__m128 l0, l1, l2, l3;
__m128 r0, r1, r2, r3;
unsigned D = (size + 3) & ~3U;
unsigned DR = D % 16;
unsigned DD = D - DR;
const float* l = a;
const float* r = b;
const float* e_l = l + DD;
const float* e_r = r + DD;
float unpack[4] __attribute__((aligned(16))) = {0, 0, 0, 0};
sum = _mm_load_ps(unpack);
switch (DR) {
case 12:
SSE_L2SQR(e_l + 8, e_r + 8, sum, l2, r2);
case 8:
SSE_L2SQR(e_l + 4, e_r + 4, sum, l1, r1);
case 4:
SSE_L2SQR(e_l, e_r, sum, l0, r0);
default:
break;
}
for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) {
SSE_L2SQR(l, r, sum, l0, r0);
SSE_L2SQR(l + 4, r + 4, sum, l1, r1);
SSE_L2SQR(l + 8, r + 8, sum, l2, r2);
SSE_L2SQR(l + 12, r + 12, sum, l3, r3);
}
_mm_storeu_ps(unpack, sum);
result += unpack[0] + unpack[1] + unpack[2] + unpack[3];
// normal distance
#else
float diff0, diff1, diff2, diff3;
const float* last = a + size;
const float* unroll_group = last - 3;
/* Process 4 items with each loop for efficiency. */
while (a < unroll_group) {
diff0 = a[0] - b[0];
diff1 = a[1] - b[1];
diff2 = a[2] - b[2];
diff3 = a[3] - b[3];
result += diff0 * diff0 + diff1 * diff1 + diff2 * diff2 + diff3 * diff3;
a += 4;
b += 4;
}
/* Process last 0-3 pixels. Not needed for standard vector lengths. */
while (a < last) {
diff0 = *a++ - *b++;
result += diff0 * diff0;
}
#endif
#endif
#endif
return result;
}
float
DistanceIP::Compare(const float* a, const float* b, unsigned size) const {
float result = 0;
#ifdef __GNUC__
#ifdef __AVX__
#define AVX_DOT(addr1, addr2, dest, tmp1, tmp2) \
tmp1 = _mm256_loadu_ps(addr1); \
tmp2 = _mm256_loadu_ps(addr2); \
tmp1 = _mm256_mul_ps(tmp1, tmp2); \
dest = _mm256_add_ps(dest, tmp1);
__m256 sum;
__m256 l0, l1;
__m256 r0, r1;
unsigned D = (size + 7) & ~7U;
unsigned DR = D % 16;
unsigned DD = D - DR;
const float* l = a;
const float* r = b;
const float* e_l = l + DD;
const float* e_r = r + DD;
float unpack[8] __attribute__((aligned(32))) = {0, 0, 0, 0, 0, 0, 0, 0};
sum = _mm256_loadu_ps(unpack);
if (DR) {
AVX_DOT(e_l, e_r, sum, l0, r0);
}
for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) {
AVX_DOT(l, r, sum, l0, r0);
AVX_DOT(l + 8, r + 8, sum, l1, r1);
}
_mm256_storeu_ps(unpack, sum);
result = unpack[0] + unpack[1] + unpack[2] + unpack[3] + unpack[4] + unpack[5] + unpack[6] + unpack[7];
#else
#ifdef __SSE2__
#define SSE_DOT(addr1, addr2, dest, tmp1, tmp2) \
tmp1 = _mm_loadu_ps(addr1); \
tmp2 = _mm_loadu_ps(addr2); \
tmp1 = _mm_mul_ps(tmp1, tmp2); \
dest = _mm_add_ps(dest, tmp1);
__m128 sum;
__m128 l0, l1, l2, l3;
__m128 r0, r1, r2, r3;
unsigned D = (size + 3) & ~3U;
unsigned DR = D % 16;
unsigned DD = D - DR;
const float* l = a;
const float* r = b;
const float* e_l = l + DD;
const float* e_r = r + DD;
float unpack[4] __attribute__((aligned(16))) = {0, 0, 0, 0};
sum = _mm_load_ps(unpack);
switch (DR) {
case 12:
SSE_DOT(e_l + 8, e_r + 8, sum, l2, r2);
case 8:
SSE_DOT(e_l + 4, e_r + 4, sum, l1, r1);
case 4:
SSE_DOT(e_l, e_r, sum, l0, r0);
default:
break;
}
for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) {
SSE_DOT(l, r, sum, l0, r0);
SSE_DOT(l + 4, r + 4, sum, l1, r1);
SSE_DOT(l + 8, r + 8, sum, l2, r2);
SSE_DOT(l + 12, r + 12, sum, l3, r3);
}
_mm_storeu_ps(unpack, sum);
result += unpack[0] + unpack[1] + unpack[2] + unpack[3];
#else
float dot0, dot1, dot2, dot3;
const float* last = a + size;
const float* unroll_group = last - 3;
/* Process 4 items with each loop for efficiency. */
while (a < unroll_group) {
dot0 = a[0] * b[0];
dot1 = a[1] * b[1];
dot2 = a[2] * b[2];
dot3 = a[3] * b[3];
result += dot0 + dot1 + dot2 + dot3;
a += 4;
b += 4;
}
/* Process last 0-3 pixels. Not needed for standard vector lengths. */
while (a < last) {
result += *a++ * *b++;
}
#endif
#endif
#endif
return result;
}
//#include <faiss/utils/distances.h>
// float
// DistanceL2::Compare(const float* a, const float* b, unsigned size) const {
// return faiss::fvec_L2sqr(a,b,size);
//}
//
// float
// DistanceIP::Compare(const float* a, const float* b, unsigned size) const {
// return faiss::fvec_inner_product(a,b,size);
//}
} // namespace algo
} // namespace knowhere

View File

@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
namespace knowhere {
namespace algo {
struct Distance {
virtual float
Compare(const float* a, const float* b, unsigned size) const = 0;
};
struct DistanceL2 : public Distance {
float
Compare(const float* a, const float* b, unsigned size) const override;
};
struct DistanceIP : public Distance {
float
Compare(const float* a, const float* b, unsigned size) const override;
};
} // namespace algo
} // namespace knowhere

View File

@ -35,17 +35,24 @@
namespace knowhere {
namespace algo {
NsgIndex::NsgIndex(const size_t& dimension, const size_t& n, MetricType metric)
NsgIndex::NsgIndex(const size_t& dimension, const size_t& n, METRICTYPE metric)
: dimension(dimension), ntotal(n), metric_type(metric) {
switch (metric) {
case METRICTYPE::L2:
distance_ = new DistanceL2;
break;
case METRICTYPE::IP:
distance_ = new DistanceIP;
break;
}
}
NsgIndex::~NsgIndex() {
delete[] ori_data_;
delete[] ids_;
delete distance_;
}
// void NsgIndex::Build(size_t nb, const float *data, const BuildParam &parameters) {
//}
void
NsgIndex::Build_with_ids(size_t nb, const float* data, const int64_t* ids, const BuildParams& parameters) {
TimeRecorder rc("NSG");
@ -126,7 +133,7 @@ NsgIndex::InitNavigationPoint() {
//>> Debug code
/////
// float r1 = calculate(center, ori_data_ + navigation_point * dimension, dimension);
// float r1 = distance_->Compare(center, ori_data_ + navigation_point * dimension, dimension);
// assert(r1 == resset[0].distance);
/////
}
@ -180,7 +187,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector<Neighbor>& resset, std::v
continue;
}
float dist = calculate(ori_data_ + dimension * id, query, dimension);
float dist = distance_->Compare(ori_data_ + dimension * id, query, dimension);
resset[i] = Neighbor(id, dist, false);
///////////// difference from other GetNeighbors ///////////////
@ -205,7 +212,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector<Neighbor>& resset, std::v
continue;
has_calculated_dist[id] = true;
float dist = calculate(query, ori_data_ + dimension * id, dimension);
float dist = distance_->Compare(query, ori_data_ + dimension * id, dimension);
Neighbor nn(id, dist, false);
fullset.push_back(nn);
@ -278,7 +285,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector<Neighbor>& resset, std::v
continue;
}
float dist = calculate(ori_data_ + id * dimension, query, dimension);
float dist = distance_->Compare(ori_data_ + id * dimension, query, dimension);
resset[i] = Neighbor(id, dist, false);
}
std::sort(resset.begin(), resset.end()); // sort by distance
@ -299,7 +306,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector<Neighbor>& resset, std::v
continue;
has_calculated_dist[id] = true;
float dist = calculate(ori_data_ + dimension * id, query, dimension);
float dist = distance_->Compare(ori_data_ + dimension * id, query, dimension);
Neighbor nn(id, dist, false);
fullset.push_back(nn);
@ -371,7 +378,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector<Neighbor>& resset, Graph&
continue;
}
float dist = calculate(ori_data_ + id * dimension, query, dimension);
float dist = distance_->Compare(ori_data_ + id * dimension, query, dimension);
resset[i] = Neighbor(id, dist, false);
}
std::sort(resset.begin(), resset.end()); // sort by distance
@ -399,7 +406,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector<Neighbor>& resset, Graph&
continue;
has_calculated_dist[id] = true;
float dist = calculate(query, ori_data_ + dimension * id, dimension);
float dist = distance_->Compare(query, ori_data_ + dimension * id, dimension);
if (dist >= resset[buffer_size - 1].distance)
continue;
@ -449,7 +456,7 @@ NsgIndex::Link() {
//>> Debug code
/////
// float r1 = calculate(ori_data_ + n * dimension, ori_data_ + temp[0].id * dimension, dimension);
// float r1 = distance_->Compare(ori_data_ + n * dimension, ori_data_ + temp[0].id * dimension, dimension);
// assert(r1 == temp[0].distance);
/////
SyncPrune(n, fullset, flags, cut_graph_dist);
@ -496,7 +503,7 @@ NsgIndex::SyncPrune(size_t n, std::vector<Neighbor>& pool, boost::dynamic_bitset
auto id = knng[n][i];
if (has_calculated[id])
continue;
float dist = calculate(ori_data_ + dimension * n, ori_data_ + dimension * id, dimension);
float dist = distance_->Compare(ori_data_ + dimension * n, ori_data_ + dimension * id, dimension);
pool.emplace_back(Neighbor(id, dist, true));
}
@ -613,7 +620,8 @@ NsgIndex::SelectEdge(unsigned& cursor, std::vector<Neighbor>& sort_pool, std::ve
auto& p = pool[cursor];
bool should_link = true;
for (size_t t = 0; t < result.size(); ++t) {
float dist = calculate(ori_data_ + dimension * result[t].id, ori_data_ + dimension * p.id, dimension);
float dist =
distance_->Compare(ori_data_ + dimension * result[t].id, ori_data_ + dimension * p.id, dimension);
if (dist < p.distance) {
should_link = false;

View File

@ -22,18 +22,16 @@
#include <vector>
#include <boost/dynamic_bitset.hpp>
#include "Distance.h"
#include "Neighbor.h"
#include "knowhere/common/Config.h"
namespace knowhere {
namespace algo {
using node_t = int64_t;
enum class MetricType {
METRIC_INNER_PRODUCT = 0,
METRIC_L2 = 1,
};
struct BuildParams {
size_t search_length;
size_t out_degree;
@ -50,7 +48,8 @@ class NsgIndex {
public:
size_t dimension;
size_t ntotal; // total nb of indexed vectors
MetricType metric_type; // L2 | IP
METRICTYPE metric_type; // L2 | IP
Distance* distance_;
float* ori_data_;
int64_t* ids_; // TODO: support different type
@ -69,7 +68,7 @@ class NsgIndex {
size_t out_degree;
public:
explicit NsgIndex(const size_t& dimension, const size_t& n, MetricType metric = MetricType::METRIC_L2);
explicit NsgIndex(const size_t& dimension, const size_t& n, METRICTYPE metric = METRICTYPE::L2);
NsgIndex() = default;

View File

@ -16,7 +16,6 @@
// under the License.
#include <cstring>
#include <fstream>
#include "knowhere/index/vector_index/nsg/NSGHelper.h"
@ -27,9 +26,9 @@ namespace algo {
int
InsertIntoPool(Neighbor* addr, unsigned K, Neighbor nn) {
//>> Fix: Add assert
for (unsigned int i = 0; i < K; ++i) {
assert(addr[i].id != nn.id);
}
// for (unsigned int i = 0; i < K; ++i) {
// assert(addr[i].id != nn.id);
// }
// find the location to insert
int left = 0, right = K - 1;
@ -68,114 +67,5 @@ InsertIntoPool(Neighbor* addr, unsigned K, Neighbor nn) {
return right;
}
// TODO: support L2 / IP
float
calculate(const float* a, const float* b, unsigned size) {
float result = 0;
#ifdef __GNUC__
#ifdef __AVX__
#define AVX_L2SQR(addr1, addr2, dest, tmp1, tmp2) \
tmp1 = _mm256_loadu_ps(addr1); \
tmp2 = _mm256_loadu_ps(addr2); \
tmp1 = _mm256_sub_ps(tmp1, tmp2); \
tmp1 = _mm256_mul_ps(tmp1, tmp1); \
dest = _mm256_add_ps(dest, tmp1);
__m256 sum;
__m256 l0, l1;
__m256 r0, r1;
unsigned D = (size + 7) & ~7U;
unsigned DR = D % 16;
unsigned DD = D - DR;
const float* l = a;
const float* r = b;
const float* e_l = l + DD;
const float* e_r = r + DD;
float unpack[8] __attribute__((aligned(32))) = {0, 0, 0, 0, 0, 0, 0, 0};
sum = _mm256_loadu_ps(unpack);
if (DR) {
AVX_L2SQR(e_l, e_r, sum, l0, r0);
}
for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) {
AVX_L2SQR(l, r, sum, l0, r0);
AVX_L2SQR(l + 8, r + 8, sum, l1, r1);
}
_mm256_storeu_ps(unpack, sum);
result = unpack[0] + unpack[1] + unpack[2] + unpack[3] + unpack[4] + unpack[5] + unpack[6] + unpack[7];
#else
#ifdef __SSE2__
#define SSE_L2SQR(addr1, addr2, dest, tmp1, tmp2) \
tmp1 = _mm_load_ps(addr1); \
tmp2 = _mm_load_ps(addr2); \
tmp1 = _mm_sub_ps(tmp1, tmp2); \
tmp1 = _mm_mul_ps(tmp1, tmp1); \
dest = _mm_add_ps(dest, tmp1);
__m128 sum;
__m128 l0, l1, l2, l3;
__m128 r0, r1, r2, r3;
unsigned D = (size + 3) & ~3U;
unsigned DR = D % 16;
unsigned DD = D - DR;
const float* l = a;
const float* r = b;
const float* e_l = l + DD;
const float* e_r = r + DD;
float unpack[4] __attribute__((aligned(16))) = {0, 0, 0, 0};
sum = _mm_load_ps(unpack);
switch (DR) {
case 12:
SSE_L2SQR(e_l + 8, e_r + 8, sum, l2, r2);
case 8:
SSE_L2SQR(e_l + 4, e_r + 4, sum, l1, r1);
case 4:
SSE_L2SQR(e_l, e_r, sum, l0, r0);
default:
break;
}
for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) {
SSE_L2SQR(l, r, sum, l0, r0);
SSE_L2SQR(l + 4, r + 4, sum, l1, r1);
SSE_L2SQR(l + 8, r + 8, sum, l2, r2);
SSE_L2SQR(l + 12, r + 12, sum, l3, r3);
}
_mm_storeu_ps(unpack, sum);
result += unpack[0] + unpack[1] + unpack[2] + unpack[3];
// normal distance
#else
float diff0, diff1, diff2, diff3;
const float* last = a + size;
const float* unroll_group = last - 3;
/* Process 4 items with each loop for efficiency. */
while (a < unroll_group) {
diff0 = a[0] - b[0];
diff1 = a[1] - b[1];
diff2 = a[2] - b[2];
diff3 = a[3] - b[3];
result += diff0 * diff0 + diff1 * diff1 + diff2 * diff2 + diff3 * diff3;
a += 4;
b += 4;
}
/* Process last 0-3 pixels. Not needed for standard vector lengths. */
while (a < last) {
diff0 = *a++ - *b++;
result += diff0 * diff0;
}
#endif
#endif
#endif
return result;
}
} // namespace algo
}; // namespace algo
} // namespace knowhere

View File

@ -17,21 +17,13 @@
#pragma once
#include <x86intrin.h>
#include <iostream>
#include <faiss/AutoTune.h>
#include "NSG.h"
#include "knowhere/common/Config.h"
#include "Neighbor.h"
namespace knowhere {
namespace algo {
extern int
InsertIntoPool(Neighbor* addr, unsigned K, Neighbor nn);
extern float
calculate(const float* a, const float* b, unsigned size);
} // namespace algo
} // namespace knowhere

View File

@ -18,7 +18,6 @@
#pragma once
#include "NSG.h"
#include "knowhere/index/vector_index/IndexIVF.h"
#include "knowhere/index/vector_index/helpers/FaissIO.h"
namespace knowhere {
@ -26,6 +25,7 @@ namespace algo {
extern void
write_index(NsgIndex* index, MemoryIOWriter& writer);
extern NsgIndex*
read_index(MemoryIOReader& reader);

View File

@ -24,6 +24,8 @@
#ifdef MILVUS_GPU_VERSION
#include "knowhere/index/vector_index/helpers/FaissGpuResourceMgr.h"
#endif
#include "knowhere/common/Timer.h"
#include "knowhere/index/vector_index/nsg/NSGIO.h"
#include "unittest/utils.h"
@ -95,20 +97,19 @@ TEST_F(NSGInterfaceTest, basic_test) {
index_->Add(base_dataset, knowhere::Config());
index_->Seal();
});
{
// std::cout << "k = 1" << std::endl;
// new_index->Search(GenQuery(1), Config::object{{"k", 1}});
// new_index->Search(GenQuery(10), Config::object{{"k", 1}});
// new_index->Search(GenQuery(100), Config::object{{"k", 1}});
// new_index->Search(GenQuery(1000), Config::object{{"k", 1}});
// new_index->Search(GenQuery(10000), Config::object{{"k", 1}});
// std::cout << "k = 5" << std::endl;
// new_index->Search(GenQuery(1), Config::object{{"k", 5}});
// new_index->Search(GenQuery(20), Config::object{{"k", 5}});
// new_index->Search(GenQuery(100), Config::object{{"k", 5}});
// new_index->Search(GenQuery(300), Config::object{{"k", 5}});
// new_index->Search(GenQuery(500), Config::object{{"k", 5}});
}
}
TEST_F(NSGInterfaceTest, comparetest) {
knowhere::algo::DistanceL2 distanceL2;
knowhere::algo::DistanceIP distanceIP;
knowhere::TimeRecorder tc("Compare");
for (int i = 0; i < 1000; ++i) {
distanceL2.Compare(xb.data(), xq.data(), 256);
}
tc.RecordSection("L2");
for (int i = 0; i < 1000; ++i) {
distanceIP.Compare(xb.data(), xq.data(), 256);
}
tc.RecordSection("IP");
}

View File

@ -55,6 +55,11 @@ class BuildMgr {
}
}
int64_t
NumOfAvailable() {
return available_;
}
private:
std::int64_t available_;
std::mutex mutex_;

View File

@ -178,7 +178,8 @@ TaskTable::PickToLoad(uint64_t limit) {
// if task is a build index task, limit it
if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") {
if (not BuildMgrInst::GetInstance()->Take()) {
if (BuildMgrInst::GetInstance()->NumOfAvailable() < 1) {
SERVER_LOG_WARNING << "BuildMgr does not have an available slot for building index";
continue;
}
}

View File

@ -178,6 +178,10 @@ Resource::loader_function() {
if (task_item == nullptr) {
break;
}
if (task_item->task->Type() == TaskType::BuildIndexTask && name() == "cpu") {
BuildMgrInst::GetInstance()->Take();
SERVER_LOG_DEBUG << name() << " load BuildIndexTask";
}
LoadFile(task_item->task);
task_item->Loaded();
if (task_item->from) {
@ -208,7 +212,6 @@ Resource::executor_function() {
if (task_item == nullptr) {
break;
}
auto start = get_current_timestamp();
Process(task_item->task);
auto finish = get_current_timestamp();

View File

@ -16,6 +16,9 @@
// under the License.
#include "utils/CommonUtil.h"
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "server/Config.h"
#include "utils/Log.h"
#include <dirent.h>
@ -27,6 +30,7 @@
#include <unistd.h>
#include <iostream>
#include <thread>
#include <vector>
#include "boost/filesystem.hpp"
@ -222,5 +226,24 @@ CommonUtil::ConvertTime(tm time_struct, time_t& time_integer) {
time_integer = mktime(&time_struct);
}
void
CommonUtil::EraseFromCache(const std::string& item_key) {
if (item_key.empty()) {
SERVER_LOG_ERROR << "Empty key cannot be erased from cache";
return;
}
cache::CpuCacheMgr::GetInstance()->EraseItem(item_key);
#ifdef MILVUS_GPU_VERSION
server::Config& config = server::Config::GetInstance();
std::vector<int64_t> gpus;
Status s = config.GetGpuResourceConfigSearchResources(gpus);
for (auto& gpu : gpus) {
cache::GpuCacheMgr::GetInstance(gpu)->EraseItem(item_key);
}
#endif
}
} // namespace server
} // namespace milvus

View File

@ -56,6 +56,9 @@ class CommonUtil {
ConvertTime(time_t time_integer, tm& time_struct);
static void
ConvertTime(tm time_struct, time_t& time_integer);
static void
EraseFromCache(const std::string& item_key);
};
} // namespace server

View File

@ -329,7 +329,7 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = milvus::engine::meta::TableFileSchema::NEW;
status = impl_->UpdateTableFile(table_file);
status = impl_->CleanUp();
status = impl_->CleanUpShadowFiles();
ASSERT_TRUE(status.ok());
status = impl_->DropTable(table_id);

View File

@ -8,7 +8,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget ca-certifi
sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list' && \
wget -qO- "https://cmake.org/files/v3.14/cmake-3.14.3-Linux-x86_64.tar.gz" | tar --strip-components=1 -xz -C /usr/local && \
apt-get update && apt-get install -y --no-install-recommends \
g++ git gfortran lsb-core \
g++ git gfortran lsb-core ccache \
libboost-serialization-dev libboost-filesystem-dev libboost-system-dev libboost-regex-dev \
curl libtool automake libssl-dev pkg-config libcurl4-openssl-dev python3-pip \
clang-format-6.0 clang-tidy-6.0 \

View File

@ -8,7 +8,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget ca-certifi
sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list' && \
wget -qO- "https://cmake.org/files/v3.14/cmake-3.14.3-Linux-x86_64.tar.gz" | tar --strip-components=1 -xz -C /usr/local && \
apt-get update && apt-get install -y --no-install-recommends \
g++ git gfortran lsb-core \
g++ git gfortran lsb-core ccache \
libboost-serialization-dev libboost-filesystem-dev libboost-system-dev libboost-regex-dev \
curl libtool automake libssl-dev pkg-config libcurl4-openssl-dev python3-pip \
clang-format-6.0 clang-tidy-6.0 \

View File

@ -8,7 +8,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget && \
apt-key add /tmp/GPG-PUB-KEY-INTEL-SW-PRODUCTS-2019.PUB && \
sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list' && \
apt-get update && apt-get install -y --no-install-recommends \
git flex bison gfortran lsb-core \
git flex bison gfortran lsb-core ccache \
curl libtool automake libboost1.58-all-dev libssl-dev pkg-config libcurl4-openssl-dev python3-pip \
clang-format-6.0 clang-tidy-6.0 \
lcov mysql-client libmysqlclient-dev intel-mkl-gnu-2019.5-281 intel-mkl-core-2019.5-281 && \

View File

@ -8,7 +8,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget && \
apt-key add /tmp/GPG-PUB-KEY-INTEL-SW-PRODUCTS-2019.PUB && \
sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list' && \
apt-get update && apt-get install -y --no-install-recommends \
git flex bison gfortran lsb-core \
git flex bison gfortran lsb-core ccache \
curl libtool automake libboost-all-dev libssl-dev pkg-config libcurl4-openssl-dev python3-pip \
clang-format-6.0 clang-tidy-6.0 \
lcov mysql-client libmysqlclient-dev intel-mkl-gnu-2019.5-281 intel-mkl-core-2019.5-281 && \

View File

@ -11,3 +11,4 @@ __pycache__
*.md
*.yml
*.yaml
*/metadata/

View File

@ -13,6 +13,12 @@ clean_deploy:
cd all_in_one && docker-compose -f all_in_one.yml down && cd -
probe_deploy:
docker run --rm --name probe --net=host milvusdb/mishards /bin/bash -c "python all_in_one/probe_test.py"
deploy_m: clean_deploy_m
cd all_in_one_with_mysql && docker-compose -f all_in_one.yml up -d && cd -
clean_deploy_m:
cd all_in_one_with_mysql && docker-compose -f all_in_one.yml down && cd -
probe_deploy_m:
docker run --rm --name probe --net=host milvusdb/mishards /bin/bash -c "python all_in_one_with_mysql/probe_test.py"
cluster:
cd kubernetes_demo;./start.sh baseup;sleep 10;./start.sh appup;cd -
clean_cluster:
@ -26,7 +32,7 @@ probe:
docker run --rm --name probe --net=host milvusdb/mishards /bin/bash -c "python all_in_one/probe_test.py --port=${PORT} --host=${HOST}"
clean_coverage:
rm -rf cov_html
clean: clean_coverage clean_deploy clean_cluster
clean: clean_coverage clean_deploy clean_cluster clean_deploy_m
style:
pycodestyle --config=.
coverage:

View File

@ -0,0 +1,77 @@
version: "2.3"
services:
milvus-mysql:
restart: always
image: mysql:5.7
volumes:
- ./mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf
- ./metadata:/var/lib/mysql
environment:
MYSQL_ROOT_PASSWORD: 'milvusroot'
MYSQL_DATABASE: 'milvus'
healthcheck:
test: ["CMD", "sleep", "5"]
interval: 1s
timeout: 10s
retries: 2
milvus_wr:
runtime: nvidia
restart: always
image: milvusdb/milvus
volumes:
- /tmp/milvus/db:/opt/milvus/db
- ./wr_server.yml:/opt/milvus/conf/server_config.yaml
depends_on:
milvus-mysql:
condition: service_healthy
milvus_ro:
runtime: nvidia
restart: always
image: milvusdb/milvus
volumes:
- /tmp/milvus/db:/opt/milvus/db
- ./ro_server.yml:/opt/milvus/conf/server_config.yaml
depends_on:
- milvus-mysql
- milvus_wr
jaeger:
restart: always
image: jaegertracing/all-in-one:1.14
ports:
- "0.0.0.0:5775:5775/udp"
- "0.0.0.0:16686:16686"
- "0.0.0.0:9441:9441"
environment:
COLLECTOR_ZIPKIN_HTTP_PORT: 9411
mishards:
restart: always
image: milvusdb/mishards
ports:
- "0.0.0.0:19531:19531"
- "0.0.0.0:19532:19532"
volumes:
- /tmp/milvus/db:/tmp/milvus/db
# - /tmp/mishards_env:/source/mishards/.env
command: ["python", "mishards/main.py"]
environment:
FROM_EXAMPLE: 'true'
SQLALCHEMY_DATABASE_URI: mysql+pymysql://root:milvusroot@milvus-mysql:3306/milvus?charset=utf8mb4
DEBUG: 'true'
SERVER_PORT: 19531
WOSERVER: tcp://milvus_wr:19530
DISCOVERY_PLUGIN_PATH: static
DISCOVERY_STATIC_HOSTS: milvus_wr,milvus_ro
TRACER_CLASS_NAME: jaeger
TRACING_SERVICE_NAME: mishards-demo
TRACING_REPORTING_HOST: jaeger
TRACING_REPORTING_PORT: 5775
depends_on:
- milvus_wr
- milvus_ro
- milvus-mysql
- jaeger

View File

@ -0,0 +1,28 @@
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
log-error = /var/log/mysql/error.log
bind-address = 0.0.0.0
symbolic-links=0
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
init_connect='SET NAMES utf8mb4'
skip-character-set-client-handshake = true
max_connections = 1000
wait_timeout = 31536000
table_open_cache = 128
external-locking = FALSE
binlog_cache_size = 1M
max_heap_table_size = 8M
tmp_table_size = 16M
read_rnd_buffer_size = 8M
sort_buffer_size = 8M
join_buffer_size = 8M
thread_cache_size = 32
query_cache_size = 64M
innodb_buffer_pool_size = 64M
innodb_flush_log_at_trx_commit = 0
innodb_log_buffer_size = 2M
max_allowed_packet=64M
explicit_defaults_for_timestamp=true

View File

@ -0,0 +1,25 @@
from milvus import Milvus
RED = '\033[0;31m'
GREEN = '\033[0;32m'
ENDC = ''
def test(host='127.0.0.1', port=19531):
client = Milvus()
try:
status = client.connect(host=host, port=port)
if status.OK():
print('{}Pass: Connected{}'.format(GREEN, ENDC))
return 0
else:
print('{}Error: {}{}'.format(RED, status, ENDC))
return 1
except Exception as exc:
print('{}Error: {}{}'.format(RED, exc, ENDC))
return 1
if __name__ == '__main__':
import fire
fire.Fire(test)

View File

@ -0,0 +1,42 @@
server_config:
address: 0.0.0.0 # milvus server ip address (IPv4)
port: 19530 # port range: 1025 ~ 65534
deploy_mode: cluster_readonly # deployment type: single, cluster_readonly, cluster_writable
time_zone: UTC+8
db_config:
primary_path: /opt/milvus # path used to store data and meta
secondary_path: # path used to store data only, split by semicolon
backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus
# URI format: dialect://username:password@host:port/database
# Keep 'dialect://:@:/', and replace other texts with real values
# Replace 'dialect' with 'mysql' or 'sqlite'
insert_buffer_size: 1 # GB, maximum insert buffer size allowed
# sum of insert_buffer_size and cpu_cache_capacity cannot exceed total memory
preload_table: # preload data at startup, '*' means load all tables, empty value means no preload
# you can specify preload tables like this: table1,table2,table3
metric_config:
enable_monitor: false # enable monitoring or not
collector: prometheus # prometheus
prometheus_config:
port: 8080 # port prometheus uses to fetch metrics
cache_config:
cpu_cache_capacity: 4 # GB, CPU memory used for cache
cpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered
gpu_cache_capacity: 1 # GB, GPU memory used for cache
gpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered
cache_insert_data: false # whether to load inserted data into cache
engine_config:
use_blas_threshold: 800 # if nq < use_blas_threshold, use SSE, faster with fluctuated response times
# if nq >= use_blas_threshold, use OpenBlas, slower with stable response times
resource_config:
search_resources: # define the GPUs used for search computation, valid value: gpux
- gpu0
index_build_device: gpu0 # GPU used for building index

View File

@ -0,0 +1,41 @@
server_config:
address: 0.0.0.0 # milvus server ip address (IPv4)
port: 19530 # port range: 1025 ~ 65534
deploy_mode: cluster_writable # deployment type: single, cluster_readonly, cluster_writable
time_zone: UTC+8
db_config:
primary_path: /opt/milvus # path used to store data and meta
secondary_path: # path used to store data only, split by semicolon
backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus # URI format: dialect://username:password@host:port/database
# Keep 'dialect://:@:/', and replace other texts with real values
# Replace 'dialect' with 'mysql' or 'sqlite'
insert_buffer_size: 2 # GB, maximum insert buffer size allowed
# sum of insert_buffer_size and cpu_cache_capacity cannot exceed total memory
preload_table: # preload data at startup, '*' means load all tables, empty value means no preload
# you can specify preload tables like this: table1,table2,table3
metric_config:
enable_monitor: false # enable monitoring or not
collector: prometheus # prometheus
prometheus_config:
port: 8080 # port prometheus uses to fetch metrics
cache_config:
cpu_cache_capacity: 2 # GB, CPU memory used for cache
cpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered
gpu_cache_capacity: 2 # GB, GPU memory used for cache
gpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered
cache_insert_data: false # whether to load inserted data into cache
engine_config:
use_blas_threshold: 800 # if nq < use_blas_threshold, use SSE, faster with fluctuated response times
# if nq >= use_blas_threshold, use OpenBlas, slower with stable response times
resource_config:
search_resources: # define the GPUs used for search computation, valid value: gpux
- gpu0
index_build_device: gpu0 # GPU used for building index

View File

@ -28,7 +28,12 @@ class DB:
if url.get_backend_name() == 'sqlite':
self.engine = create_engine(url)
else:
self.engine = create_engine(uri, pool_size, pool_recycle, pool_timeout, pool_pre_ping, echo, max_overflow)
self.engine = create_engine(uri, pool_size=pool_size,
pool_recycle=pool_recycle,
pool_timeout=pool_timeout,
pool_pre_ping=pool_pre_ping,
echo=echo,
max_overflow=max_overflow)
self.uri = uri
self.url = url
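
Note on the fix above: SQLAlchemy's create_engine() takes only the database URL positionally, so the pool options must be passed as keyword arguments; passing them positionally (as in the old line) does not work. A minimal illustrative sketch, where the URI and pool values are placeholders and not mishards' actual settings:

from sqlalchemy import create_engine

# placeholder URI and pool values, for illustration only
engine = create_engine(
    "mysql+pymysql://root:milvusroot@milvus-mysql:3306/milvus?charset=utf8mb4",
    pool_size=100,          # max persistent connections in the pool
    pool_recycle=3600,      # recycle connections older than an hour
    pool_timeout=30,        # seconds to wait for a free connection
    pool_pre_ping=True,     # verify connections before handing them out
    echo=False,             # disable SQL statement logging
    max_overflow=0,         # no connections beyond pool_size
)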

View File

@ -24,8 +24,11 @@ def resp_handler(err, error_code):
if resp_class == milvus_pb2.VectorIds:
return resp_class(status=status, vector_id_array=[])
if resp_class == milvus_pb2.TopKQueryResultList:
return resp_class(status=status, topk_query_result=[])
if resp_class == milvus_pb2.TopKQueryResult:
return resp_class(status=status,
row_num=0,
ids=[],
distances=[])
if resp_class == milvus_pb2.TableRowCount:
return resp_class(status=status, table_row_count=-1)

View File

@ -49,7 +49,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
status = status_pb2.Status(error_code=status_pb2.SUCCESS,
reason="Success")
if not files_n_topk_results:
return status, []
return status, [], []
merge_id_results = []
merge_dis_results = []
@ -58,9 +58,13 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
for files_collection in files_n_topk_results:
if isinstance(files_collection, tuple):
status, _ = files_collection
return status, []
return status, [], []
row_num = files_collection.row_num
# a row_num of 0 means this result set is empty; skip it
if not row_num:
continue
ids = files_collection.ids
diss = files_collection.distances # distance collections
# TODO: batch_len is equal to topk, may need to compare with topk
@ -322,7 +326,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
topk_result_list = milvus_pb2.TopKQueryResult(
status=status_pb2.Status(error_code=status.error_code,
reason=status.reason),
row_num=len(query_record_array),
row_num=len(request.query_record_array) if len(id_results) else 0,
ids=id_results,
distances=dis_results)
return topk_result_list
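
For context on the merge path touched above: each shard/file returns a TopKQueryResult (row_num, ids, distances) and mishards folds them into one ranked list. The helper below is only an illustrative sketch of that merge idea, assuming each shard's ids/distances are already sorted by ascending distance; the function name and shapes are hypothetical and not the actual mishards implementation.

import heapq
import itertools

def merge_topk(per_shard_results, topk):
    # per_shard_results: iterable of (ids, distances) pairs, each sorted by distance ascending
    streams = [zip(distances, ids) for ids, distances in per_shard_results]
    merged = heapq.merge(*streams, key=lambda pair: pair[0])  # lazily merge by distance
    top = list(itertools.islice(merged, topk))
    return [i for _, i in top], [d for d, _ in top]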

72
shards/utils/colors.py Normal file
View File

@ -0,0 +1,72 @@
# Reset
Color_Off='\033[0m' # Text Reset
# Regular Colors
Black='\033[0;30m' # Black
Red='\033[0;31m' # Red
Green='\033[0;32m' # Green
Yellow='\033[0;33m' # Yellow
Blue='\033[0;34m' # Blue
Purple='\033[0;35m' # Purple
Cyan='\033[0;36m' # Cyan
White='\033[0;37m' # White
# Bold
BBlack='\033[1;30m' # Black
BRed='\033[1;31m' # Red
BGreen='\033[1;32m' # Green
BYellow='\033[1;33m' # Yellow
BBlue='\033[1;34m' # Blue
BPurple='\033[1;35m' # Purple
BCyan='\033[1;36m' # Cyan
BWhite='\033[1;37m' # White
# Underline
UBlack='\033[4;30m' # Black
URed='\033[4;31m' # Red
UGreen='\033[4;32m' # Green
UYellow='\033[4;33m' # Yellow
UBlue='\033[4;34m' # Blue
UPurple='\033[4;35m' # Purple
UCyan='\033[4;36m' # Cyan
UWhite='\033[4;37m' # White
# Background
On_Black='\033[40m' # Black
On_Red='\033[41m' # Red
On_Green='\033[42m' # Green
On_Yellow='\033[43m' # Yellow
On_Blue='\033[44m' # Blue
On_Purple='\033[45m' # Purple
On_Cyan='\033[46m' # Cyan
On_White='\033[47m' # White
# High Intensity
IBlack='\033[0;90m' # Black
IRed='\033[0;91m' # Red
IGreen='\033[0;92m' # Green
IYellow='\033[0;93m' # Yellow
IBlue='\033[0;94m' # Blue
IPurple='\033[0;95m' # Purple
ICyan='\033[0;96m' # Cyan
IWhite='\033[0;97m' # White
# Bold High Intensity
BIBlack='\033[1;90m' # Black
BIRed='\033[1;91m' # Red
BIGreen='\033[1;92m' # Green
BIYellow='\033[1;93m' # Yellow
BIBlue='\033[1;94m' # Blue
BIPurple='\033[1;95m' # Purple
BICyan='\033[1;96m' # Cyan
BIWhite='\033[1;97m' # White
# High Intensity backgrounds
On_IBlack='\033[0;100m' # Black
On_IRed='\033[0;101m' # Red
On_IGreen='\033[0;102m' # Green
On_IYellow='\033[0;103m' # Yellow
On_IBlue='\033[0;104m' # Blue
On_IPurple='\033[0;105m' # Purple
On_ICyan='\033[0;106m' # Cyan
On_IWhite='\033[0;107m' # White

View File

@ -3,6 +3,7 @@ import datetime
from pytz import timezone
from logging import Filter
import logging.config
from utils import colors
class InfoFilter(logging.Filter):
@ -31,29 +32,53 @@ class CriticalFilter(logging.Filter):
COLORS = {
'HEADER': '\033[95m',
'INFO': '\033[92m',
'DEBUG': '\033[94m',
'WARNING': '\033[93m',
'ERROR': '\033[95m',
'CRITICAL': '\033[91m',
'ENDC': '\033[0m',
'HEADER': colors.BWhite,
'INFO': colors.On_IWhite + colors.BBlack,
'INFOM': colors.White,
'DEBUG': colors.On_IBlue + colors.BWhite,
'DEBUGM': colors.BIBlue,
'WARNING': colors.On_IYellow + colors.BWhite,
'WARNINGM': colors.BIYellow,
'ERROR': colors.On_IRed + colors.BWhite,
'ERRORM': colors.BIRed,
'CRITICAL': colors.On_Red + colors.BWhite,
'CRITICALM': colors.BRed,
'ASCTIME': colors.On_Cyan + colors.BIYellow,
'MESSAGE': colors.IGreen,
'FILENAME': colors.BCyan,
'LINENO': colors.BCyan,
'THREAD': colors.BCyan,
'ENDC': colors.Color_Off,
}
class ColorFulFormatColMixin:
def format_col(self, message_str, level_name):
if level_name in COLORS.keys():
message_str = COLORS.get(level_name) + message_str + COLORS.get(
'ENDC')
message_str = COLORS[level_name] + message_str + COLORS['ENDC']
return message_str
def formatTime(self, record, datefmt=None):
ret = super().formatTime(record, datefmt)
ret = COLORS['ASCTIME'] + ret + COLORS['ENDC']
return ret
class ColorfulFormatter(logging.Formatter, ColorFulFormatColMixin):
def format_record(self, record):
msg_schema = record.levelname + 'M'
record.msg = '{}{}{}'.format(COLORS[msg_schema], record.msg, COLORS['ENDC'])
record.filename = COLORS['FILENAME'] + record.filename + COLORS['ENDC']
record.lineno = '{}{}{}'.format(COLORS['LINENO'], record.lineno, COLORS['ENDC'])
record.threadName = '{}{}{}'.format(COLORS['THREAD'], record.threadName, COLORS['ENDC'])
record.levelname = COLORS[record.levelname] + record.levelname + COLORS['ENDC']
return record
class ColorfulFormatter(ColorFulFormatColMixin, logging.Formatter):
def format(self, record):
record = self.format_record(record)
message_str = super(ColorfulFormatter, self).format(record)
return self.format_col(message_str, level_name=record.levelname)
return message_str
def config(log_level, log_path, name, tz='UTC'):
@ -76,7 +101,9 @@ def config(log_level, log_path, name, tz='UTC'):
'format': '%(asctime)s | %(levelname)s | %(name)s | %(threadName)s: %(message)s (%(filename)s:%(lineno)s)',
},
'colorful_console': {
'format': '%(asctime)s | %(levelname)s | %(name)s | %(threadName)s: %(message)s (%(filename)s:%(lineno)s)',
'format': '%(asctime)s | %(levelname)s: %(message)s (%(filename)s:%(lineno)s) (%(threadName)s)',
# 'format': '%(asctime)s | %(levelname)s | %(threadName)s: %(message)s (%(filename)s:%(lineno)s)',
# 'format': '%(asctime)s | %(levelname)s | %(name)s | %(threadName)s: %(message)s (%(filename)s:%(lineno)s)',
'()': ColorfulFormatter,
},
},
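
A side note on the '()' key used above: logging.config.dictConfig treats '()' as a factory callable for the formatter, which is how the custom ColorfulFormatter gets attached to the console output. A minimal standalone sketch of that mechanism (the toy formatter class and handler names are illustrative assumptions, not the exact mishards wiring):

import logging
import logging.config

class PrefixFormatter(logging.Formatter):
    # toy stand-in for ColorfulFormatter: wraps the level name in ANSI color codes
    def format(self, record):
        record.levelname = '\033[1;34m' + record.levelname + '\033[0m'
        return super().format(record)

logging.config.dictConfig({
    'version': 1,
    'formatters': {
        'colorful_console': {
            'format': '%(asctime)s | %(levelname)s: %(message)s (%(filename)s:%(lineno)s)',
            '()': PrefixFormatter,   # factory callable, same mechanism as ColorfulFormatter above
        },
    },
    'handlers': {
        'console': {'class': 'logging.StreamHandler', 'formatter': 'colorful_console'},
    },
    'root': {'level': 'DEBUG', 'handlers': ['console']},
})

logging.getLogger(__name__).info('colorized level name')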

View File

@ -1300,7 +1300,8 @@ class TestNameInvalid(object):
assert not status.OK()
@pytest.mark.level(2)
def test_add_vectors_with_invalid_tag_name(self, connect, get_tag_name):
def test_add_vectors_with_invalid_tag_name(self, connect, get_table_name, get_tag_name):
table_name = get_table_name
tag_name = get_tag_name
vectors = gen_vectors(1, dim)
status, result = connect.add_vectors(table_name, vectors, partition_tag=tag_name)
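
The test above now takes both get_table_name and get_tag_name as fixtures instead of relying on a leaked table_name variable (see \#532 in the change log). A minimal sketch of how such parametrized fixtures are typically defined in a conftest.py; the parameter values here are made up for illustration and are not the project's actual invalid-name lists:

import pytest

@pytest.fixture(params=["table name with space", "table#invalid"])
def get_table_name(request):
    # each test requesting this fixture runs once per invalid table name
    return request.param

@pytest.fixture(params=["tag name with space", "tag#invalid"])
def get_tag_name(request):
    return request.param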

View File

@ -69,7 +69,7 @@ def gen_invalid_ips():
"\n",
"\t",
"中文",
"a".join("a" for i in range(256))
"a".join("a" for _ in range(256))
]
return ips
@ -116,7 +116,7 @@ def gen_invalid_uris():
"tcp:// :%s" % port,
# "tcp://123.0.0.1:%s" % port,
"tcp://127.0.0:%s" % port,
"tcp://255.0.0.0:%s" % port,
# "tcp://255.0.0.0:%s" % port,
# "tcp://255.255.0.0:%s" % port,
# "tcp://255.255.255.0:%s" % port,
# "tcp://255.255.255.255:%s" % port,